Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb2ec.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): ______________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00037 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00038 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00039 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00040 
00041 /**
00042  * RAIDb-2ec load balancer.
00043  * <p>
00044  * This class is an abstract call because the read requests coming from the
00045  * request manager are NOT treated here but in the subclasses. This class deals
00046  * with backend enable/disable for backendReadThreads creation/termination.
00047  * 
00048  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00049  * @version 1.0
00050  */
00051 public abstract class RAIDb2ec extends RAIDb2
00052 {
00053   /*
00054    * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
00055    * Request handling 4. Transaction handling 5. Backend management
00056    */
00057 
00058   protected ArrayList           backendReadThreads;
00059   protected int                 nbOfConcurrentReads;
00060   protected ErrorCheckingPolicy errorCheckingPolicy;
00061 
00062   protected static Trace        logger = Trace
00063                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec");
00064 
00065   /*
00066    * Constructors
00067    */
00068 
00069   /**
00070    * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
00071    * worker thread is created for each backend.
00072    * 
00073    * @param vdb the virtual database this load balancer belongs to.
00074    * @param waitForCompletionPolicy how many backends must complete before
00075    *          returning the result?
00076    * @param createTablePolicy the policy defining how 'create table' statements
00077    *          should be handled
00078    * @param errorCheckingPolicy policy to apply for error checking.
00079    * @param nbOfConcurrentReads number of concurrent reads allowed
00080    * @exception Exception if an error occurs
00081    */
00082   public RAIDb2ec(VirtualDatabase vdb,
00083       WaitForCompletionPolicy waitForCompletionPolicy,
00084       CreateTablePolicy createTablePolicy,
00085       ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
00086       throws Exception
00087   {
00088     super(vdb, waitForCompletionPolicy, createTablePolicy);
00089     backendReadThreads = new ArrayList();
00090     this.errorCheckingPolicy = errorCheckingPolicy;
00091     this.nbOfConcurrentReads = nbOfConcurrentReads;
00092   }
00093 
00094   /*
00095    * Backends management
00096    */
00097 
00098   /**
00099    * Enables a backend that was previously disabled.
00100    * <p>
00101    * Ask the corresponding connection manager to initialize the connections if
00102    * needed.
00103    * <p>
00104    * No sanity checks are performed by this function.
00105    * 
00106    * @param db the database backend to enable
00107    * @param writeEnabled True if the backend must be enabled for writes
00108    * @throws SQLException if an error occurs
00109    */
00110   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
00111       throws SQLException
00112   {
00113     // Create 2 worker threads for writes
00114     BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
00115     BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
00116 
00117     // Add first to the blocking thread list
00118     try
00119     {
00120       backendBlockingThreadsRWLock.acquireWrite();
00121     }
00122     catch (InterruptedException e)
00123     {
00124       String msg = Translate.get(
00125           "loadbalancer.backendlist.acquire.writelock.failed", e);
00126       logger.error(msg);
00127       throw new SQLException(msg);
00128     }
00129     backendBlockingThreads.add(blockingThread);
00130     backendBlockingThreadsRWLock.releaseWrite();
00131     blockingThread.start();
00132     logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
00133         db.getName()));
00134 
00135     // Then add to the non-blocking thread list
00136     try
00137     {
00138       backendNonBlockingThreadsRWLock.acquireWrite();
00139     }
00140     catch (InterruptedException e)
00141     {
00142       String msg = Translate.get(
00143           "loadbalancer.backendlist.acquire.writelock.failed", e);
00144       logger.error(msg);
00145       throw new SQLException(msg);
00146     }
00147     backendNonBlockingThreads.add(nonBlockingThread);
00148     backendNonBlockingThreadsRWLock.releaseWrite();
00149     nonBlockingThread.start();
00150     logger.info(Translate.get(
00151         "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00152 
00153     if (!db.isInitialized())
00154       db.initializeConnections();
00155     db.enableRead();
00156     if (writeEnabled)
00157       db.enableWrite();
00158   }
00159 
00160   /**
00161    * Disables a backend that was previously enabled.
00162    * <p>
00163    * Ask the corresponding connection manager to finalize the connections if
00164    * needed.
00165    * <p>
00166    * No sanity checks are performed by this function.
00167    * 
00168    * @param db the database backend to disable
00169    * @throws SQLException if an error occurs
00170    */
00171   public synchronized void disableBackend(DatabaseBackend db)
00172       throws SQLException
00173   {
00174     int nbOfThreads = backendBlockingThreads.size();
00175 
00176     // Find the right blocking thread
00177     for (int i = 0; i < nbOfThreads; i++)
00178     {
00179       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00180           .get(i);
00181       if (thread.getBackend().equals(db))
00182       {
00183         logger.info(Translate.get(
00184             "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00185 
00186         // Remove it from the backendBlockingThread list
00187         try
00188         {
00189           backendBlockingThreadsRWLock.acquireWrite();
00190         }
00191         catch (InterruptedException e)
00192         {
00193           String msg = Translate.get(
00194               "loadbalancer.backendlist.acquire.writelock.failed", e);
00195           logger.error(msg);
00196           throw new SQLException(msg);
00197         }
00198         backendBlockingThreads.remove(thread);
00199         backendBlockingThreadsRWLock.releaseWrite();
00200 
00201         synchronized (thread)
00202         {
00203           // Kill the thread
00204           thread.addPriorityTask(new KillThreadTask(1, 1));
00205           thread.notify();
00206         }
00207         break;
00208       }
00209     }
00210 
00211     // Find the right non-blocking thread
00212     nbOfThreads = backendNonBlockingThreads.size();
00213     for (int i = 0; i < nbOfThreads; i++)
00214     {
00215       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00216           .get(i);
00217       if (thread.getBackend().equals(db))
00218       {
00219         logger.info(Translate.get(
00220             "loadbalancer.backend.workerthread.non.blocking.remove", db
00221                 .getName()));
00222 
00223         // Remove it from the backendNonBlockingThreads list
00224         try
00225         {
00226           backendNonBlockingThreadsRWLock.acquireWrite();
00227         }
00228         catch (InterruptedException e)
00229         {
00230           String msg = Translate.get(
00231               "loadbalancer.backendlist.acquire.writelock.failed", e);
00232           logger.error(msg);
00233           throw new SQLException(msg);
00234         }
00235         backendNonBlockingThreads.remove(thread);
00236         backendNonBlockingThreadsRWLock.releaseWrite();
00237 
00238         synchronized (thread)
00239         {
00240           // Kill the thread
00241           thread.addPriorityTask(new KillThreadTask(1, 1));
00242           thread.notify();
00243         }
00244         break;
00245       }
00246     }
00247 
00248     db.disable();
00249     if (db.isInitialized())
00250       db.finalizeConnections();
00251   }
00252 
00253   /**
00254    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
00255    */
00256   public String getXmlImpl()
00257   {
00258     StringBuffer info = new StringBuffer();
00259     info.append("<" + DatabasesXmlTags.ELT_RAIDb_2ec + " "
00260         + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
00261         + this.nbOfConcurrentReads + "\">");
00262     this.getRaidb2Xml();
00263     if (waitForCompletionPolicy != null)
00264       info.append(waitForCompletionPolicy.getXml());
00265     info.append("</" + DatabasesXmlTags.ELT_RAIDb_2ec + ">");
00266     return info.toString();
00267   }
00268 }

Generated on Mon Apr 11 22:01:34 2005 for C-JDBC by  doxygen 1.3.9.1