Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb1ec.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): ______________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00037 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00038 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00039 
00040 /**
00041  * RAIDb-1 load balancer.
00042  * <p>
00043  * This class is an abstract call because the read requests coming from the
00044  * request manager are NOT treated here but in the subclasses. Transaction
00045  * management and write requests are broadcasted to all backends.
00046  * 
00047  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00048  * @version 1.0
00049  */
00050 public abstract class RAIDb1ec extends RAIDb1
00051 {
00052   /*
00053    * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
00054    * Request handling 4. Transaction handling 5. Backend management
00055    */
00056 
00057   protected ArrayList           backendReadThreads;
00058   protected int                 nbOfConcurrentReads;
00059   protected ErrorCheckingPolicy errorCheckingPolicy;
00060 
00061   protected static Trace        logger = Trace
00062                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec");
00063 
00064   /*
00065    * Constructors
00066    */
00067 
00068   /**
00069    * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
00070    * worker thread is created for each backend.
00071    * 
00072    * @param vdb the virtual database this load balancer belongs to
00073    * @param waitForCompletionPolicy how many backends must complete before
00074    *                    returning the result?
00075    * @param errorCheckingPolicy policy to apply for error checking.
00076    * @param nbOfConcurrentReads number of concurrent reads allowed
00077    * @exception Exception if an error occurs
00078    */
00079   public RAIDb1ec(VirtualDatabase vdb,
00080       WaitForCompletionPolicy waitForCompletionPolicy,
00081       ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
00082       throws Exception
00083   {
00084     super(vdb, waitForCompletionPolicy);
00085     backendReadThreads = new ArrayList();
00086     this.errorCheckingPolicy = errorCheckingPolicy;
00087     this.nbOfConcurrentReads = nbOfConcurrentReads;
00088   }
00089 
00090   /*
00091    * Backends management
00092    */
00093 
00094   /**
00095    * Enables a backend that was previously disabled.
00096    * <p>
00097    * Ask the corresponding connection manager to initialize the connections if
00098    * needed.
00099    * <p>
00100    * No sanity checks are performed by this function.
00101    * 
00102    * @param db the database backend to enable
00103    * @param writeEnabled True if the backend must be enabled for writes
00104    * @throws SQLException if an error occurs
00105    */
00106   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
00107       throws SQLException
00108   {
00109     // Create 2 worker threads for writes
00110     BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
00111     BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
00112 
00113     // Add first to the blocking thread list
00114     try
00115     {
00116       backendBlockingThreadsRWLock.acquireWrite();
00117     }
00118     catch (InterruptedException e)
00119     {
00120       String msg = Translate.get(
00121           "loadbalancer.backendlist.acquire.writelock.failed", e);
00122       logger.error(msg);
00123       throw new SQLException(msg);
00124     }
00125     backendBlockingThreads.add(blockingThread);
00126     backendBlockingThreadsRWLock.releaseWrite();
00127     blockingThread.start();
00128     logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
00129         db.getName()));
00130 
00131     // Then add to the non-blocking thread list
00132     try
00133     {
00134       backendNonBlockingThreadsRWLock.acquireWrite();
00135     }
00136     catch (InterruptedException e)
00137     {
00138       String msg = Translate.get(
00139           "loadbalancer.backendlist.acquire.writelock.failed", e);
00140       logger.error(msg);
00141       throw new SQLException(msg);
00142     }
00143     backendNonBlockingThreads.add(nonBlockingThread);
00144     backendNonBlockingThreadsRWLock.releaseWrite();
00145     nonBlockingThread.start();
00146     logger.info(Translate.get(
00147         "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00148 
00149     if (!db.isInitialized())
00150       db.initializeConnections();
00151     db.enableRead();
00152     if (writeEnabled)
00153       db.enableWrite();
00154   }
00155 
00156   /**
00157    * Disables a backend that was previously enabled.
00158    * <p>
00159    * Ask the corresponding connection manager to finalize the connections if
00160    * needed.
00161    * <p>
00162    * No sanity checks are performed by this function.
00163    * 
00164    * @param db the database backend to disable
00165    * @throws SQLException if an error occurs
00166    */
00167   public synchronized void disableBackend(DatabaseBackend db)
00168       throws SQLException
00169   {
00170     int nbOfThreads = backendBlockingThreads.size();
00171 
00172     // Find the right blocking thread
00173     for (int i = 0; i < nbOfThreads; i++)
00174     {
00175       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00176           .get(i);
00177       if (thread.getBackend().equals(db))
00178       {
00179         logger.info(Translate.get(
00180             "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00181 
00182         // Remove it from the backendBlockingThread list
00183         try
00184         {
00185           backendBlockingThreadsRWLock.acquireWrite();
00186         }
00187         catch (InterruptedException e)
00188         {
00189           String msg = Translate.get(
00190               "loadbalancer.backendlist.acquire.writelock.failed", e);
00191           logger.error(msg);
00192           throw new SQLException(msg);
00193         }
00194         backendBlockingThreads.remove(thread);
00195         backendBlockingThreadsRWLock.releaseWrite();
00196 
00197         synchronized (thread)
00198         {
00199           // Kill the thread
00200           thread.addPriorityTask(new KillThreadTask(1, 1));
00201           thread.notify();
00202         }
00203         break;
00204       }
00205     }
00206 
00207     // Find the right non-blocking thread
00208     nbOfThreads = backendNonBlockingThreads.size();
00209     for (int i = 0; i < nbOfThreads; i++)
00210     {
00211       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00212           .get(i);
00213       if (thread.getBackend().equals(db))
00214       {
00215         logger.info(Translate.get(
00216             "loadbalancer.backend.workerthread.non.blocking.remove", db
00217                 .getName()));
00218 
00219         // Remove it from the backendNonBlockingThreads list
00220         try
00221         {
00222           backendNonBlockingThreadsRWLock.acquireWrite();
00223         }
00224         catch (InterruptedException e)
00225         {
00226           String msg = Translate.get(
00227               "loadbalancer.backendlist.acquire.writelock.failed", e);
00228           logger.error(msg);
00229           throw new SQLException(msg);
00230         }
00231         backendNonBlockingThreads.remove(thread);
00232         backendNonBlockingThreadsRWLock.releaseWrite();
00233 
00234         synchronized (thread)
00235         {
00236           // Kill the thread
00237           thread.addPriorityTask(new KillThreadTask(1, 1));
00238           thread.notify();
00239         }
00240         break;
00241       }
00242     }
00243 
00244     db.disable();
00245     if (db.isInitialized())
00246       db.finalizeConnections();
00247   }
00248 
00249   /**
00250    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
00251    */
00252   public String getXmlImpl()
00253   {
00254     StringBuffer info = new StringBuffer();
00255     info.append("<" + DatabasesXmlTags.ELT_RAIDb_1ec + " "
00256         + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
00257         + this.nbOfConcurrentReads + "\">");
00258     this.getRaidb1Xml();
00259     if (waitForCompletionPolicy != null)
00260       info.append(waitForCompletionPolicy.getXml());
00261     info.append("</" + DatabasesXmlTags.ELT_RAIDb_1ec + ">");
00262     return info.toString();
00263   }
00264 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1