Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb1.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet 
00022  * Contributor(s): ______________________
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026 
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030 
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00033 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00034 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory;
00035 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00036 import org.objectweb.cjdbc.common.i18n.Translate;
00037 import org.objectweb.cjdbc.common.log.Trace;
00038 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00039 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00043 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00044 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00045 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00046 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00047 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00048 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00049 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00054 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00055 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00057 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00058 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00059 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00060 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00061 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00062 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00063 
00064 /**
00065  * RAIDb-1 load balancer.
00066  * <p>
00067  * This class is an abstract call because the read requests coming from the
00068  * request controller are NOT treated here but in the subclasses. Transaction
00069  * management and write requests are broadcasted to all backends.
00070  * 
00071  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00072  * @version 1.0
00073  */
00074 public abstract class RAIDb1 extends AbstractLoadBalancer
00075 {
00076   //
00077   // How the code is organized ?
00078   //
00079   // 1. Member variables
00080   // 2. Constructor(s)
00081   // 3. Request handling
00082   // 4. Transaction handling
00083   // 5. Backend management
00084   //
00085 
00086   /**
00087    * List of <code>BackendWorkerThread</code> that executes possibly blocking
00088    * queries
00089    */
00090   protected ArrayList                   backendBlockingThreads;
00091   /**
00092    * List of <code>BackendWorkerThread</code> that executes non-blocking
00093    * queries
00094    */
00095   protected ArrayList                   backendNonBlockingThreads;
00096   /** Lock on backendBlockingThreads list */
00097   protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock    = new ReadPrioritaryFIFOWriteLock();
00098   /** Lock on backendNonBlockingThreads list */
00099   protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00100   /** Should we wait for all backends to commit before returning ? */
00101   protected WaitForCompletionPolicy     waitForCompletionPolicy;
00102 
00103   protected static Trace                logger                          = Trace
00104                                                                             .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1");
00105 
00106   /*
00107    * Constructors
00108    */
00109 
00110   /**
00111    * Creates a new RAIDb-1 Round Robin request load balancer. A new backend
00112    * worker thread is created for each backend.
00113    * 
00114    * @param vdb the virtual database this load balancer belongs to.
00115    * @param waitForCompletionPolicy How many backends must complete before
00116    *          returning the result?
00117    * @exception Exception if an error occurs
00118    */
00119   public RAIDb1(VirtualDatabase vdb,
00120       WaitForCompletionPolicy waitForCompletionPolicy) throws Exception
00121   {
00122     super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
00123     this.waitForCompletionPolicy = waitForCompletionPolicy;
00124     backendBlockingThreads = new ArrayList();
00125     backendNonBlockingThreads = new ArrayList();
00126   }
00127 
00128   /*
00129    * Request Handling
00130    */
00131 
00132   /**
00133    * Returns the number of nodes to wait for according to the defined
00134    * <code>waitForCompletion</code> policy.
00135    * 
00136    * @param nbOfThreads total number of threads
00137    * @return int number of threads to wait for
00138    */
00139   private int getNbToWait(int nbOfThreads)
00140   {
00141     int nbToWait;
00142     switch (waitForCompletionPolicy.getPolicy())
00143     {
00144       case WaitForCompletionPolicy.FIRST :
00145         nbToWait = 1;
00146         break;
00147       case WaitForCompletionPolicy.MAJORITY :
00148         nbToWait = nbOfThreads / 2 + 1;
00149         break;
00150       default :
00151         logger
00152             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
00153       case WaitForCompletionPolicy.ALL :
00154         nbToWait = nbOfThreads;
00155         break;
00156     }
00157     return nbToWait;
00158   }
00159 
00160   /**
00161    * @see AbstractLoadBalancer#execReadRequest(SelectRequest, MetadataCache)
00162    */
00163   public abstract ControllerResultSet execReadRequest(SelectRequest request,
00164       MetadataCache metadataCache) throws SQLException;
00165 
00166   /**
00167    * Execute a read request on the selected backend.
00168    * 
00169    * @param request the request to execute
00170    * @param backend the backend that will execute the request
00171    * @param metadataCache the metadataCache if any or null
00172    * @return the ResultSet
00173    * @throws SQLException if an error occurs
00174    */
00175   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00176       DatabaseBackend backend, MetadataCache metadataCache)
00177       throws SQLException, UnreachableBackendException
00178   {
00179     // Handle macros
00180     handleMacros(request);
00181 
00182     // Ok, we have a backend, let's execute the request
00183     AbstractConnectionManager cm = backend.getConnectionManager(request
00184         .getLogin());
00185 
00186     // Sanity check
00187     if (cm == null)
00188     {
00189       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00190           new String[]{request.getLogin(), backend.getName()});
00191       logger.error(msg);
00192       throw new SQLException(msg);
00193     }
00194 
00195     // Execute the query
00196     if (request.isAutoCommit())
00197     {
00198       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00199         // We could do something finer grain here by waiting
00200         // only for writes that depend on the tables we need
00201         // but is that really worth the overhead ?
00202         waitForAllWritesToComplete(backend);
00203 
00204       ControllerResultSet rs = null;
00205       boolean badConnection;
00206       do
00207       {
00208         badConnection = false;
00209         // Use a connection just for this request
00210         Connection c = null;
00211         try
00212         {
00213           c = cm.getConnection();
00214         }
00215         catch (UnreachableBackendException e1)
00216         {
00217           logger.error(Translate.get(
00218               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00219           disableBackend(backend);
00220           throw new UnreachableBackendException(Translate.get(
00221               "loadbalancer.backend.unreacheable", backend.getName()));
00222         }
00223 
00224         // Sanity check
00225         if (c == null)
00226           throw new SQLException(Translate.get(
00227               "loadbalancer.backend.no.connection", backend.getName()));
00228 
00229         // Execute Query
00230         try
00231         {
00232           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00233           cm.releaseConnection(c);
00234         }
00235         catch (SQLException e)
00236         {
00237           cm.releaseConnection(c);
00238           throw SQLExceptionFactory.getSQLException(e, Translate.get(
00239               "loadbalancer.request.failed.on.backend", new String[]{
00240                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00241                   backend.getName(), e.getMessage()}));
00242         }
00243         catch (BadConnectionException e)
00244         { // Get rid of the bad connection
00245           cm.deleteConnection(c);
00246           badConnection = true;
00247         }
00248       }
00249       while (badConnection);
00250       if (logger.isDebugEnabled())
00251         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00252             String.valueOf(request.getId()), backend.getName()}));
00253       return rs;
00254     }
00255     else
00256     { // Inside a transaction
00257       Connection c;
00258       long tid = request.getTransactionId();
00259       Long lTid = new Long(tid);
00260 
00261       // Wait for previous writes to complete
00262       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00263         waitForAllWritesToComplete(backend, request.getTransactionId());
00264 
00265       try
00266       {
00267         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00268       }
00269       catch (UnreachableBackendException e1)
00270       {
00271         logger.error(Translate.get(
00272             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00273         disableBackend(backend);
00274         throw new SQLException(Translate.get(
00275             "loadbalancer.backend.unreacheable", backend.getName()));
00276       }
00277       catch (NoTransactionStartWhenDisablingException e)
00278       {
00279         String msg = Translate.get("loadbalancer.backend.is.disabling",
00280             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00281                 backend.getName()});
00282         logger.error(msg);
00283         throw new SQLException(msg);
00284       }
00285 
00286       // Sanity check
00287       if (c == null)
00288         throw new SQLException(Translate.get(
00289             "loadbalancer.unable.retrieve.connection", new String[]{
00290                 String.valueOf(tid), backend.getName()}));
00291 
00292       // Execute Query
00293       ControllerResultSet rs = null;
00294       try
00295       {
00296         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00297       }
00298       catch (SQLException e)
00299       {
00300         throw SQLExceptionFactory.getSQLException(e, Translate.get(
00301             "loadbalancer.request.failed.on.backend", new String[]{
00302                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00303                 backend.getName(), e.getMessage()}));
00304       }
00305       catch (BadConnectionException e)
00306       { // Connection failed, so did the transaction
00307         // Disable the backend.
00308         cm.deleteConnection(tid);
00309         String msg = Translate.get(
00310             "loadbalancer.backend.disabling.connection.failure", backend
00311                 .getName());
00312         logger.error(msg);
00313         disableBackend(backend);
00314         throw new SQLException(msg);
00315       }
00316       if (logger.isDebugEnabled())
00317         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00318             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00319                 backend.getName()}));
00320       return rs;
00321     }
00322   }
00323 
00324   /**
00325    * Performs a write request. This request is broadcasted to all nodes.
00326    * 
00327    * @param request an <code>AbstractWriteRequest</code>
00328    * @return number of rows affected by the request
00329    * @throws AllBackendsFailedException if all backends failed to execute the
00330    *           request
00331    * @exception SQLException if an error occurs
00332    * @exception NoMoreBackendException if no backends left to execute the
00333    *              request
00334    */
00335   public int execWriteRequest(AbstractWriteRequest request)
00336       throws AllBackendsFailedException, NoMoreBackendException, SQLException
00337   {
00338     return ((WriteRequestTask) execWriteRequest(request, false, null))
00339         .getResult();
00340   }
00341 
00342   /**
00343    * Perform a write request and return the auto generated keys.
00344    * 
00345    * @param request the request to execute
00346    * @param metadataCache the metadataCache if any or null
00347    * @return auto generated keys.
00348    * @throws AllBackendsFailedException if all backends failed to execute the
00349    *           request
00350    * @exception SQLException if an error occurs
00351    */
00352   public ControllerResultSet execWriteRequestWithKeys(
00353       AbstractWriteRequest request, MetadataCache metadataCache)
00354       throws AllBackendsFailedException, SQLException
00355   {
00356     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
00357         metadataCache)).getResult();
00358   }
00359 
00360   /**
00361    * Common code for execWriteRequest(AbstractWriteRequest) and
00362    * execWriteRequestWithKeys(AbstractWriteRequest). The result is given back in
00363    * AbstractTask.getResult().
00364    * 
00365    * @param request the request to execute
00366    * @param useKeys true if this must give an auto generated keys ResultSet
00367    * @param metadataCache the metadataCache if any or null
00368    * @throws AllBackendsFailedException if all backends failed to execute the
00369    *           request
00370    * @throws SQLException if an error occurs
00371    */
00372   private AbstractTask execWriteRequest(AbstractWriteRequest request,
00373       boolean useKeys, MetadataCache metadataCache)
00374       throws AllBackendsFailedException, NoMoreBackendException, SQLException
00375   {
00376     ArrayList backendThreads;
00377     ReadPrioritaryFIFOWriteLock lock;
00378 
00379     // Handle macros
00380     handleMacros(request);
00381 
00382     // Determine which list (blocking or not) to use
00383     if (request.mightBlock())
00384     { // Blocking
00385       backendThreads = backendBlockingThreads;
00386       lock = backendBlockingThreadsRWLock;
00387     }
00388     else
00389     { // Non-blocking
00390       backendThreads = backendNonBlockingThreads;
00391       lock = backendNonBlockingThreadsRWLock;
00392       if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00393           && (request.getTransactionId() != 0))
00394         waitForAllWritesToComplete(request.getTransactionId());
00395       // else all previous requests for this transaction have already completed
00396       // since the policy is to wait for all backends to complete and there is
00397       // no risk of an asynchrony here.
00398     }
00399 
00400     try
00401     {
00402       lock.acquireWrite();
00403     }
00404     catch (InterruptedException e)
00405     {
00406       String msg = Translate.get(
00407           "loadbalancer.backendlist.acquire.writelock.failed", e);
00408       logger.error(msg);
00409       throw new SQLException(msg);
00410     }
00411 
00412     // Note that the backendThreads list is only supposed to contain enabled
00413     // backends. When a backend is disabled, its backendWorkerThread is removed
00414     // from the list.
00415     int nbOfThreads = backendThreads.size();
00416     if (nbOfThreads == 0)
00417     {
00418       lock.releaseWrite();
00419       throw new NoMoreBackendException(Translate
00420           .get("loadbalancer.backendlist.empty"));
00421     }
00422     else
00423     {
00424       if (logger.isDebugEnabled())
00425         logger.debug(Translate.get("loadbalancer.execute.on.several",
00426             new String[]{String.valueOf(request.getId()),
00427                 String.valueOf(nbOfThreads)}));
00428     }
00429 
00430     // Create the task
00431     AbstractTask task;
00432     if (useKeys)
00433       task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
00434           nbOfThreads, request, metadataCache);
00435     else
00436       task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
00437           request);
00438 
00439     synchronized (task)
00440     {
00441       // We have to first post the request on each backend before letting the
00442       // first backend to execute the request. Therefore we have 2 phases:
00443       // 1. post the task in each thread queue
00444       // 2. notify each thread to execute the query
00445 
00446       // 1. Post the task
00447       if (request.isAutoCommit())
00448       {
00449         for (int i = 0; i < nbOfThreads; i++)
00450         {
00451           BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00452               .get(i);
00453           synchronized (thread)
00454           {
00455             thread.addTask(task);
00456           }
00457         }
00458       }
00459       else
00460       {
00461         for (int i = 0; i < nbOfThreads; i++)
00462         {
00463           BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00464               .get(i);
00465           synchronized (thread)
00466           {
00467             thread.addTask(task, request.getTransactionId());
00468           }
00469         }
00470       }
00471 
00472       // 2. Start the task execution on each backend
00473       for (int i = 0; i < nbOfThreads; i++)
00474       {
00475         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00476             .get(i);
00477         synchronized (thread)
00478         {
00479           thread.notify();
00480         }
00481       }
00482 
00483       lock.releaseWrite();
00484 
00485       // Wait for completion (notified by the task)
00486       try
00487       {
00488         // Wait on task
00489         long timeout = request.getTimeout() * 1000;
00490         if (timeout > 0)
00491         {
00492           long start = System.currentTimeMillis();
00493           task.wait(timeout);
00494           long end = System.currentTimeMillis();
00495           long remaining = timeout - (end - start);
00496           if (remaining <= 0)
00497           {
00498             if (task.setExpiredTimeout())
00499             { // Task will be ignored by all backends
00500               String msg = Translate.get("loadbalancer.request.timeout",
00501                   new String[]{String.valueOf(request.getId()),
00502                       String.valueOf(task.getSuccess()),
00503                       String.valueOf(task.getFailed())});
00504 
00505               logger.warn(msg);
00506               throw new SQLException(msg);
00507             }
00508             // else task execution already started, to late to cancel
00509           }
00510           // No need to update request timeout since the execution is finished
00511         }
00512         else
00513           task.wait();
00514       }
00515       catch (InterruptedException e)
00516       {
00517         if (task.setExpiredTimeout())
00518         { // Task will be ignored by all backends
00519           String msg = Translate.get("loadbalancer.request.timeout",
00520               new String[]{String.valueOf(request.getId()),
00521                   String.valueOf(task.getSuccess()),
00522                   String.valueOf(task.getFailed())});
00523 
00524           logger.warn(msg);
00525           throw new SQLException(msg);
00526         }
00527         // else task execution already started, to late to cancel
00528       }
00529 
00530       if (task.getSuccess() > 0)
00531         return task;
00532       else
00533       { // All tasks failed
00534         ArrayList exceptions = task.getExceptions();
00535         if (exceptions == null)
00536           throw new AllBackendsFailedException(Translate.get(
00537               "loadbalancer.request.failed.all", request.getId()));
00538         else
00539         {
00540           String errorMsg = Translate.get("loadbalancer.request.failed.stack",
00541               request.getId())
00542               + "\n";
00543           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
00544               errorMsg);
00545           logger.error(ex.getMessage());
00546           throw ex;
00547         }
00548       }
00549     }
00550   }
00551 
00552   /**
00553    * Execute a stored procedure on the selected backend.
00554    * 
00555    * @param proc the stored procedure to execute
00556    * @param backend the backend that will execute the request
00557    * @param metadataCache the metadataCache if any or null
00558    * @return the ResultSet
00559    * @throws SQLException if an error occurs
00560    */
00561   protected ControllerResultSet executeStoredProcedureOnBackend(
00562       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00563       throws SQLException, UnreachableBackendException
00564   {
00565     // Handle macros
00566     handleMacros(proc);
00567 
00568     // Ok, we have a backend, let's execute the request
00569     AbstractConnectionManager cm = backend
00570         .getConnectionManager(proc.getLogin());
00571 
00572     // Sanity check
00573     if (cm == null)
00574     {
00575       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00576           new String[]{proc.getLogin(), backend.getName()});
00577       logger.error(msg);
00578       throw new SQLException(msg);
00579     }
00580 
00581     // Execute the query
00582     if (proc.isAutoCommit())
00583     {
00584       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00585         // We could do something finer grain here by waiting
00586         // only for writes that depend on the tables we need
00587         // but is that really worth the overhead ?
00588         waitForAllWritesToComplete(backend);
00589 
00590       // Use a connection just for this request
00591       Connection c = null;
00592       try
00593       {
00594         c = cm.getConnection();
00595       }
00596       catch (UnreachableBackendException e1)
00597       {
00598         logger.error(Translate.get(
00599             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00600         disableBackend(backend);
00601         throw new UnreachableBackendException(Translate.get(
00602             "loadbalancer.backend.unreacheable", backend.getName()));
00603       }
00604 
00605       // Sanity check
00606       if (c == null)
00607         throw new UnreachableBackendException(Translate.get(
00608             "loadbalancer.backend.no.connection", backend.getName()));
00609 
00610       // Execute Query
00611       ControllerResultSet rs = null;
00612       try
00613       {
00614         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00615             backend, c, metadataCache);
00616       }
00617       catch (Exception e)
00618       {
00619         throw new SQLException(Translate.get(
00620             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00621                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00622                 backend.getName(), e.getMessage()}));
00623       }
00624       finally
00625       {
00626         cm.releaseConnection(c);
00627       }
00628       if (logger.isDebugEnabled())
00629         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00630             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00631       return rs;
00632     }
00633     else
00634     { // Inside a transaction
00635       Connection c;
00636       long tid = proc.getTransactionId();
00637       Long lTid = new Long(tid);
00638 
00639       // Wait for previous writes to complete
00640       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00641         waitForAllWritesToComplete(backend, proc.getTransactionId());
00642 
00643       try
00644       {
00645         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00646       }
00647       catch (UnreachableBackendException e1)
00648       {
00649         logger.error(Translate.get(
00650             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00651         disableBackend(backend);
00652         throw new SQLException(Translate.get(
00653             "loadbalancer.backend.unreacheable", backend.getName()));
00654       }
00655       catch (NoTransactionStartWhenDisablingException e)
00656       {
00657         String msg = Translate.get("loadbalancer.backend.is.disabling",
00658             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00659                 backend.getName()});
00660         logger.error(msg);
00661         throw new SQLException(msg);
00662       }
00663 
00664       // Sanity check
00665       if (c == null)
00666         throw new SQLException(Translate.get(
00667             "loadbalancer.unable.retrieve.connection", new String[]{
00668                 String.valueOf(tid), backend.getName()}));
00669 
00670       // Execute Query
00671       ControllerResultSet rs;
00672       try
00673       {
00674         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00675             backend, c, metadataCache);
00676       }
00677       catch (Exception e)
00678       {
00679         throw new SQLException(Translate.get(
00680             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00681                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00682                 backend.getName(), e.getMessage()}));
00683       }
00684       if (logger.isDebugEnabled())
00685         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00686             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00687                 backend.getName()}));
00688       return rs;
00689     }
00690   }
00691 
00692   /**
00693    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
00694    *      MetadataCache)
00695    */
00696   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00697       MetadataCache metadataCache) throws SQLException
00698   {
00699     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
00700         proc, true, metadataCache);
00701     return task.getResult();
00702   }
00703 
00704   /**
00705    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
00706    */
00707   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00708   {
00709     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
00710         proc, false, null);
00711     return task.getResult();
00712   }
00713 
00714   /**
00715    * Post the stored procedure call in the threads task list.
00716    * 
00717    * @param proc the stored procedure to call
00718    * @param isRead true if the call returns a ResultSet
00719    * @param metadataCache the metadataCache if any or null
00720    * @return the task that has been executed (caller can get the result by
00721    *         calling getResult())
00722    * @throws SQLException if an error occurs
00723    */
00724   private AbstractTask callStoredProcedure(StoredProcedure proc,
00725       boolean isRead, MetadataCache metadataCache) throws SQLException
00726   {
00727     ArrayList backendThreads = backendBlockingThreads;
00728     ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00729 
00730     try
00731     {
00732       // Note that a read stored procedure here is supposed to also execute
00733       // writes and as the scheduler cannot block atomically on multiple tables
00734       // for a writes, we have to lock as a write even for a read stored
00735       // procedure. A read-only stored procedure will execute in a separate
00736       // method call (see executeReadOnlyStoredProcedure).
00737       lock.acquireWrite();
00738     }
00739     catch (InterruptedException e)
00740     {
00741       String msg;
00742       msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
00743           e);
00744       logger.error(msg);
00745       throw new SQLException(msg);
00746     }
00747 
00748     int nbOfThreads = backendThreads.size();
00749     if (nbOfThreads == 0)
00750     {
00751       lock.releaseWrite();
00752       throw new NoMoreBackendException(Translate
00753           .get("loadbalancer.backendlist.empty"));
00754     }
00755     else
00756     {
00757       if (logger.isDebugEnabled())
00758         logger.debug(Translate.get("loadbalancer.execute.on.several",
00759             new String[]{String.valueOf(proc.getId()),
00760                 String.valueOf(nbOfThreads)}));
00761     }
00762 
00763     // Create the task
00764     AbstractTask task;
00765     if (isRead)
00766       task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00767           proc, metadataCache);
00768     else
00769       task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00770           nbOfThreads, proc);
00771 
00772     synchronized (task)
00773     {
00774       // Post the task in each backendThread tasklist and wakeup the threads
00775       for (int i = 0; i < nbOfThreads; i++)
00776       {
00777         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00778             .get(i);
00779         synchronized (thread)
00780         {
00781           if (proc.isAutoCommit())
00782             thread.addTask(task);
00783           else
00784             thread.addTask(task, proc.getTransactionId());
00785           thread.notify();
00786         }
00787       }
00788 
00789       lock.releaseWrite();
00790 
00791       // Wait for completion (notified by the task)
00792       try
00793       {
00794         // Wait on task
00795         long timeout = proc.getTimeout() * 1000;
00796         if (timeout > 0)
00797         {
00798           long start = System.currentTimeMillis();
00799           task.wait(timeout);
00800           long end = System.currentTimeMillis();
00801           long remaining = timeout - (end - start);
00802           if (remaining <= 0)
00803           {
00804             if (task.setExpiredTimeout())
00805             { // Task will be ignored by all backends
00806               String msg = Translate.get(
00807                   "loadbalancer.storedprocedure.timeout", new String[]{
00808                       String.valueOf(proc.getId()),
00809                       String.valueOf(task.getSuccess()),
00810                       String.valueOf(task.getFailed())});
00811               logger.warn(msg);
00812               throw new SQLException(msg);
00813             }
00814             // else task execution already started, to late to cancel
00815           }
00816           // No need to update request timeout since the execution is finished
00817         }
00818         else
00819           task.wait();
00820       }
00821       catch (InterruptedException e)
00822       {
00823         if (task.setExpiredTimeout())
00824         { // Task will be ignored by all backends
00825           String msg = Translate.get("loadbalancer.storedprocedure.timeout",
00826               new String[]{String.valueOf(proc.getId()),
00827                   String.valueOf(task.getSuccess()),
00828                   String.valueOf(task.getFailed())});
00829           logger.warn(msg);
00830           throw new SQLException(msg);
00831         }
00832         // else task execution already started, to late to cancel
00833       }
00834 
00835       if (task.getSuccess() > 0)
00836         return task;
00837       else
00838       { // All tasks failed
00839         ArrayList exceptions = task.getExceptions();
00840         if (exceptions == null)
00841           throw new SQLException(Translate.get(
00842               "loadbalancer.storedprocedure.all.failed", proc.getId()));
00843         else
00844         {
00845           String errorMsg = Translate.get(
00846               "loadbalancer.storedprocedure.failed.stack", proc.getId())
00847               + "\n";
00848           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
00849               errorMsg);
00850           logger.error(ex.getMessage());
00851           throw ex;
00852         }
00853       }
00854     }
00855   }
00856 
00857   /*
00858    * Transaction management
00859    */
00860 
00861   /**
00862    * Begins a new transaction.
00863    * 
00864    * @param tm the transaction marker metadata
00865    * @exception SQLException if an error occurs
00866    */
00867   public final void begin(TransactionMarkerMetaData tm) throws SQLException
00868   {
00869   }
00870 
00871   /**
00872    * Commits a transaction.
00873    * 
00874    * @param tm the transaction marker metadata
00875    * @exception SQLException if an error occurs
00876    */
00877   public void commit(TransactionMarkerMetaData tm) throws SQLException
00878   {
00879     long tid = tm.getTransactionId();
00880     Long lTid = new Long(tid);
00881     // List of backends that still have pending queries for the transaction to
00882     // commit
00883     ArrayList asynchronousBackends = null;
00884     CommitTask task = null;
00885 
00886     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00887     {
00888       try
00889       {
00890         backendBlockingThreadsRWLock.acquireWrite();
00891         // Lock in write to ensure that all writes are posted and we wait in the
00892         // queue, else a read lock has the priority with the implementation we
00893         // are using.
00894       }
00895       catch (InterruptedException e)
00896       {
00897         String msg = Translate.get(
00898             "loadbalancer.backendlist.acquire.writelock.failed", e);
00899         logger.error(msg);
00900         throw new SQLException(msg);
00901       }
00902 
00903       int nbOfThreads = backendBlockingThreads.size();
00904       // Create the task
00905       task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00906           .getTimeout(), tm.getLogin(), tid);
00907 
00908       for (int i = 0; i < nbOfThreads; i++)
00909       {
00910         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00911             .get(i);
00912         if (thread.hasTaskForTransaction(lTid))
00913         {
00914           if (asynchronousBackends == null)
00915             asynchronousBackends = new ArrayList();
00916           asynchronousBackends.add(thread.getBackend());
00917           synchronized (thread)
00918           {
00919             thread.insertTaskAfterLastWriteForTransaction(task, lTid);
00920             thread.notify();
00921           }
00922         }
00923       }
00924 
00925       backendBlockingThreadsRWLock.releaseWrite();
00926     }
00927 
00928     try
00929     {
00930       backendNonBlockingThreadsRWLock.acquireWrite();
00931     }
00932     catch (InterruptedException e)
00933     {
00934       String msg = Translate.get(
00935           "loadbalancer.backendlist.acquire.writelock.failed", e);
00936       logger.error(msg);
00937       throw new SQLException(msg);
00938     }
00939 
00940     int nbOfThreads = backendNonBlockingThreads.size();
00941     ArrayList commitList = new ArrayList();
00942 
00943     // Build the list of backends that need to commit this transaction
00944     for (int i = 0; i < nbOfThreads; i++)
00945     {
00946       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00947           .get(i);
00948       DatabaseBackend backend = thread.getBackend();
00949       // If the transaction has been started on this backend and it was not
00950       // previously treated in the asynchronous backend list (late nodes), then
00951       // we have to post the task now in the asynchronous list.
00952       if (backend.isStartedTransaction(lTid)
00953           && ((asynchronousBackends == null) || (!asynchronousBackends
00954               .contains(backend))))
00955         commitList.add(thread);
00956     }
00957 
00958     nbOfThreads = commitList.size();
00959     if (nbOfThreads == 0)
00960     {
00961       backendNonBlockingThreadsRWLock.releaseWrite();
00962       return;
00963     }
00964 
00965     if (task == null)
00966       task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00967           .getTimeout(), tm.getLogin(), tid);
00968 
00969     synchronized (task)
00970     {
00971       // Post the task in each backendThread tasklist and wakeup the threads
00972       for (int i = 0; i < nbOfThreads; i++)
00973       {
00974         BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00975         synchronized (thread)
00976         {
00977           thread.addTask(task, tid);
00978           thread.notify();
00979         }
00980       }
00981 
00982       backendNonBlockingThreadsRWLock.releaseWrite();
00983 
00984       // Wait for completion (notified by the task)
00985       try
00986       {
00987         // Wait on task
00988         long timeout = tm.getTimeout();
00989         if (timeout > 0)
00990         {
00991           long start = System.currentTimeMillis();
00992           task.wait(timeout);
00993           long end = System.currentTimeMillis();
00994           long remaining = timeout - (end - start);
00995           if (remaining <= 0)
00996           {
00997             if (task.setExpiredTimeout())
00998             { // Task will be ignored by all backends
00999               String msg = Translate.get("loadbalancer.commit.timeout",
01000                   new String[]{String.valueOf(tid),
01001                       String.valueOf(task.getSuccess()),
01002                       String.valueOf(task.getFailed())});
01003               logger.warn(msg);
01004               throw new SQLException(msg);
01005             }
01006             // else task execution already started, to late to cancel
01007           }
01008         }
01009         else
01010           task.wait();
01011       }
01012       catch (InterruptedException e)
01013       {
01014         if (task.setExpiredTimeout())
01015         { // Task will be ignored by all backends
01016           String msg = Translate.get("loadbalancer.commit.timeout",
01017               new String[]{String.valueOf(tid),
01018                   String.valueOf(task.getSuccess()),
01019                   String.valueOf(task.getFailed())});
01020           logger.warn(msg);
01021           throw new SQLException(msg);
01022         }
01023         // else task execution already started, to late to cancel
01024       }
01025 
01026       if (task.getSuccess() > 0)
01027         return;
01028       else
01029       { // All tasks failed
01030         ArrayList exceptions = task.getExceptions();
01031         if (exceptions == null)
01032           throw new SQLException(Translate.get(
01033               "loadbalancer.commit.all.failed", tid));
01034         else
01035         {
01036           String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01037               tid)
01038               + "\n";
01039           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01040               errorMsg);
01041           logger.error(ex.getMessage());
01042           throw ex;
01043         }
01044       }
01045     }
01046   }
01047 
01048   /**
01049    * Rollbacks a transaction.
01050    * 
01051    * @param tm the transaction marker metadata
01052    * @exception SQLException if an error occurs
01053    */
01054   public void rollback(TransactionMarkerMetaData tm) throws SQLException
01055   {
01056     long tid = tm.getTransactionId();
01057     Long lTid = new Long(tid);
01058     // List of backends that still have pending queries for the transaction to
01059     // commit
01060     ArrayList asynchronousBackends = null;
01061     RollbackTask task = null;
01062 
01063     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01064     {
01065       try
01066       {
01067         backendBlockingThreadsRWLock.acquireWrite();
01068         // Lock in write to ensure that all writes are posted and we wait in the
01069         // queue, else a read lock has the priority with the implementation we
01070         // are using.
01071       }
01072       catch (InterruptedException e)
01073       {
01074         String msg = Translate.get(
01075             "loadbalancer.backendlist.acquire.writelock.failed", e);
01076         logger.error(msg);
01077         throw new SQLException(msg);
01078       }
01079 
01080       int nbOfThreads = backendBlockingThreads.size();
01081       // Create the task
01082       task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01083           .getTimeout(), tm.getLogin(), tid);
01084 
01085       for (int i = 0; i < nbOfThreads; i++)
01086       {
01087         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01088             .get(i);
01089         if (thread.hasTaskForTransaction(lTid))
01090         {
01091           if (asynchronousBackends == null)
01092             asynchronousBackends = new ArrayList();
01093           asynchronousBackends.add(thread.getBackend());
01094           synchronized (thread)
01095           {
01096             thread.insertTaskAfterLastWriteForTransaction(task, lTid);
01097             thread.notify();
01098           }
01099         }
01100       }
01101 
01102       backendBlockingThreadsRWLock.releaseWrite();
01103     }
01104 
01105     try
01106     {
01107       backendNonBlockingThreadsRWLock.acquireWrite();
01108     }
01109     catch (InterruptedException e)
01110     {
01111       String msg = Translate.get(
01112           "loadbalancer.backendlist.acquire.writelock.failed", e);
01113       logger.error(msg);
01114       throw new SQLException(msg);
01115     }
01116     int nbOfThreads = backendNonBlockingThreads.size();
01117     ArrayList rollbackList = new ArrayList();
01118 
01119     // Build the list of backend that need to rollback this transaction
01120     for (int i = 0; i < nbOfThreads; i++)
01121     {
01122       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01123           .get(i);
01124       DatabaseBackend backend = thread.getBackend();
01125       // If the transaction has been started on this backend and it was not
01126       // previously treated in the asynchronous backend list (late nodes), then
01127       // we have to post the task now in the asynchronous list.
01128       if (backend.isStartedTransaction(lTid)
01129           && ((asynchronousBackends == null) || (!asynchronousBackends
01130               .contains(backend))))
01131         rollbackList.add(thread);
01132     }
01133 
01134     nbOfThreads = rollbackList.size();
01135     if (nbOfThreads == 0)
01136     {
01137       backendNonBlockingThreadsRWLock.releaseWrite();
01138       return;
01139     }
01140 
01141     if (task == null)
01142       task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01143           .getTimeout(), tm.getLogin(), tid);
01144 
01145     synchronized (task)
01146     {
01147       // Post the task in each backendThread tasklist and wakeup the threads
01148       for (int i = 0; i < nbOfThreads; i++)
01149       {
01150         BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01151         synchronized (thread)
01152         {
01153           thread.addTask(task, tid);
01154           thread.notify();
01155         }
01156       }
01157 
01158       backendNonBlockingThreadsRWLock.releaseWrite();
01159 
01160       // Wait for completion (notified by the task)
01161       try
01162       {
01163         // Wait on task
01164         long timeout = tm.getTimeout();
01165         if (timeout > 0)
01166         {
01167           long start = System.currentTimeMillis();
01168           task.wait(timeout);
01169           long end = System.currentTimeMillis();
01170           long remaining = timeout - (end - start);
01171           if (remaining <= 0)
01172           {
01173             if (task.setExpiredTimeout())
01174             { // Task will be ignored by all backends
01175               String msg = Translate.get("loadbalancer.rollback.timeout",
01176                   new String[]{String.valueOf(tid),
01177                       String.valueOf(task.getSuccess()),
01178                       String.valueOf(task.getFailed())});
01179               logger.warn(msg);
01180               throw new SQLException(msg);
01181             }
01182             // else task execution already started, to late to cancel
01183           }
01184         }
01185         else
01186           task.wait();
01187       }
01188       catch (InterruptedException e)
01189       {
01190         if (task.setExpiredTimeout())
01191         { // Task will be ignored by all backends
01192           String msg = Translate.get("loadbalancer.rollback.timeout",
01193               new String[]{String.valueOf(tid),
01194                   String.valueOf(task.getSuccess()),
01195                   String.valueOf(task.getFailed())});
01196           logger.warn(msg);
01197           throw new SQLException(msg);
01198         }
01199         // else task execution already started, to late to cancel
01200       }
01201 
01202       if (task.getSuccess() > 0)
01203         return;
01204       else
01205       { // All tasks failed
01206         ArrayList exceptions = task.getExceptions();
01207         if (exceptions == null)
01208           throw new SQLException(Translate.get(
01209               "loadbalancer.rollback.all.failed", tid));
01210         else
01211         {
01212           String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01213               tid)
01214               + "\n";
01215           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01216               errorMsg);
01217           logger.error(ex.getMessage());
01218           throw ex;
01219         }
01220       }
01221     }
01222   }
01223 
01224   /**
01225    * Waits for all writes of the given transaction in the blocking thread queue
01226    * to complete before being able to complete the transaction.
01227    */
01228   protected void waitForAllWritesToComplete(long transactionId)
01229       throws SQLException
01230   {
01231     try
01232     {
01233       backendBlockingThreadsRWLock.acquireWrite();
01234       // Lock in write to ensure that all writes are posted and we wait in the
01235       // queue, else a read lock has the priority with the implementation we are
01236       // using.
01237     }
01238     catch (InterruptedException e)
01239     {
01240       String msg = Translate.get(
01241           "loadbalancer.backendlist.acquire.writelock.failed", e);
01242       logger.error(msg);
01243       throw new SQLException(msg);
01244     }
01245 
01246     int nbOfThreads = backendBlockingThreads.size();
01247 
01248     for (int i = 0; i < nbOfThreads; i++)
01249     {
01250       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01251           .get(i);
01252       thread.waitForAllTasksToComplete(transactionId);
01253     }
01254 
01255     backendBlockingThreadsRWLock.releaseWrite();
01256   }
01257 
01258   /**
01259    * Waits for all writes of the given transaction in the blocking thread queue
01260    * of the given backend to complete before being able to complete the
01261    * transaction.
01262    * 
01263    * @see #executeRequestOnBackend
01264    */
01265   protected void waitForAllWritesToComplete(DatabaseBackend backend,
01266       long transactionId) throws SQLException
01267   {
01268     try
01269     {
01270       backendBlockingThreadsRWLock.acquireWrite();
01271       // Lock in write to ensure that all writes are posted and we wait in the
01272       // queue, else a read lock has the priority with the implementation we are
01273       // using.
01274     }
01275     catch (InterruptedException e)
01276     {
01277       String msg = Translate.get(
01278           "loadbalancer.backendlist.acquire.writelock.failed", e);
01279       logger.error(msg);
01280       throw new SQLException(msg);
01281     }
01282 
01283     int nbOfThreads = backendBlockingThreads.size();
01284 
01285     for (int i = 0; i < nbOfThreads; i++)
01286     {
01287       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01288           .get(i);
01289       if (thread.getBackend() == backend)
01290         thread.waitForAllTasksToComplete(transactionId);
01291     }
01292 
01293     backendBlockingThreadsRWLock.releaseWrite();
01294   }
01295 
01296   /**
01297    * Waits for all writes in the blocking thread queue of the given backend to
01298    * complete.
01299    * 
01300    * @see #executeRequestOnBackend
01301    */
01302   protected void waitForAllWritesToComplete(DatabaseBackend backend)
01303       throws SQLException
01304   {
01305     try
01306     {
01307       backendBlockingThreadsRWLock.acquireWrite();
01308       // Lock in write to ensure that all writes are posted and we wait in the
01309       // queue, else a read lock has the priority with the implementation we are
01310       // using.
01311     }
01312     catch (InterruptedException e)
01313     {
01314       String msg = Translate.get(
01315           "loadbalancer.backendlist.acquire.writelock.failed", e);
01316       logger.error(msg);
01317       throw new SQLException(msg);
01318     }
01319 
01320     int nbOfThreads = backendBlockingThreads.size();
01321 
01322     for (int i = 0; i < nbOfThreads; i++)
01323     {
01324       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01325           .get(i);
01326       if (thread.getBackend() == backend)
01327         thread.waitForAllTasksToComplete();
01328     }
01329 
01330     backendBlockingThreadsRWLock.releaseWrite();
01331   }
01332 
01333   /*
01334    * Backends management
01335    */
01336 
01337   /**
01338    * Enables a Backend that was previously disabled.
01339    * <p>
01340    * Ask the corresponding connection manager to initialize the connections if
01341    * needed.
01342    * <p>
01343    * No sanity checks are performed by this function.
01344    * 
01345    * @param db the database backend to enable
01346    * @param writeEnabled True if the backend must be enabled for writes
01347    * @throws SQLException if an error occurs
01348    */
01349   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01350       throws SQLException
01351   {
01352     if (writeEnabled && db.isWriteCanBeEnabled())
01353     {
01354       // Create 2 worker threads
01355       BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01356       BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01357 
01358       // Add first to the blocking thread list
01359       try
01360       {
01361         backendBlockingThreadsRWLock.acquireWrite();
01362       }
01363       catch (InterruptedException e)
01364       {
01365         String msg = Translate.get(
01366             "loadbalancer.backendlist.acquire.writelock.failed", e);
01367         logger.error(msg);
01368         throw new SQLException(msg);
01369       }
01370       backendBlockingThreads.add(blockingThread);
01371       backendBlockingThreadsRWLock.releaseWrite();
01372       blockingThread.start();
01373       logger.info(Translate.get(
01374           "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01375 
01376       // Then add to the non-blocking thread list
01377       try
01378       {
01379         backendNonBlockingThreadsRWLock.acquireWrite();
01380       }
01381       catch (InterruptedException e)
01382       {
01383         String msg = Translate.get(
01384             "loadbalancer.backendlist.acquire.writelock.failed", e);
01385         logger.error(msg);
01386         throw new SQLException(msg);
01387       }
01388       backendNonBlockingThreads.add(nonBlockingThread);
01389       backendNonBlockingThreadsRWLock.releaseWrite();
01390       nonBlockingThread.start();
01391       logger.info(Translate.get(
01392           "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01393       db.enableWrite();
01394     }
01395 
01396     if (!db.isInitialized())
01397       db.initializeConnections();
01398     db.enableRead();
01399   }
01400 
01401   /**
01402    * Disables a backend that was previously enabled.
01403    * <p>
01404    * Ask the corresponding connection manager to finalize the connections if
01405    * needed.
01406    * <p>
01407    * No sanity checks are performed by this function.
01408    * 
01409    * @param db the database backend to disable
01410    * @throws SQLException if an error occurs
01411    */
01412   public synchronized void disableBackend(DatabaseBackend db)
01413       throws SQLException
01414   {
01415     if (db.isWriteEnabled())
01416     {
01417       // Starts with backendBlockingThreads
01418       try
01419       {
01420         backendBlockingThreadsRWLock.acquireWrite();
01421       }
01422       catch (InterruptedException e)
01423       {
01424         String msg = Translate.get(
01425             "loadbalancer.backendlist.acquire.writelock.failed", e);
01426         logger.error(msg);
01427         throw new SQLException(msg);
01428       }
01429 
01430       int nbOfThreads = backendBlockingThreads.size();
01431 
01432       // Find the right blocking thread
01433       for (int i = 0; i < nbOfThreads; i++)
01434       {
01435         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01436             .get(i);
01437         if (thread.getBackend().equals(db))
01438         {
01439           logger.info(Translate
01440               .get("loadbalancer.backend.workerthread.blocking.remove", db
01441                   .getName()));
01442 
01443           // Remove it from the backendBlockingThread list
01444           backendBlockingThreads.remove(thread);
01445 
01446           synchronized (thread)
01447           {
01448             // Kill the thread
01449             thread.addPriorityTask(new KillThreadTask(1, 1));
01450             thread.notify();
01451           }
01452           break;
01453         }
01454       }
01455 
01456       backendBlockingThreadsRWLock.releaseWrite();
01457 
01458       // Continue with backendNonBlockingThreads
01459 
01460       try
01461       {
01462         backendNonBlockingThreadsRWLock.acquireWrite();
01463       }
01464       catch (InterruptedException e)
01465       {
01466         String msg = Translate.get(
01467             "loadbalancer.backendlist.acquire.writelock.failed", e);
01468         logger.error(msg);
01469         throw new SQLException(msg);
01470       }
01471 
01472       // Find the right non-blocking thread
01473       nbOfThreads = backendNonBlockingThreads.size();
01474       for (int i = 0; i < nbOfThreads; i++)
01475       {
01476         BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01477             .get(i);
01478         if (thread.getBackend().equals(db))
01479         {
01480           logger.info(Translate.get(
01481               "loadbalancer.backend.workerthread.non.blocking.remove", db
01482                   .getName()));
01483 
01484           // Remove it from the backendNonBlockingThreads list
01485           backendNonBlockingThreads.remove(thread);
01486 
01487           synchronized (thread)
01488           {
01489             // Kill the thread
01490             thread.addPriorityTask(new KillThreadTask(1, 1));
01491             thread.notify();
01492           }
01493           break;
01494         }
01495       }
01496 
01497       backendNonBlockingThreadsRWLock.releaseWrite();
01498     }
01499 
01500     db.disable();
01501     if (db.isInitialized())
01502       db.finalizeConnections();
01503   }
01504 
01505   /**
01506    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
01507    */
01508   public String getXmlImpl()
01509   {
01510     StringBuffer info = new StringBuffer();
01511     info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01512     if (waitForCompletionPolicy != null)
01513       info.append(waitForCompletionPolicy.getXml());
01514     if (macroHandler != null)
01515       info.append(macroHandler.getXml());
01516     info.append(getRaidb1Xml());
01517     info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01518     return info.toString();
01519   }
01520 
01521   /**
01522    * Surrounding raidb1 tags can be treated by <method>getXmlImpl </method>
01523    * above, but more detailed content have to be returned by the method
01524    * <method>getRaidb1Xml </method> below.
01525    * 
01526    * @return content of Raidb1 xml
01527    */
01528   public abstract String getRaidb1Xml();
01529 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1