Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb2.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet. 
00022  * Contributor(s): ______________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026 
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030 
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00033 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00034 import org.objectweb.cjdbc.common.i18n.Translate;
00035 import org.objectweb.cjdbc.common.log.Trace;
00036 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00038 import org.objectweb.cjdbc.common.sql.SelectRequest;
00039 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00040 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00041 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00042 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00043 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00044 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00045 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00046 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00047 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00049 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00051 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00054 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00055 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00057 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00058 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00059 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00060 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00061 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00062 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00063 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00064 
00065 /**
00066  * RAIDb-2 load balancer.
00067  * <p>
00068  * This class is an abstract class because the read requests coming from the
00069  * Request Manager are NOT treated here but in the subclasses. Transaction
00070  * management and write requests are broadcasted to all backends owning the
00071  * written table.
00072  * 
00073  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00074  * @version 1.0
00075  */
00076 public abstract class RAIDb2 extends AbstractLoadBalancer
00077 {
00078   //
00079   // How the code is organized ?
00080   // 1. Member variables
00081   // 2. Constructor(s)
00082   // 3. Request handling
00083   // 4. Transaction handling
00084   // 5. Backend management
00085   //
00086 
00087   protected ArrayList                   backendBlockingThreads; // BackendWorkerThread entries used for requests that might block and for stored procedure calls
00088   protected ArrayList                   backendNonBlockingThreads; // BackendWorkerThread entries used for write requests that are not expected to block
        // Each lock is acquired in write mode before tasks are posted to the
        // threads of the matching list and released once the threads are notified
00089   protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock    = new ReadPrioritaryFIFOWriteLock();
00090   protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00091   // How many backends must complete (FIRST, MAJORITY or ALL) before returning ?
00092   protected WaitForCompletionPolicy     waitForCompletionPolicy;
        // Provides the rule that picks the backends receiving a 'create' request
00093   protected CreateTablePolicy           createTablePolicy;
00094 
        // Logger shared by this class and its subclasses
00095   protected static Trace                logger                          = Trace
00096                                                                             .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2");
00097 
00098   /*
00099    * Constructors
00100    */
00101 
00102   /**
00103    * Creates a new RAIDb-2 request load balancer. A new backend
00104    * worker thread is created for each backend.
00105    * 
00106    * @param vdb the virtual database this load balancer belongs to.
00107    * @param waitForCompletionPolicy how many backends must complete before
00108    *          returning the result ?
00109    * @param createTablePolicy the policy defining how 'create table' statements
00110    *          should be handled
00111    * @exception Exception if an error occurs
00112    */
00113   public RAIDb2(VirtualDatabase vdb,
00114       WaitForCompletionPolicy waitForCompletionPolicy,
00115       CreateTablePolicy createTablePolicy) throws Exception
00116   {
00117     super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);
00118     // Policies come from the caller; both worker thread lists start empty
00119     this.createTablePolicy = createTablePolicy;
00120     this.waitForCompletionPolicy = waitForCompletionPolicy;
00121     this.backendNonBlockingThreads = new ArrayList();
00122     this.backendBlockingThreads = new ArrayList();
00123   }
00124 
00125   /*
00126    * Request Handling
00127    */
00128 
00129   /**
00130    * Returns the number of nodes to wait for according to the defined
00131    * <code>waitForCompletion</code> policy.
00132    * 
00133    * @param nbOfThreads total number of threads
00134    * @return int number of threads to wait for
00135    */
00136   private int getNbToWait(int nbOfThreads)
00137   {
00138     int nbToWait;
00139     switch (waitForCompletionPolicy.getPolicy())
00140     {
00141       case WaitForCompletionPolicy.FIRST :
00142         nbToWait = 1;
00143         break;
00144       case WaitForCompletionPolicy.MAJORITY :
00145         nbToWait = nbOfThreads / 2 + 1;
00146         break;
00147       default :
00148         logger
00149             .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
00150       case WaitForCompletionPolicy.ALL :
00151         nbToWait = nbOfThreads;
00152         break;
00153     }
00154     return nbToWait;
00155   }
00156 
00157   /**
00158    * Performs a write request. This request is broadcast to all nodes that
00159    * own the table to be written.
00160    * 
00161    * @param request an <code>AbstractWriteRequest</code>
00162    * @return number of rows affected by the request
00163    * @throws AllBackendsFailedException if all backends failed to execute the
00164    *           request
00165    * @exception SQLException if an error occurs
00166    */
00167   public int execWriteRequest(AbstractWriteRequest request)
00168       throws AllBackendsFailedException, SQLException
00169   {
00170     AbstractTask done = execWriteRequest(request, false, null); // no generated keys, no metadata cache
00171     return ((WriteRequestTask) done).getResult();
00172   }
00173 
00174   /**
00175    * Perform a write request and return the auto generated keys.
00176    * 
00177    * @param request the request to execute
00178    * @param metadataCache the metadataCache if any or null
00179    * @return auto generated keys.
00180    * @throws AllBackendsFailedException if all backends failed to execute the
00181    *           request
00182    * @exception SQLException if an error occurs
00183    */
00184   public ControllerResultSet execWriteRequestWithKeys(
00185       AbstractWriteRequest request, MetadataCache metadataCache)
00186       throws AllBackendsFailedException, SQLException
00187   {
00188     AbstractTask done = execWriteRequest(request, true, metadataCache); // true: ask for generated keys
00189     return ((WriteRequestWithKeysTask) done).getResult();
00190   }
00191 
00192   /**
00193    * Common code for execWriteRequest(AbstractWriteRequest) and
00194    * execWriteRequestWithKeys(AbstractWriteRequest, MetadataCache). The result
00195    * is carried by the returned task: callers cast it to the concrete task type
00196    * and call getResult() on it.
00197    * 
00198    * @param request the request to execute
00199    * @param useKeys true if this must give an auto generated keys ResultSet
00200    * @param metadataCache the metadataCache if any or null
00201    * @throws AllBackendsFailedException if all backends failed to execute the
00202    *           request
00203    * @throws SQLException if an error occurs
00204    */
00205   private AbstractTask execWriteRequest(AbstractWriteRequest request,
00206       boolean useKeys, MetadataCache metadataCache)
00207       throws AllBackendsFailedException, SQLException
00208   {
00209     ArrayList backendThreads;
00210     ReadPrioritaryFIFOWriteLock lock;
00211 
00212     // Handle macros
00213     handleMacros(request);
00214 
00215     // Determine which list (blocking or not) to use
00216     if (request.mightBlock())
00217     { // Blocking
00218       backendThreads = backendBlockingThreads;
00219       lock = backendBlockingThreadsRWLock;
00220     }
00221     else
00222     { // Non-blocking
00223       backendThreads = backendNonBlockingThreads;
00224       lock = backendNonBlockingThreadsRWLock;
00225       if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00226           && (request.getTransactionId() != 0))
00227         waitForAllWritesToComplete(request.getTransactionId());
00228     }
00229 
00230     try
00231     {
00232       lock.acquireWrite();
00233     }
00234     catch (InterruptedException e)
00235     {
00236       String msg = Translate.get(
00237           "loadbalancer.backendlist.acquire.writelock.failed", e);
00238       logger.error(msg);
00239       throw new SQLException(msg);
00240     }
00241 
00242     int nbOfThreads = backendThreads.size();
00243     ArrayList writeList = new ArrayList();
00244     String tableName = request.getTableName();
00245 
00246     if (request.isCreate())
00247     { // Choose the backend according to the defined policy
00248       CreateTableRule rule = createTablePolicy.getTableRule(request
00249           .getTableName());
00250       if (rule == null)
00251         rule = createTablePolicy.getDefaultRule();
00252 
00253       // Ask the rule to pickup the backends
00254       ArrayList chosen;
00255       try
00256       {
00257         chosen = rule.getBackends(vdb.getBackends());
00258       }
00259       catch (CreateTableException e)
00260       {
00261         throw new SQLException(Translate.get(
00262             "loadbalancer.create.table.rule.failed", e.getMessage()));
00263       }
00264 
00265       // Build the thread list from the backend list
00266       if (chosen != null)
00267       {
00268         for (int i = 0; i < nbOfThreads; i++)
00269         {
00270           BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00271               .get(i);
00272           if (chosen.contains(thread.getBackend()))
00273             writeList.add(thread);
00274         }
00275       }
00276     }
00277     else
00278     { // Build the list of backends that need to execute this request
00279       for (int i = 0; i < nbOfThreads; i++)
00280       {
00281         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00282             .get(i);
00283         if (thread.getBackend().hasTable(tableName))
00284           writeList.add(thread);
00285       }
00286     }
00287 
00288     nbOfThreads = writeList.size();
00289     if (nbOfThreads == 0)
00290     {
00291       lock.releaseWrite();
00292       String msg = Translate.get("loadbalancer.execute.no.backend.found",
00293           request.getSQLShortForm(vdb.getSQLShortFormLength()));
00294       logger.warn(msg);
00295       throw new SQLException(msg);
00296     }
00297     else
00298     {
00299       if (logger.isDebugEnabled())
00300         logger.debug(Translate.get("loadbalancer.execute.on.several",
00301             new String[]{String.valueOf(request.getId()),
00302                 String.valueOf(nbOfThreads)}));
00303     }
00304 
00305     // Create the task
00306     AbstractTask task;
00307     if (useKeys)
00308       task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
00309           nbOfThreads, request, metadataCache);
00310     else
00311       task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
00312           request);
00313 
00314     synchronized (task)
00315     {
00316       // We have to first post the request on each backend before letting the
00317       // first backend to execute the request. Therefore we have 2 phases:
00318       //   1. post the task in each thread queue
00319       //   2. notify each thread to execute the query
00320 
00321       // 1. Post the task
00322       if (request.isAutoCommit())
00323       {
00324         for (int i = 0; i < nbOfThreads; i++)
00325         {
00326           BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00327           synchronized (thread)
00328           {
00329             thread.addTask(task);
00330           }
00331         }
00332       }
00333       else
00334       {
00335         for (int i = 0; i < nbOfThreads; i++)
00336         {
00337           BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00338           synchronized (thread)
00339           {
00340             thread.addTask(task, request.getTransactionId());
00341           }
00342         }
00343       }
00344 
00345       // 2. Start the task execution on each backend
00346       for (int i = 0; i < nbOfThreads; i++)
00347       {
00348         BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00349         synchronized (thread)
00350         {
00351           thread.notify();
00352         }
00353       }
00354 
00355       lock.releaseWrite();
00356 
00357       // Wait for completion (notified by the task)
00358       try
00359       {
00360         // Wait on task
00361         long timeout = request.getTimeout() * 1000;
00362         if (timeout > 0)
00363         {
00364           long start = System.currentTimeMillis();
00365           task.wait(timeout);
00366           long end = System.currentTimeMillis();
00367           long remaining = timeout - (end - start);
00368           if (remaining <= 0)
00369           {
00370             if (task.setExpiredTimeout())
00371             { // Task will be ignored by all backends
00372               String msg = Translate.get("loadbalancer.request.timeout",
00373                   new String[]{String.valueOf(request.getId()),
00374                       String.valueOf(task.getSuccess()),
00375                       String.valueOf(task.getFailed())});
00376               logger.warn(msg);
00377               throw new SQLException(msg);
00378             }
00379             // else task execution already started, to late to cancel
00380           }
00381           // No need to update request timeout since the execution is finished
00382         }
00383         else
00384           task.wait();
00385       }
00386       catch (InterruptedException e)
00387       {
00388         if (task.setExpiredTimeout())
00389         { // Task will be ignored by all backends
00390           String msg = Translate.get("loadbalancer.request.timeout",
00391               new String[]{String.valueOf(request.getId()),
00392                   String.valueOf(task.getSuccess()),
00393                   String.valueOf(task.getFailed())});
00394           logger.warn(msg);
00395           throw new SQLException(msg);
00396         }
00397         // else task execution already started, to late to cancel
00398       }
00399 
00400       if (task.getSuccess() > 0)
00401         return task;
00402       else
00403       { // All tasks failed
00404         ArrayList exceptions = task.getExceptions();
00405         if (exceptions == null)
00406           throw new AllBackendsFailedException(Translate.get(
00407               "loadbalancer.request.failed.all", request.getId()));
00408         else
00409         {
00410           String errorMsg = Translate.get("loadbalancer.request.failed.stack",
00411               request.getId())
00412               + "\n";
00413           for (int i = 0; i < exceptions.size(); i++)
00414             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00415           logger.error(errorMsg);
00416           throw new SQLException(errorMsg);
00417         }
00418       }
00419     }
00420   }
00421 
00422   /**
00423    * Implementation specific load balanced read execution.
00424    * 
00425    * @param request an <code>SelectRequest</code>
00426    * @param metadataCache the metadataCache if any or null
00427    * @return the corresponding <code>java.sql.ResultSet</code>
00428    * @exception SQLException if an error occurs
00429    */
00430   public abstract ControllerResultSet execReadRequest(SelectRequest request,
00431       MetadataCache metadataCache) throws SQLException; // no body here: read handling is left to the concrete subclasses
00432 
00433   /**
00434    * Execute a read request on the selected backend.
00435    * 
00436    * @param request the request to execute
00437    * @param backend the backend that will execute the request
00438    * @param metadataCache a metadataCache if any or null
00439    * @return the ResultSet
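00440    * @throws SQLException if an error occurs
         * @throws UnreachableBackendException if no connection to the backend can
         *           be obtained for an auto-commit request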
00441    */
00442   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00443       DatabaseBackend backend, MetadataCache metadataCache)
00444       throws SQLException, UnreachableBackendException
00445   {
00446     // Handle macros
00447     handleMacros(request);
00448 
00449     // Ok, we have a backend, fetch its connection manager for the request login
00450     AbstractConnectionManager cm = backend.getConnectionManager(request
00451         .getLogin());
00452 
00453     // Sanity check: no connection manager is registered for this login
00454     if (cm == null)
00455     {
00456       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00457           new String[]{request.getLogin(), backend.getName()});
00458       logger.error(msg);
00459       throw new SQLException(msg);
00460     }
00461 
00462     // Execute the query
00463     if (request.isAutoCommit())
00464     {
00465       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00466         // We could do something finer grain here by waiting
00467         // only for writes that depend on the tables we need
00468         // but is that really worth the overhead ?
00469         waitForAllWritesToComplete(backend);
00470 
00471       ControllerResultSet rs = null;
00472       boolean badConnection;
            // Retry with a fresh connection for as long as the one we were given
            // turns out to be bad
00473       do
00474       {
00475         badConnection = false;
00476         // Use a connection just for this request
00477         Connection c = null;
00478         try
00479         {
00480           c = cm.getConnection();
00481         }
00482         catch (UnreachableBackendException e1)
00483         {
                // Disable the backend and rethrow a new exception carrying only
                // the translated message
00484           logger.error(Translate.get(
00485               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00486           disableBackend(backend);
00487           throw new UnreachableBackendException(Translate.get(
00488               "loadbalancer.backend.unreacheable", backend.getName()));
00489         }
00490 
00491         // Sanity check: a null connection is reported as an unreachable backend
00492         if (c == null)
00493           throw new UnreachableBackendException(
00494               "No more connections on backend " + backend.getName());
00495 
00496         // Execute Query
00497         try
00498         {
00499           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00500           cm.releaseConnection(c);
00501         }
00502         catch (SQLException e)
00503         {
                // The connection is released, then the failure is rethrown with
                // only the original message kept
00504           cm.releaseConnection(c);
00505           throw new SQLException(Translate.get(
00506               "loadbalancer.request.failed.on.backend", new String[]{
00507                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00508                   backend.getName(), e.getMessage()}));
00509         }
00510         catch (BadConnectionException e)
00511         { // Get rid of the bad connection and go for another loop iteration
00512           cm.deleteConnection(c);
00513           badConnection = true;
00514         }
00515       }
00516       while (badConnection);
00517       if (logger.isDebugEnabled())
00518         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00519             String.valueOf(request.getId()), backend.getName()}));
00520       return rs;
00521     }
00522     else
00523     { // Inside a transaction
00524       Connection c;
00525       long tid = request.getTransactionId();
00526       Long lTid = new Long(tid);
00527 
00528       // Wait for previous writes to complete
00529       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00530         waitForAllWritesToComplete(backend, request.getTransactionId());
00531 
            // Get the connection bound to this transaction; this branch never
            // releases it (presumably it is kept until commit/rollback - confirm)
00532       try
00533       {
00534         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00535       }
00536       catch (UnreachableBackendException e1)
00537       {
              // Unlike the auto-commit branch, the failure is reported as a
              // SQLException here
00538         logger.error(Translate.get(
00539             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00540         disableBackend(backend);
00541         throw new SQLException(Translate.get(
00542             "loadbalancer.backend.unreacheable", backend.getName()));
00543       }
00544       catch (NoTransactionStartWhenDisablingException e)
00545       {
00546         String msg = Translate.get("loadbalancer.backend.is.disabling",
00547             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00548                 backend.getName()});
00549         logger.error(msg);
00550         throw new SQLException(msg);
00551       }
00552 
00553       // Sanity check: no connection could be retrieved for the transaction
00554       if (c == null)
00555         throw new SQLException(Translate.get(
00556             "loadbalancer.unable.retrieve.connection", new String[]{
00557                 String.valueOf(tid), backend.getName()}));
00558 
00559       // Execute Query
00560       ControllerResultSet rs = null;
00561       try
00562       {
00563         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00564       }
00565       catch (SQLException e)
00566       {
              // Rethrown with a translated message; only e.getMessage() is kept
00567         throw new SQLException(Translate.get(
00568             "loadbalancer.request.failed.on.backend", new String[]{
00569                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00570                 backend.getName(), e.getMessage()}));
00571       }
00572       catch (BadConnectionException e)
00573       { // Connection failed, so did the transaction
00574         // Drop the transaction connection and disable the backend.
00575         cm.deleteConnection(tid);
00576         String msg = Translate.get(
00577             "loadbalancer.backend.disabling.connection.failure", backend
00578                 .getName());
00579         logger.error(msg);
00580         disableBackend(backend);
00581         throw new SQLException(msg);
00582       }
00583       if (logger.isDebugEnabled())
00584         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00585             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00586                 backend.getName()}));
00587       return rs;
00588     }
00589   }
00590 
00591   /**
00592    * Execute a stored procedure on the selected backend.
00593    * 
00594    * @param proc the stored procedure to execute
00595    * @param backend the backend that will execute the request
00596    * @param metadataCache the metadataCache if any or null
00597    * @return the ResultSet
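00598    * @throws SQLException if an error occurs
         * @throws UnreachableBackendException if the connection manager reports the
         *           backend as unreachable for an auto-commit call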
00599    */
00600   protected ControllerResultSet executeStoredProcedureOnBackend(
00601       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00602       throws SQLException, UnreachableBackendException
00603   {
00604     // Handle macros
00605     handleMacros(proc);
00606 
00607     // Ok, we have a backend, fetch its connection manager for the caller login
00608     AbstractConnectionManager cm = backend
00609         .getConnectionManager(proc.getLogin());
00610 
00611     // Sanity check: no connection manager is registered for this login
00612     if (cm == null)
00613     {
00614       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00615           new String[]{proc.getLogin(), backend.getName()});
00616       logger.error(msg);
00617       throw new SQLException(msg);
00618     }
00619 
00620     // Execute the query
00621     if (proc.isAutoCommit())
00622     {
00623       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00624         // We could do something finer grain here by waiting
00625         // only for writes that depend on the tables we need
00626         // but is that really worth the overhead ?
00627         waitForAllWritesToComplete(backend);
00628 
00629       // Use a connection just for this request
00630       Connection c = null;
00631       try
00632       {
00633         c = cm.getConnection();
00634       }
00635       catch (UnreachableBackendException e1)
00636       {
              // Disable the backend and rethrow a new exception carrying only the
              // translated message
00637         logger.error(Translate.get(
00638             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00639         disableBackend(backend);
00640         throw new UnreachableBackendException(Translate.get(
00641             "loadbalancer.backend.unreacheable", backend.getName()));
00642       }
00643 
00644       // Sanity check: here a null connection is reported as a SQLException
00645       if (c == null)
00646         throw new SQLException(Translate.get(
00647             "loadbalancer.backend.no.connection", backend.getName()));
00648 
00649       // Execute Query
00650       ControllerResultSet rs = null;
00651       try
00652       {
00653         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00654             backend, c, metadataCache);
00655       }
00656       catch (Exception e)
00657       {
              // Any failure is converted to a SQLException that only keeps the
              // message of the original exception; there is no retry here
00658         throw new SQLException(Translate.get(
00659             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00660                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00661                 backend.getName(), e.getMessage()}));
00662       }
00663       finally
00664       {
              // The connection goes back to its manager on success and on failure
00665         cm.releaseConnection(c);
00666       }
00667       if (logger.isDebugEnabled())
00668         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00669             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00670       return rs;
00671     }
00672     else
00673     { // Inside a transaction
00674       Connection c;
00675       long tid = proc.getTransactionId();
00676       Long lTid = new Long(tid);
00677 
00678       // Wait for previous writes to complete
00679       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00680         waitForAllWritesToComplete(backend, proc.getTransactionId());
00681 
            // Get the connection bound to this transaction; this branch never
            // releases it (presumably it is kept until commit/rollback - confirm)
00682       try
00683       {
00684         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00685       }
00686       catch (UnreachableBackendException e1)
00687       {
              // Unlike the auto-commit branch, the failure is reported as a
              // SQLException here
00688         logger.error(Translate.get(
00689             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00690         disableBackend(backend);
00691         throw new SQLException(Translate.get(
00692             "loadbalancer.backend.unreacheable", backend.getName()));
00693       }
00694       catch (NoTransactionStartWhenDisablingException e)
00695       {
00696         String msg = Translate.get("loadbalancer.backend.is.disabling",
00697             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00698                 backend.getName()});
00699         logger.error(msg);
00700         throw new SQLException(msg);
00701       }
00702 
00703       // Sanity check: no connection could be retrieved for the transaction
00704       if (c == null)
00705         throw new SQLException(Translate.get(
00706             "loadbalancer.unable.retrieve.connection", new String[]{
00707                 String.valueOf(tid), backend.getName()}));
00708 
00709       // Execute Query
00710       ControllerResultSet rs;
00711       try
00712       {
00713         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00714             backend, c, metadataCache);
00715       }
00716       catch (Exception e)
00717       {
              // Any failure is converted to a SQLException; the backend is not
              // disabled and the connection is left untouched in this branch
00718         throw new SQLException(Translate.get(
00719             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00720                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00721                 backend.getName(), e.getMessage()}));
00722       }
00723       if (logger.isDebugEnabled())
00724         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00725             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00726                 backend.getName()}));
00727       return rs;
00728     }
00729   }
00730 
00731   /**
00732    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
00733    *      MetadataCache)
00734    */
00735   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00736       MetadataCache metadataCache) throws SQLException
00737   {
00738     // isRead = true: the posted task carries a ResultSet back
00739     AbstractTask completed = callStoredProcedure(proc, true, metadataCache);
00740     return ((ReadStoredProcedureTask) completed).getResult();
00741   }
00742 
00743   /**
00744    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(StoredProcedure)
00745    */
00746   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00747   {
00748     // isRead = false, so no metadata cache is passed along
00749     AbstractTask completed = callStoredProcedure(proc, false, null);
00750     return ((WriteStoredProcedureTask) completed).getResult();
00751   }
00752 
00753   /**
00754    * Post the stored procedure call in the threads task list.
00755    * 
00756    * @param proc the stored procedure to call
00757    * @param isRead true if the call returns a ResultSet
00758    * @param metadataCache the metadataCache if any or null
00759    * @return the task that has been executed (caller can get the result by
00760    *         calling getResult())
00761    * @throws SQLException if an error occurs
00762    */
00763   private AbstractTask callStoredProcedure(StoredProcedure proc,
00764       boolean isRead, MetadataCache metadataCache) throws SQLException
00765   {
00766     ArrayList backendThreads = backendBlockingThreads;
00767     ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00768 
00769     try
00770     {
00771       // Note that a read stored procedure here is supposed to also execute
00772       // writes and as the scheduler cannot block atomically on multiple tables
00773       // for a writes, we have to lock as a write even for a read stored
00774       // procedure. A read-only stored procedure will execute in a separate
00775       // method call (see executeReadOnlyStoredProcedure).
00776       lock.acquireWrite();
00777     }
00778     catch (InterruptedException e)
00779     {
00780       String msg;
00781       msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
00782           e);
00783       logger.error(msg);
00784       throw new SQLException(msg);
00785     }
00786 
00787     int nbOfThreads = backendThreads.size();
00788 
00789     // Create the task
00790     AbstractTask task;
00791     if (isRead)
00792       task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00793           proc, metadataCache);
00794     else
00795       task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00796           nbOfThreads, proc);
00797 
00798     synchronized (task)
00799     {
00800       int nbOfBackends = 0;
00801 
00802       // Post the task in each backendThread tasklist and wakeup the threads
00803       for (int i = 0; i < nbOfThreads; i++)
00804       {
00805         BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00806             .get(i);
00807         if (thread.getBackend().hasStoredProcedure(proc.getProcedureName()))
00808         {
00809           nbOfBackends++;
00810           synchronized (thread)
00811           {
00812             if (proc.isAutoCommit())
00813               thread.addTask(task);
00814             else
00815               thread.addTask(task, proc.getTransactionId());
00816             thread.notify();
00817           }
00818         }
00819       }
00820 
00821       lock.releaseWrite();
00822 
00823       if (nbOfBackends == 0)
00824       {
00825         throw new SQLException(Translate.get(
00826             "loadbalancer.backend.no.required.storedprocedure", proc
00827                 .getProcedureName()));
00828       }
00829       else
00830         task.setTotalNb(nbOfBackends);
00831 
00832       // Wait for completion (notified by the task)
00833       try
00834       {
00835         // Wait on task
00836         long timeout = proc.getTimeout() * 1000;
00837         if (timeout > 0)
00838         {
00839           long start = System.currentTimeMillis();
00840           task.wait(timeout);
00841           long end = System.currentTimeMillis();
00842           long remaining = timeout - (end - start);
00843           if (remaining <= 0)
00844           {
00845             if (task.setExpiredTimeout())
00846             { // Task will be ignored by all backends
00847               String msg = Translate.get(
00848                   "loadbalancer.storedprocedure.timeout", new String[]{
00849                       String.valueOf(proc.getId()),
00850                       String.valueOf(task.getSuccess()),
00851                       String.valueOf(task.getFailed())});
00852               logger.warn(msg);
00853               throw new SQLException(msg);
00854             }
00855             // else task execution already started, to late to cancel
00856           }
00857           // No need to update request timeout since the execution is finished
00858         }
00859         else
00860           task.wait();
00861       }
00862       catch (InterruptedException e)
00863       {
00864         if (task.setExpiredTimeout())
00865         { // Task will be ignored by all backends
00866           String msg = Translate.get("loadbalancer.storedprocedure.timeout",
00867               new String[]{String.valueOf(proc.getId()),
00868                   String.valueOf(task.getSuccess()),
00869                   String.valueOf(task.getFailed())});
00870           logger.warn(msg);
00871           throw new SQLException(msg);
00872         }
00873         // else task execution already started, to late to cancel
00874       }
00875 
00876       if (task.getSuccess() > 0)
00877         return task;
00878       else
00879       { // All tasks failed
00880         ArrayList exceptions = task.getExceptions();
00881         if (exceptions == null)
00882           throw new SQLException(Translate.get(
00883               "loadbalancer.storedprocedure.all.failed", proc.getId()));
00884         else
00885         {
00886           String errorMsg = Translate.get(
00887               "loadbalancer.storedprocedure.failed.stack", proc.getId())
00888               + "\n";
00889           for (int i = 0; i < exceptions.size(); i++)
00890             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00891           logger.error(errorMsg);
00892           throw new SQLException(errorMsg);
00893         }
00894       }
00895     }
00896   }
00897 
00898   /*
00899    * Transaction management
00900    */
00901 
  /**
   * Begins a new transaction.
   * <p>
   * This implementation does nothing.
   * 
   * @param tm the transaction marker metadata
   * @exception SQLException if an error occurs
   */
  public final void begin(TransactionMarkerMetaData tm) throws SQLException
  {
    // Nothing to do here.
    // NOTE(review): commit/rollback only address backends for which
    // isStartedTransaction() is true, so transactions are presumably started
    // lazily on each backend elsewhere -- confirm.
  }
00911 
00912   /**
00913    * Commits a transaction.
00914    * 
00915    * @param tm the transaction marker metadata
00916    * @exception SQLException if an error occurs
00917    */
00918   public void commit(TransactionMarkerMetaData tm) throws SQLException
00919   {
00920     long tid = tm.getTransactionId();
00921     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00922       waitForAllWritesToComplete(tid);
00923 
00924     try
00925     {
00926       backendNonBlockingThreadsRWLock.acquireWrite();
00927     }
00928     catch (InterruptedException e)
00929     {
00930       String msg = Translate.get(
00931           "loadbalancer.backendlist.acquire.writelock.failed", e);
00932       logger.error(msg);
00933       throw new SQLException(msg);
00934     }
00935 
00936     int nbOfThreads = backendNonBlockingThreads.size();
00937     ArrayList commitList = new ArrayList();
00938     Long iTid = new Long(tid);
00939 
00940     // Build the list of backend that need to commit this transaction
00941     for (int i = 0; i < nbOfThreads; i++)
00942     {
00943       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00944           .get(i);
00945       if (thread.getBackend().isStartedTransaction(iTid))
00946         commitList.add(thread);
00947     }
00948 
00949     nbOfThreads = commitList.size();
00950     if (nbOfThreads == 0)
00951     {
00952       backendNonBlockingThreadsRWLock.releaseWrite();
00953       return;
00954     }
00955 
00956     // Create the task
00957     CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00958         .getTimeout(), tm.getLogin(), tid);
00959 
00960     synchronized (task)
00961     {
00962       // Post the task in each backendThread tasklist and wakeup the threads
00963       for (int i = 0; i < nbOfThreads; i++)
00964       {
00965         BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00966         synchronized (thread)
00967         {
00968           thread.addTask(task, tid);
00969           thread.notify();
00970         }
00971       }
00972 
00973       backendNonBlockingThreadsRWLock.releaseWrite();
00974 
00975       // Wait for completion (notified by the task)
00976       try
00977       {
00978         // Wait on task
00979         long timeout = tm.getTimeout();
00980         if (timeout > 0)
00981         {
00982           long start = System.currentTimeMillis();
00983           task.wait(timeout);
00984           long end = System.currentTimeMillis();
00985           long remaining = timeout - (end - start);
00986           if (remaining <= 0)
00987           {
00988             if (task.setExpiredTimeout())
00989             { // Task will be ignored by all backends
00990               String msg = Translate.get("loadbalancer.commit.timeout",
00991                   new String[]{String.valueOf(tid),
00992                       String.valueOf(task.getSuccess()),
00993                       String.valueOf(task.getFailed())});
00994               logger.warn(msg);
00995               throw new SQLException(msg);
00996             }
00997             // else task execution already started, to late to cancel
00998           }
00999         }
01000         else
01001           task.wait();
01002       }
01003       catch (InterruptedException e)
01004       {
01005         if (task.setExpiredTimeout())
01006         { // Task will be ignored by all backends
01007           String msg = Translate.get("loadbalancer.commit.timeout",
01008               new String[]{String.valueOf(tid),
01009                   String.valueOf(task.getSuccess()),
01010                   String.valueOf(task.getFailed())});
01011           logger.warn(msg);
01012           throw new SQLException(msg);
01013         }
01014         // else task execution already started, to late to cancel
01015       }
01016 
01017       if (task.getSuccess() > 0)
01018         return;
01019       else
01020       { // All tasks failed
01021         ArrayList exceptions = task.getExceptions();
01022         if (exceptions == null)
01023           throw new SQLException(Translate.get(
01024               "loadbalancer.commit.all.failed", tid));
01025         else
01026         {
01027           String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01028               tid)
01029               + "\n";
01030           for (int i = 0; i < exceptions.size(); i++)
01031             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01032           logger.error(errorMsg);
01033           throw new SQLException(errorMsg);
01034         }
01035       }
01036     }
01037   }
01038 
01039   /**
01040    * Rollbacks a transaction.
01041    * 
01042    * @param tm the transaction marker metadata
01043    * @exception SQLException if an error occurs
01044    */
01045   public void rollback(TransactionMarkerMetaData tm) throws SQLException
01046   {
01047     long tid = tm.getTransactionId();
01048     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01049       waitForAllWritesToComplete(tid);
01050 
01051     try
01052     {
01053       backendNonBlockingThreadsRWLock.acquireWrite();
01054     }
01055     catch (InterruptedException e)
01056     {
01057       String msg = Translate.get(
01058           "loadbalancer.backendlist.acquire.writelock.failed", e);
01059       logger.error(msg);
01060       throw new SQLException(msg);
01061     }
01062     int nbOfThreads = backendNonBlockingThreads.size();
01063     ArrayList rollbackList = new ArrayList();
01064     Long iTid = new Long(tid);
01065 
01066     // Build the list of backend that need to rollback this transaction
01067     for (int i = 0; i < nbOfThreads; i++)
01068     {
01069       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01070           .get(i);
01071       if (thread.getBackend().isStartedTransaction(iTid))
01072         rollbackList.add(thread);
01073     }
01074 
01075     nbOfThreads = rollbackList.size();
01076     if (nbOfThreads == 0)
01077     {
01078       backendNonBlockingThreadsRWLock.releaseWrite();
01079       return;
01080     }
01081 
01082     // Create the task
01083     RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01084         tm.getTimeout(), tm.getLogin(), tid);
01085 
01086     synchronized (task)
01087     {
01088       // Post the task in each backendThread tasklist and wakeup the threads
01089       for (int i = 0; i < nbOfThreads; i++)
01090       {
01091         BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01092         synchronized (thread)
01093         {
01094           thread.addTask(task, tid);
01095           thread.notify();
01096         }
01097       }
01098 
01099       backendNonBlockingThreadsRWLock.releaseWrite();
01100 
01101       // Wait for completion (notified by the task)
01102       try
01103       {
01104         // Wait on task
01105         long timeout = tm.getTimeout();
01106         if (timeout > 0)
01107         {
01108           long start = System.currentTimeMillis();
01109           task.wait(timeout);
01110           long end = System.currentTimeMillis();
01111           long remaining = timeout - (end - start);
01112           if (remaining <= 0)
01113           {
01114             if (task.setExpiredTimeout())
01115             { // Task will be ignored by all backends
01116               String msg = Translate.get("loadbalancer.rollback.timeout",
01117                   new String[]{String.valueOf(tid),
01118                       String.valueOf(task.getSuccess()),
01119                       String.valueOf(task.getFailed())});
01120               logger.warn(msg);
01121               throw new SQLException(msg);
01122             }
01123             // else task execution already started, to late to cancel
01124           }
01125         }
01126         else
01127           task.wait();
01128       }
01129       catch (InterruptedException e)
01130       {
01131         if (task.setExpiredTimeout())
01132         { // Task will be ignored by all backends
01133           String msg = Translate.get("loadbalancer.rollback.timeout",
01134               new String[]{String.valueOf(tid),
01135                   String.valueOf(task.getSuccess()),
01136                   String.valueOf(task.getFailed())});
01137           logger.warn(msg);
01138           throw new SQLException(msg);
01139         }
01140         // else task execution already started, to late to cancel
01141       }
01142 
01143       if (task.getSuccess() > 0)
01144         return;
01145       else
01146       { // All tasks failed
01147         ArrayList exceptions = task.getExceptions();
01148         if (exceptions == null)
01149           throw new SQLException(Translate.get(
01150               "loadbalancer.rollback.all.failed", tid));
01151         else
01152         {
01153           String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01154               tid)
01155               + "\n";
01156           for (int i = 0; i < exceptions.size(); i++)
01157             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01158           logger.error(errorMsg);
01159           throw new SQLException(errorMsg);
01160         }
01161       }
01162     }
01163   }
01164 
01165   /**
01166    * Waits for all writes of the given transaction in the blocking thread queue
01167    * to complete before being able to complete the transaction.
01168    */
01169   protected void waitForAllWritesToComplete(long transactionId)
01170       throws SQLException
01171   {
01172     try
01173     {
01174       backendBlockingThreadsRWLock.acquireWrite();
01175       // Lock in write to ensure that all writes are posted and we wait in the
01176       // queue, else a read lock has the priority with the implementation we are
01177       // using.
01178     }
01179     catch (InterruptedException e)
01180     {
01181       String msg = Translate.get(
01182           "loadbalancer.backendlist.acquire.writelock.failed", e);
01183       logger.error(msg);
01184       throw new SQLException(msg);
01185     }
01186 
01187     int nbOfThreads = backendBlockingThreads.size();
01188 
01189     for (int i = 0; i < nbOfThreads; i++)
01190     {
01191       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01192           .get(i);
01193       thread.waitForAllTasksToComplete(transactionId);
01194     }
01195 
01196     backendBlockingThreadsRWLock.releaseWrite();
01197   }
01198 
01199   /**
01200    * Waits for all writes of the given transaction in the blocking thread queue
01201    * of the given backend to complete before being able to complete the
01202    * transaction.
01203    * 
01204    * @see #executeRequestOnBackend
01205    */
01206   protected void waitForAllWritesToComplete(DatabaseBackend backend,
01207       long transactionId) throws SQLException
01208   {
01209     try
01210     {
01211       backendBlockingThreadsRWLock.acquireWrite();
01212       // Lock in write to ensure that all writes are posted and we wait in the
01213       // queue, else a read lock has the priority with the implementation we are
01214       // using.
01215     }
01216     catch (InterruptedException e)
01217     {
01218       String msg = Translate.get(
01219           "loadbalancer.backendlist.acquire.writelock.failed", e);
01220       logger.error(msg);
01221       throw new SQLException(msg);
01222     }
01223 
01224     int nbOfThreads = backendBlockingThreads.size();
01225 
01226     for (int i = 0; i < nbOfThreads; i++)
01227     {
01228       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01229           .get(i);
01230       if (thread.getBackend() == backend)
01231         thread.waitForAllTasksToComplete(transactionId);
01232     }
01233 
01234     backendBlockingThreadsRWLock.releaseWrite();
01235   }
01236 
01237   /**
01238    * Waits for all writes in the blocking thread queue of the given backend to
01239    * complete.
01240    * 
01241    * @see #executeRequestOnBackend
01242    */
01243   protected void waitForAllWritesToComplete(DatabaseBackend backend)
01244       throws SQLException
01245   {
01246     try
01247     {
01248       backendBlockingThreadsRWLock.acquireWrite();
01249       // Lock in write to ensure that all writes are posted and we wait in the
01250       // queue, else a read lock has the priority with the implementation we are
01251       // using.
01252     }
01253     catch (InterruptedException e)
01254     {
01255       String msg = Translate.get(
01256           "loadbalancer.backendlist.acquire.writelock.failed", e);
01257       logger.error(msg);
01258       throw new SQLException(msg);
01259     }
01260 
01261     int nbOfThreads = backendBlockingThreads.size();
01262 
01263     for (int i = 0; i < nbOfThreads; i++)
01264     {
01265       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01266           .get(i);
01267       if (thread.getBackend() == backend)
01268         thread.waitForAllTasksToComplete();
01269     }
01270 
01271     backendBlockingThreadsRWLock.releaseWrite();
01272   }
01273 
01274   /*
01275    * Backends management
01276    */
01277 
01278   /**
01279    * Enables a Backend that was previously disabled.
01280    * <p>
01281    * Ask the corresponding connection manager to initialize the connections if
01282    * needed.
01283    * <p>
01284    * No sanity checks are performed by this function.
01285    * 
01286    * @param db The database backend to enable
01287    * @param writeEnabled True if the backend must be enabled for writes
01288    * @throws SQLException if an error occurs
01289    */
01290   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01291       throws SQLException
01292   {
01293     if (writeEnabled)
01294     {
01295       // Create 2 worker threads
01296       BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01297       BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01298 
01299       // Add first to the blocking thread list
01300       try
01301       {
01302         backendBlockingThreadsRWLock.acquireWrite();
01303       }
01304       catch (InterruptedException e)
01305       {
01306         String msg = Translate.get(
01307             "loadbalancer.backendlist.acquire.writelock.failed", e);
01308         logger.error(msg);
01309         throw new SQLException(msg);
01310       }
01311       backendBlockingThreads.add(blockingThread);
01312       backendBlockingThreadsRWLock.releaseWrite();
01313       blockingThread.start();
01314       logger.info(Translate.get(
01315           "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01316 
01317       // Then add to the non-blocking thread list
01318       try
01319       {
01320         backendNonBlockingThreadsRWLock.acquireWrite();
01321       }
01322       catch (InterruptedException e)
01323       {
01324         String msg = Translate.get(
01325             "loadbalancer.backendlist.acquire.writelock.failed", e);
01326         logger.error(msg);
01327         throw new SQLException(msg);
01328       }
01329       backendNonBlockingThreads.add(nonBlockingThread);
01330       backendNonBlockingThreadsRWLock.releaseWrite();
01331       nonBlockingThread.start();
01332       logger.info(Translate.get(
01333           "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01334       db.enableWrite();
01335     }
01336 
01337     if (!db.isInitialized())
01338       db.initializeConnections();
01339     db.enableRead();
01340   }
01341 
01342   /**
01343    * Disables a backend that was previously enabled.
01344    * <p>
01345    * Ask the corresponding connection manager to finalize the connections if
01346    * needed.
01347    * <p>
01348    * No sanity checks are performed by this function.
01349    * 
01350    * @param db the database backend to disable
01351    * @throws SQLException if an error occurs
01352    */
01353   public synchronized void disableBackend(DatabaseBackend db)
01354       throws SQLException
01355   {
01356     if (db.isWriteEnabled())
01357     {
01358       // Start with the backendBlockingThread list
01359       try
01360       {
01361         backendBlockingThreadsRWLock.acquireWrite();
01362       }
01363       catch (InterruptedException e)
01364       {
01365         String msg = Translate.get(
01366             "loadbalancer.backendlist.acquire.writelock.failed", e);
01367         logger.error(msg);
01368         throw new SQLException(msg);
01369       }
01370 
01371       int nbOfThreads = backendBlockingThreads.size();
01372 
01373       // Find the right blocking thread
01374       for (int i = 0; i < nbOfThreads; i++)
01375       {
01376         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01377             .get(i);
01378         if (thread.getBackend().equals(db))
01379         {
01380           logger.info(Translate
01381               .get("loadbalancer.backend.workerthread.blocking.remove", db
01382                   .getName()));
01383 
01384           backendBlockingThreads.remove(thread);
01385 
01386           synchronized (thread)
01387           {
01388             // Kill the thread
01389             thread.addPriorityTask(new KillThreadTask(1, 1));
01390             thread.notify();
01391           }
01392           break;
01393         }
01394       }
01395 
01396       backendBlockingThreadsRWLock.releaseWrite();
01397 
01398       // Continue with the backendNonBlockingThread list
01399 
01400       try
01401       {
01402         backendNonBlockingThreadsRWLock.acquireWrite();
01403       }
01404       catch (InterruptedException e)
01405       {
01406         String msg = Translate.get(
01407             "loadbalancer.backendlist.acquire.writelock.failed", e);
01408         logger.error(msg);
01409         throw new SQLException(msg);
01410       }
01411 
01412       // Find the right non-blocking thread
01413       nbOfThreads = backendNonBlockingThreads.size();
01414       for (int i = 0; i < nbOfThreads; i++)
01415       {
01416         BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01417             .get(i);
01418         if (thread.getBackend().equals(db))
01419         {
01420           logger.info(Translate.get(
01421               "loadbalancer.backend.workerthread.non.blocking.remove", db
01422                   .getName()));
01423 
01424           backendNonBlockingThreads.remove(thread);
01425 
01426           synchronized (thread)
01427           {
01428             // Kill the thread
01429             thread.addPriorityTask(new KillThreadTask(1, 1));
01430             thread.notify();
01431           }
01432           break;
01433         }
01434       }
01435 
01436       backendNonBlockingThreadsRWLock.releaseWrite();
01437     }
01438 
01439     db.disable();
01440     if (db.isInitialized())
01441       db.finalizeConnections();
01442   }
01443 
01444   /**
01445    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
01446    */
01447   public String getXmlImpl()
01448   {
01449     StringBuffer info = new StringBuffer();
01450     info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01451     if (createTablePolicy != null)
01452       info.append(createTablePolicy.getXml());
01453     if (waitForCompletionPolicy != null)
01454       info.append(waitForCompletionPolicy.getXml());
01455     if (macroHandler != null)
01456       info.append(macroHandler.getXml());
01457     this.getRaidb2Xml();
01458     info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01459     return info.toString();
01460   }
01461 
  /**
   * Returns XML formatted information about this RAIDb-2 load balancer.
   * Implemented by the concrete subclasses; called by {@link #getXmlImpl()}.
   * 
   * @return xml formatted string
   */
  public abstract String getRaidb2Xml();
01468 
01469 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1