Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb0.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet. 
00022  * Contributor(s): Jaco Swart
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb0;
00026 
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030 
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00033 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00034 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00035 import org.objectweb.cjdbc.common.i18n.Translate;
00036 import org.objectweb.cjdbc.common.log.Trace;
00037 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00038 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00039 import org.objectweb.cjdbc.common.sql.SelectRequest;
00040 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00041 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00042 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00043 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00044 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00045 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00046 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00047 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00049 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00054 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00055 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00056 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00057 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00058 
00059 /**
00060  * RAIDb-0: database partitioning.
00061  * <p>
00062  * The requests are sent to the backend nodes hosting the tables needed to
00063  * execute the request. If no backend has the needed tables to perform a
00064  * request, it will fail.
00065  * 
00066  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00067  * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
00068  * @version 1.0
00069  */
00070 public class RAIDb0 extends AbstractLoadBalancer
00071 {
00072   /*
00073    * Code organization: 1. Member variables 2. Constructor(s) 3. Request
00074    * handling 4. Transaction handling 5. Backend management 6.
00075    * Debug/Monitoring
00076    */
00077 
00078   private ArrayList                   backendThreads; // elements are cast to BackendWorkerThread in commit()
00079   private ReadPrioritaryFIFOWriteLock backendThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); // taken in read mode by commit() before it reads backendThreads
00080   private CreateTablePolicy           createTablePolicy; // consulted to pick the backend of 'create' write requests
00081 
00082   protected static Trace              logger               = Trace // logger used for every message of this class
00083                                                                .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb0");
00084 
00085   /*
00086    * Constructors
00087    */
00088 
00089   /**
00090    * Creates a new RAIDb-0 request load balancer.
00091    * 
00092    * @param vdb the virtual database this load balancer belongs to.
00093    * @param createTablePolicy the policy defining how 'create table' statements
00094    *          should be handled
00095    * @throws Exception if an error occurs
00096    */
00097   public RAIDb0(VirtualDatabase vdb, CreateTablePolicy createTablePolicy)
00098       throws Exception
00099   {
00100     super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE); // RAIDb-0 level with TABLE parsing granularity
00101     backendThreads = new ArrayList(); // starts empty; this constructor adds no worker thread
00102     this.createTablePolicy = createTablePolicy; // stored as given, no null check here
00103   }
00104 
00105   /*
00106    * Request Handling
00107    */
00108 
00109   /**
00110    * Executes a read request on the first read-enabled backend (in backend
00111    * list order) for which <code>hasTables</code> accepts the FROM tables.
00112    * The backend list read lock is held until the method exits.
00113    * @param request the <code>SelectRequest</code> to execute
00114    * @param metadataCache the metadataCache if any or null
00115    * @return the <code>ControllerResultSet</code> returned by the backend
00116    * @exception SQLException if locking, backend selection or execution fails
00117    * @see AbstractLoadBalancer#execReadRequest(SelectRequest, MetadataCache)
00118    */
00119   public ControllerResultSet execReadRequest(SelectRequest request,
00120       MetadataCache metadataCache) throws SQLException
00121   {
00122     try
00123     {
00124       vdb.acquireReadLockBackendLists(); // Acquire the read lock on the
00125       // backend lists; it is released in
00126       // the finally block at the end
00127     }
00128     catch (InterruptedException e)
00129     {
00130       String msg = Translate.get(
00131           "loadbalancer.backendlist.acquire.readlock.failed", e);
00132       logger.error(msg);
00133       throw new SQLException(msg); // thrown before the try/finally below, so no release on this path
00134     }
00135 
00136     try
00137     {
00138       ControllerResultSet rs = null;
00139       ArrayList fromTables = request.getFrom();
00140 
00141       if (fromTables == null) // a request without FROM tables cannot be routed
00142         throw new SQLException(Translate.get("loadbalancer.from.not.found",
00143             request.getSQLShortForm(vdb.getSQLShortFormLength())));
00144 
00145       // Find the backend that has the needed tables
00146       ArrayList backends = vdb.getBackends();
00147       int size = backends.size();
00148       int enabledBackends = 0; // read-enabled backends seen before the loop stops
00149 
00150       DatabaseBackend backend = null;
00151       // The backend that will execute the query
00152       for (int i = 0; i < size; i++)
00153       {
00154         backend = (DatabaseBackend) backends.get(i);
00155         if (backend.isReadEnabled())
00156           enabledBackends++;
00157         if (backend.isReadEnabled() && backend.hasTables(fromTables)) // NOTE(review): isReadEnabled() is evaluated twice per backend
00158           break; // first match wins
00159         else
00160           backend = null; // reset so that a full miss leaves backend == null
00161       }
00162 
00163       if (backend == null)
00164       {
00165         if (enabledBackends == 0) // nothing is readable at all
00166           throw new NoMoreBackendException(Translate.get(
00167               "loadbalancer.execute.no.backend.enabled", request.getId()));
00168         else // readable backends exist, but none accepted the tables
00169           throw new SQLException(Translate.get(
00170               "loadbalancer.backend.no.required.tables", fromTables.toString()));
00171       }
00172 
00173       if (logger.isDebugEnabled())
00174       {
00175         logger.debug("Backend " + backend.getName()
00176             + " has all tables which are:");
00177         for (int i = 0; i < fromTables.size(); i++)
00178         {
00179           logger.debug(fromTables.get(i));
00180         }
00181       }
00182 
00183       // Execute the request on the chosen backend
00184       try
00185       {
00186         rs = executeRequestOnBackend(request, backend, metadataCache);
00187       }
00188       catch (SQLException se)
00189       {
00190         String msg = Translate.get("loadbalancer.request.failed", new String[]{
00191             String.valueOf(request.getId()), se.getMessage()});
00192         if (logger.isInfoEnabled())
00193           logger.info(msg);
00194         throw new SQLException(msg); // only the message of 'se' is kept, the exception itself is not chained
00195       }
00196 
00197       return rs;
00198     }
00199     catch (RuntimeException e)
00200     { // unexpected runtime failure: log as fatal and report it as an SQLException
00201       String msg = Translate
00202           .get("loadbalancer.request.failed", new String[]{
00203               request.getSQLShortForm(vdb.getSQLShortFormLength()),
00204               e.getMessage()});
00205       logger.fatal(msg, e);
00206       throw new SQLException(msg);
00207     }
00208     finally
00209     {
00210       vdb.releaseReadLockBackendLists(); // Release the read lock taken
00211       // at the top of the method, on
00212       // both normal and exceptional exit
00213     }
00214   }
00215 
00216   /**
00217    * Executes a write request on a single backend: the one picked by the
00218    * create table policy for 'create' requests, otherwise the first
00219    * write-enabled backend for which <code>hasTable</code> is true.
00220    * @param request an <code>AbstractWriteRequest</code>
00221    * @return number of rows affected by the request
00222    * @exception SQLException if an error occurs
00223    */
00224   public int execWriteRequest(AbstractWriteRequest request) throws SQLException
00225   {
00226     // Handle macros
00227     handleMacros(request); // done before the backend list lock is taken
00228 
00229     try
00230     {
00231       vdb.acquireReadLockBackendLists(); // Acquire the read lock on the
00232       // backend lists; it is released in
00233       // the finally block at the end
00234     }
00235     catch (InterruptedException e)
00236     {
00237       String msg = Translate.get(
00238           "loadbalancer.backendlist.acquire.readlock.failed", e);
00239       logger.error(msg);
00240       throw new SQLException(msg); // thrown before the try/finally below, so no release on this path
00241     }
00242 
00243     try
00244     {
00245       String table = request.getTableName();
00246       AbstractConnectionManager cm = null; // stays null until a backend has been selected
00247 
00248       if (table == null) // the request cannot be routed without a target table
00249         throw new SQLException(Translate.get(
00250             "loadbalancer.request.target.table.not.found", request
00251                 .getSQLShortForm(vdb.getSQLShortFormLength())));
00252 
00253       // Find the backend that has the needed table
00254       ArrayList backends = vdb.getBackends();
00255       int size = backends.size();
00256 
00257       DatabaseBackend backend = null;
00258       // The backend that will execute the query
00259       if (request.isCreate())
00260       { // Choose the backend according to the defined policy
00261         CreateTableRule rule = createTablePolicy.getTableRule(request
00262             .getTableName());
00263         if (rule == null) // no rule specific to this table: use the default one
00264           rule = createTablePolicy.getDefaultRule();
00265 
00266         // Ask the rule to pickup a backend
00267         ArrayList choosen;
00268         try
00269         {
00270           choosen = rule.getBackends(backends);
00271         }
00272         catch (CreateTableException e)
00273         {
00274           throw new SQLException(Translate.get(
00275               "loadbalancer.create.table.rule.failed", e.getMessage()));
00276         }
00277 
00278         // Get the connection manager from the chosen backend
00279         if (choosen != null)
00280           backend = (DatabaseBackend) choosen.get(0); // only the first backend of the rule's answer is used
00281         if (backend != null)
00282           cm = backend.getConnectionManager(request.getLogin());
00283       }
00284       else
00285       { // Find the backend that has the table
00286         for (int i = 0; i < size; i++)
00287         {
00288           backend = (DatabaseBackend) backends.get(i);
00289           if (backend.isWriteEnabled() && backend.hasTable(table))
00290           {
00291             cm = backend.getConnectionManager(request.getLogin());
00292             break; // first match wins
00293           }
00294         } // on a miss 'backend' still references the last examined backend, but cm is null
00295       }
00296 
00297       // Sanity check
00298       if (cm == null) // no backend selected, or no connection manager for this login
00299         throw new SQLException(Translate.get(
00300             "loadbalancer.backend.no.required.table", table));
00301 
00302       // Ok, let's execute the query
00303 
00304       if (request.isAutoCommit())
00305       {
00306         // We do not execute request outside the already open transactions if we
00307         // are disabling the backend.
00308         if (backend.isDisabling())
00309           throw new SQLException(Translate.get(
00310               "loadbalancer.backend.is.disabling", new String[]{
00311                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00312                   backend.getName()}));
00313 
00314         // Use a connection just for this request
00315         Connection c = null;
00316         try
00317         {
00318           c = cm.getConnection();
00319         }
00320         catch (UnreachableBackendException e1)
00321         { // the backend cannot be reached: disable it and fail the request
00322           logger.error(Translate.get(
00323               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00324           disableBackend(backend);
00325           throw new SQLException(Translate.get(
00326               "loadbalancer.backend.unreacheable", backend.getName()));
00327         }
00328 
00329         // Sanity check
00330         if (c == null)
00331           throw new SQLException(Translate.get(
00332               "loadbalancer.backend.no.connection", backend.getName()));
00333 
00334         int result;
00335         try
00336         {
00337           result = executeUpdateRequestOnBackend(request, backend, c);
00338         }
00339         catch (Exception e)
00340         { // any failure is reported as an SQLException carrying only the message of 'e'
00341           throw new SQLException(Translate.get("loadbalancer.request.failed",
00342               new String[]{
00343                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00344                   e.getMessage()}));
00345         }
00346         finally
00347         {
00348           cm.releaseConnection(c); // the connection is given back on success and on failure
00349         }
00350         if (logger.isDebugEnabled())
00351           logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00352               String.valueOf(request.getId()), backend.getName()}));
00353         return result;
00354       }
00355       else
00356       { // Inside a transaction
00357         Connection c;
00358         long tid = request.getTransactionId();
00359         Long lTid = new Long(tid);
00360 
00361         try
00362         {
00363           c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00364         }
00365         catch (UnreachableBackendException e1)
00366         { // the backend cannot be reached: disable it and fail the request
00367           logger.error(Translate.get(
00368               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00369           disableBackend(backend);
00370           throw new SQLException(Translate.get(
00371               "loadbalancer.backend.unreacheable", backend.getName()));
00372         }
00373         catch (NoTransactionStartWhenDisablingException e)
00374         { // reported with the same message key as the isDisabling() check of the auto-commit path
00375           String msg = Translate.get("loadbalancer.backend.is.disabling",
00376               new String[]{
00377                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00378                   backend.getName()});
00379           logger.error(msg);
00380           throw new SQLException(msg);
00381         }
00382 
00383         // Sanity check
00384         if (c == null)
00385           throw new SQLException(Translate.get(
00386               "loadbalancer.unable.retrieve.connection", new String[]{
00387                   String.valueOf(tid), backend.getName()}));
00388 
00389         // Execute the query
00390         int result;
00391         try
00392         {
00393           result = executeUpdateRequestOnBackend(request, backend, c);
00394         }
00395         catch (Exception e)
00396         { // unlike the auto-commit path, the connection is not released in this method
00397           throw new SQLException(Translate.get("loadbalancer.request.failed",
00398               new String[]{
00399                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00400                   e.getMessage()}));
00401         }
00402         if (logger.isDebugEnabled())
00403           logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00404               String.valueOf(request.getId()), backend.getName()}));
00405         return result;
00406       }
00407     }
00408     catch (RuntimeException e)
00409     { // unexpected runtime failure: log as fatal and report it as an SQLException
00410       String msg = Translate
00411           .get("loadbalancer.request.failed", new String[]{
00412               request.getSQLShortForm(vdb.getSQLShortFormLength()),
00413               e.getMessage()});
00414       logger.fatal(msg, e);
00415       throw new SQLException(msg);
00416     }
00417     finally
00418     {
00419       vdb.releaseReadLockBackendLists(); // Release the read lock taken
00420       // at the top of the method, on
00421       // both normal and exceptional exit
00422     }
00423   }
00424 
00425   /**
00426    * @see AbstractLoadBalancer#execWriteRequestWithKeys(AbstractWriteRequest,
00427    *      MetadataCache)
00428    */
00429   public ControllerResultSet execWriteRequestWithKeys(
00430       AbstractWriteRequest request, MetadataCache metadataCache)
00431       throws SQLException
00432   {
00433     // Handle macros
00434     handleMacros(request); // done before the backend list lock is taken
00435 
00436     try
00437     {
00438       vdb.acquireReadLockBackendLists(); // Acquire the read lock on the
00439       // backend lists; it is released in
00440       // the finally block at the end
00441     }
00442     catch (InterruptedException e)
00443     {
00444       String msg = Translate.get(
00445           "loadbalancer.backendlist.acquire.readlock.failed", e);
00446       logger.error(msg);
00447       throw new SQLException(msg); // thrown before the try/finally below, so no release on this path
00448     }
00449 
00450     try
00451     {
00452       String table = request.getTableName();
00453       AbstractConnectionManager cm = null; // stays null until a backend has been selected
00454 
00455       if (table == null) // the request cannot be routed without a target table
00456         throw new SQLException(Translate.get(
00457             "loadbalancer.request.target.table.not.found", request
00458                 .getSQLShortForm(vdb.getSQLShortFormLength())));
00459 
00460       // Find the backend that has the needed table
00461       ArrayList backends = vdb.getBackends();
00462       int size = backends.size();
00463 
00464       DatabaseBackend backend = null;
00465       // The backend that will execute the query
00466       if (request.isCreate())
00467       { // Choose the backend according to the defined policy
00468         CreateTableRule rule = createTablePolicy.getTableRule(request
00469             .getTableName());
00470         if (rule == null) // no rule specific to this table: use the default one
00471           rule = createTablePolicy.getDefaultRule();
00472 
00473         // Ask the rule to pickup a backend
00474         ArrayList choosen;
00475         try
00476         {
00477           choosen = rule.getBackends(backends);
00478         }
00479         catch (CreateTableException e)
00480         {
00481           throw new SQLException(Translate.get(
00482               "loadbalancer.create.table.rule.failed", e.getMessage()));
00483         }
00484 
00485         // Get the connection manager from the chosen backend
00486         if (choosen != null)
00487           backend = (DatabaseBackend) choosen.get(0); // only the first backend of the rule's answer is used
00488         if (backend != null)
00489           cm = backend.getConnectionManager(request.getLogin());
00490       }
00491       else
00492       { // Find the backend that has the table
00493         for (int i = 0; i < size; i++)
00494         {
00495           backend = (DatabaseBackend) backends.get(i);
00496           if (backend.isWriteEnabled() && backend.hasTable(table))
00497           {
00498             cm = backend.getConnectionManager(request.getLogin());
00499             break; // first match wins
00500           }
00501         } // on a miss 'backend' still references the last examined backend, but cm is null
00502       }
00503 
00504       // Sanity check
00505       if (cm == null) // no backend selected, or no connection manager for this login
00506         throw new SQLException(Translate.get(
00507             "loadbalancer.backend.no.required.table", table));
00508 
00509       if (!backend.getDriverCompliance().supportGetGeneratedKeys()) // the selected backend's driver must support generated keys
00510         throw new SQLException(Translate.get(
00511             "loadbalancer.backend.autogeneratedkeys.unsupported", backend
00512                 .getName()));
00513 
00514       // Ok, let's execute the query
00515 
00516       if (request.isAutoCommit())
00517       {
00518         // We do not execute request outside the already open transactions if we
00519         // are disabling the backend.
00520         if (backend.isDisabling())
00521           throw new SQLException(Translate.get(
00522               "loadbalancer.backend.is.disabling", new String[]{
00523                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00524                   backend.getName()}));
00525 
00526         // Use a connection just for this request
00527         Connection c = null;
00528         try
00529         {
00530           c = cm.getConnection();
00531         }
00532         catch (UnreachableBackendException e1)
00533         { // the backend cannot be reached: disable it and fail the request
00534           logger.error(Translate.get(
00535               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00536           disableBackend(backend);
00537           throw new SQLException(Translate.get(
00538               "loadbalancer.backend.unreacheable", backend.getName()));
00539         }
00540 
00541         // Sanity check
00542         if (c == null)
00543           throw new SQLException(Translate.get(
00544               "loadbalancer.backend.no.connection", backend.getName()));
00545 
00546         // Execute Query
00547         ControllerResultSet result;
00548         try
00549         {
00550           result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00551               metadataCache);
00552         }
00553         catch (Exception e)
00554         { // any failure is reported as an SQLException carrying only the message of 'e'
00555           throw new SQLException(Translate.get("loadbalancer.request.failed",
00556               new String[]{
00557                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00558                   e.getMessage()}));
00559         }
00560         finally
00561         {
00562           backend.removePendingRequest(request); // NOTE(review): execWriteRequest has no matching call; confirm which is intended
00563           cm.releaseConnection(c); // the connection is given back on success and on failure
00564         }
00565         if (logger.isDebugEnabled())
00566           logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00567               String.valueOf(request.getId()), backend.getName()}));
00568         return result;
00569       }
00570       else
00571       { // Inside a transaction
00572         Connection c;
00573         long tid = request.getTransactionId();
00574         Long lTid = new Long(tid);
00575 
00576         try
00577         {
00578           c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00579         }
00580         catch (UnreachableBackendException e1)
00581         { // the backend cannot be reached: disable it and fail the request
00582           logger.error(Translate.get(
00583               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00584           disableBackend(backend);
00585           throw new SQLException(Translate.get(
00586               "loadbalancer.backend.unreacheable", backend.getName()));
00587         }
00588         catch (NoTransactionStartWhenDisablingException e)
00589         { // reported with the same message key as the isDisabling() check of the auto-commit path
00590           String msg = Translate.get("loadbalancer.backend.is.disabling",
00591               new String[]{
00592                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00593                   backend.getName()});
00594           logger.error(msg);
00595           throw new SQLException(msg);
00596         }
00597 
00598         // Sanity check
00599         if (c == null)
00600           throw new SQLException(Translate.get(
00601               "loadbalancer.unable.retrieve.connection", new String[]{
00602                   String.valueOf(tid), backend.getName()}));
00603 
00604         // Execute the query
00605         ControllerResultSet result;
00606         try
00607         {
00608           result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00609               metadataCache);
00610         }
00611         catch (Exception e)
00612         { // unlike the auto-commit path, the connection is not released in this method
00613           throw new SQLException(Translate.get("loadbalancer.request.failed",
00614               new String[]{
00615                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00616                   e.getMessage()}));
00617         }
00618         if (logger.isDebugEnabled())
00619           logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00620               String.valueOf(request.getId()), backend.getName()}));
00621         return result;
00622       }
00623     }
00624     catch (RuntimeException e)
00625     { // unexpected runtime failure: log as fatal and report it as an SQLException
00626       String msg = Translate
00627           .get("loadbalancer.request.failed", new String[]{
00628               request.getSQLShortForm(vdb.getSQLShortFormLength()),
00629               e.getMessage()});
00630       logger.fatal(msg, e);
00631       throw new SQLException(msg);
00632     }
00633     finally
00634     {
00635       vdb.releaseReadLockBackendLists(); // Release the read lock taken
00636       // at the top of the method, on
00637       // both normal and exceptional exit
00638     }
00639   }
00640 
00641   /**
00642    * Execute a read request on the selected backend.
00643    * Auto-commit requests are retried while the connection proves bad.
00644    * @param request the request to execute
00645    * @param backend the backend that will execute the request
00646    * @param metadataCache the metadataCache if any or null
00647    * @return the ControllerResultSet
00648    * @throws SQLException if an error occurs
00649    */
00650   protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00651       DatabaseBackend backend, MetadataCache metadataCache) throws SQLException
00652   {
00653     // Handle macros
00654     handleMacros(request);
00655 
00656     // Ok, we have a backend, let's execute the request
00657     AbstractConnectionManager cm = backend.getConnectionManager(request
00658         .getLogin());
00659 
00660     // Sanity check
00661     if (cm == null) // no connection manager on this backend for the request's login
00662     {
00663       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00664           new String[]{request.getLogin(), backend.getName()});
00665       logger.error(msg);
00666       throw new SQLException(msg);
00667     }
00668 
00669     // Execute the query
00670     if (request.isAutoCommit())
00671     {
00672       ControllerResultSet rs = null;
00673       boolean badConnection; // true when the last attempt hit a BadConnectionException
00674       do // NOTE(review): no retry limit is visible here; the loop repeats as long as connections are bad
00675       {
00676         badConnection = false;
00677         // Use a connection just for this request
00678         Connection c = null;
00679         try
00680         {
00681           c = cm.getConnection();
00682         }
00683         catch (UnreachableBackendException e1)
00684         { // the backend cannot be reached: disable it and fail the request
00685           logger.error(Translate.get(
00686               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00687           disableBackend(backend);
00688           throw new SQLException(Translate.get(
00689               "loadbalancer.backend.unreacheable", backend.getName()));
00690         }
00691 
00692         // Sanity check
00693         if (c == null)
00694           throw new SQLException(Translate.get(
00695               "loadbalancer.backend.no.connection", backend.getName()));
00696 
00697         // Execute Query
00698         try
00699         {
00700           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00701           cm.releaseConnection(c); // success: give the connection back
00702         }
00703         catch (SQLException e)
00704         {
00705           cm.releaseConnection(c); // failure: the connection is still given back before rethrowing
00706           throw new SQLException(Translate.get(
00707               "loadbalancer.request.failed.on.backend", new String[]{
00708                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00709                   backend.getName(), e.getMessage()}));
00710         }
00711         catch (BadConnectionException e)
00712         { // Get rid of the bad connection
00713           cm.deleteConnection(c);
00714           badConnection = true; // triggers another iteration with a new connection
00715         } // NOTE(review): a RuntimeException from the call above would skip both release paths
00716       }
00717       while (badConnection);
00718       if (logger.isDebugEnabled())
00719         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00720             String.valueOf(request.getId()), backend.getName()}));
00721       return rs;
00722     }
00723     else
00724     { // Inside a transaction
00725       Connection c;
00726       long tid = request.getTransactionId();
00727       Long lTid = new Long(tid);
00728 
00729       try
00730       {
00731         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00732       }
00733       catch (UnreachableBackendException e1)
00734       { // the backend cannot be reached: disable it and fail the request
00735         logger.error(Translate.get(
00736             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00737         disableBackend(backend);
00738         throw new SQLException(Translate.get(
00739             "loadbalancer.backend.unreacheable", backend.getName()));
00740       }
00741       catch (NoTransactionStartWhenDisablingException e)
00742       { // logged as an error, then reported as an SQLException
00743         String msg = Translate.get("loadbalancer.backend.is.disabling",
00744             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00745                 backend.getName()});
00746         logger.error(msg);
00747         throw new SQLException(msg);
00748       }
00749 
00750       // Sanity check
00751       if (c == null)
00752         throw new SQLException(Translate.get(
00753             "loadbalancer.unable.retrieve.connection", new String[]{
00754                 String.valueOf(tid), backend.getName()}));
00755 
00756       // Execute Query
00757       ControllerResultSet rs = null;
00758       try
00759       {
00760         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00761       }
00762       catch (SQLException e)
00763       { // the transaction's connection is not released here
00764         throw new SQLException(Translate.get(
00765             "loadbalancer.request.failed.on.backend", new String[]{
00766                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00767                 backend.getName(), e.getMessage()}));
00768       }
00769       catch (BadConnectionException e)
00770       { // Get rid of the bad connection
00771         cm.deleteConnection(tid); // keyed by transaction id; no retry in the transactional case
00772         throw new SQLException(Translate
00773             .get("loadbalancer.connection.failed", new String[]{
00774                 String.valueOf(tid), backend.getName(), e.getMessage()}));
00775       }
00776       if (logger.isDebugEnabled())
00777         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00778             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00779                 backend.getName()}));
00780       return rs;
00781     }
00782   }
00783 
00784   /**
00785    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadOnlyReadStoredProcedure(StoredProcedure,
00786    *      MetadataCache)
00787    */
00788   public ControllerResultSet execReadOnlyReadStoredProcedure(
00789       StoredProcedure proc, MetadataCache metadataCache) throws SQLException
00790   { // Always fails: both arguments are ignored and nothing is executed
00791     throw new SQLException(
00792         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00793   }
00794 
00795   /**
00796    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
00797    *      MetadataCache)
00798    */
00799   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00800       MetadataCache metadataCache) throws SQLException
00801   { // Always fails: both arguments are ignored and nothing is executed
00802     throw new SQLException(
00803         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00804   }
00805 
00806   /**
00807    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
00808    */
00809   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00810   {
00811     throw new SQLException(
00812         "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00813   }
00814 
00815   /*
00816    * Transaction management
00817    */
00818 
00819   /**
00820    * Begins a new transaction.
00821    * 
00822    * @param tm the transaction marker metadata
00823    * @throws SQLException if an error occurs
00824    */
00825   public final void begin(TransactionMarkerMetaData tm) throws SQLException
00826   {
00827   }
00828 
00829   /**
00830    * Commits a transaction.
00831    * 
00832    * @param tm the transaction marker metadata
00833    * @throws SQLException if an error occurs
00834    */
00835   public void commit(TransactionMarkerMetaData tm) throws SQLException
00836   {
00837     try
00838     {
00839       backendThreadsRWLock.acquireRead();
00840     }
00841     catch (InterruptedException e)
00842     {
00843       String msg = Translate.get(
00844           "loadbalancer.backendlist.acquire.readlock.failed", e);
00845       logger.error(msg);
00846       throw new SQLException(msg);
00847     }
00848 
00849     int nbOfThreads = backendThreads.size();
00850     ArrayList commitList = new ArrayList();
00851     long tid = tm.getTransactionId();
00852     Long lTid = new Long(tid);
00853 
00854     // Build the list of backend that need to commit this transaction
00855     for (int i = 0; i < nbOfThreads; i++)
00856     {
00857       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
00858       if (thread.getBackend().isStartedTransaction(lTid))
00859         commitList.add(thread);
00860     }
00861 
00862     nbOfThreads = commitList.size();
00863     if (nbOfThreads == 0)
00864     { // Empty transaction or read-only with cache hits, then no backend
00865       // actually executed any query for that transaction. Simply return.
00866       // Bug reported by Ganesh (ssriganesha@rogers.com).
00867       backendThreadsRWLock.releaseRead();
00868       return;
00869     }
00870 
00871     // Create the task
00872     CommitTask task = new CommitTask(nbOfThreads, // Wait for all to commit
00873         nbOfThreads, tm.getTimeout(), tm.getLogin(), tid);
00874 
00875     synchronized (task)
00876     {
00877       // Post the task in each backendThread tasklist and wakeup the threads
00878       for (int i = 0; i < nbOfThreads; i++)
00879       {
00880         BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00881         synchronized (thread)
00882         {
00883           thread.addTask(task, tid);
00884           thread.notify();
00885         }
00886       }
00887 
00888       backendThreadsRWLock.releaseRead();
00889 
00890       // Wait for completion (notified by the task)
00891       try
00892       {
00893         // Wait on task
00894         long timeout = tm.getTimeout();
00895         if (timeout > 0)
00896         {
00897           long start = System.currentTimeMillis();
00898           task.wait(timeout);
00899           long end = System.currentTimeMillis();
00900           long remaining = timeout - (end - start);
00901           if (remaining <= 0)
00902           {
00903             if (task.setExpiredTimeout())
00904             { // Task will be ignored by all backends
00905               String msg = Translate.get("loadbalancer.commit.timeout",
00906                   new String[]{String.valueOf(tid),
00907                       String.valueOf(task.getSuccess()),
00908                       String.valueOf(task.getFailed())});
00909               logger.warn(msg);
00910               throw new SQLException(msg);
00911             }
00912             // else task execution already started, to late to cancel
00913           }
00914         }
00915         else
00916           task.wait();
00917       }
00918       catch (InterruptedException e)
00919       {
00920         if (task.setExpiredTimeout())
00921         { // Task will be ignored by all backends
00922           String msg = Translate.get("loadbalancer.commit.timeout",
00923               new String[]{String.valueOf(tid),
00924                   String.valueOf(task.getSuccess()),
00925                   String.valueOf(task.getFailed())});
00926           logger.warn(msg);
00927           throw new SQLException(msg);
00928         }
00929         // else task execution already started, to late to cancel
00930       }
00931 
00932       if (task.getSuccess() > 0)
00933         return;
00934       else
00935       { // All tasks failed
00936         ArrayList exceptions = task.getExceptions();
00937         if (exceptions == null)
00938           throw new SQLException(Translate.get(
00939               "loadbalancer.commit.all.failed", tid));
00940         else
00941         {
00942           String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
00943               tid)
00944               + "\n";
00945           for (int i = 0; i < exceptions.size(); i++)
00946             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00947           logger.error(errorMsg);
00948           throw new SQLException(errorMsg);
00949         }
00950       }
00951     }
00952   }
00953 
00954   /**
00955    * Rollbacks a transaction.
00956    * 
00957    * @param tm the transaction marker metadata
00958    * @throws SQLException if an error occurs
00959    */
00960   public void rollback(TransactionMarkerMetaData tm) throws SQLException
00961   {
00962     try
00963     {
00964       backendThreadsRWLock.acquireRead();
00965     }
00966     catch (InterruptedException e)
00967     {
00968       String msg = Translate.get(
00969           "loadbalancer.backendlist.acquire.readlock.failed", e);
00970       logger.error(msg);
00971       throw new SQLException(msg);
00972     }
00973     int nbOfThreads = backendThreads.size();
00974     ArrayList rollbackList = new ArrayList();
00975     long tid = tm.getTransactionId();
00976     Long lTid = new Long(tid);
00977 
00978     // Build the list of backend that need to rollback this transaction
00979     for (int i = 0; i < nbOfThreads; i++)
00980     {
00981       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
00982       if (thread.getBackend().isStartedTransaction(lTid))
00983         rollbackList.add(thread);
00984     }
00985 
00986     nbOfThreads = rollbackList.size();
00987     if (nbOfThreads == 0)
00988     { // Empty transaction or read-only with cache hits, then no backend
00989       // actually executed any query for that transaction. Simply return.
00990       // Bug reported by Ganesh (ssriganesha@rogers.com).
00991       backendThreadsRWLock.releaseRead();
00992       return;
00993     }
00994 
00995     // Create the task
00996     RollbackTask task = new RollbackTask(nbOfThreads, // Wait for all to
00997         // rollback
00998         nbOfThreads, tm.getTimeout(), tm.getLogin(), tid);
00999 
01000     synchronized (task)
01001     {
01002       // Post the task in each backendThread tasklist and wakeup the threads
01003       for (int i = 0; i < nbOfThreads; i++)
01004       {
01005         BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01006         synchronized (thread)
01007         {
01008           thread.addTask(task, tid);
01009           thread.notify();
01010         }
01011       }
01012 
01013       backendThreadsRWLock.releaseRead();
01014 
01015       // Wait for completion (notified by the task)
01016       try
01017       {
01018         // Wait on task
01019         long timeout = tm.getTimeout();
01020         if (timeout > 0)
01021         {
01022           long start = System.currentTimeMillis();
01023           task.wait(timeout);
01024           long end = System.currentTimeMillis();
01025           long remaining = timeout - (end - start);
01026           if (remaining <= 0)
01027           {
01028             if (task.setExpiredTimeout())
01029             { // Task will be ignored by all backends
01030               String msg = Translate.get("loadbalancer.rollback.timeout",
01031                   new String[]{String.valueOf(tid),
01032                       String.valueOf(task.getSuccess()),
01033                       String.valueOf(task.getFailed())});
01034               logger.warn(msg);
01035               throw new SQLException(msg);
01036             }
01037             // else task execution already started, to late to cancel
01038           }
01039         }
01040         else
01041           task.wait();
01042       }
01043       catch (InterruptedException e)
01044       {
01045         if (task.setExpiredTimeout())
01046         { // Task will be ignored by all backends
01047           String msg = Translate.get("loadbalancer.rollback.timeout",
01048               new String[]{String.valueOf(tid),
01049                   String.valueOf(task.getSuccess()),
01050                   String.valueOf(task.getFailed())});
01051           logger.warn(msg);
01052           throw new SQLException(msg);
01053         }
01054         // else task execution already started, to late to cancel
01055       }
01056 
01057       if (task.getSuccess() > 0)
01058         return;
01059       else
01060       { // All tasks failed
01061         ArrayList exceptions = task.getExceptions();
01062         if (exceptions == null)
01063           throw new SQLException(Translate.get(
01064               "loadbalancer.rollback.all.failed", tid));
01065         else
01066         {
01067           String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01068               tid)
01069               + "\n";
01070           for (int i = 0; i < exceptions.size(); i++)
01071             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01072           logger.error(errorMsg);
01073           throw new SQLException(errorMsg);
01074         }
01075       }
01076     }
01077   }
01078 
01079   /*
01080    * Backends management
01081    */
01082 
01083   /**
01084    * Enables a Backend that was previously disabled.
01085    * <p>
01086    * Ask the corresponding connection manager to initialize the connections if
01087    * needed.
01088    * <p>
01089    * No sanity checks are performed by this function.
01090    * 
01091    * @param db the database backend to enable
01092    * @param writeEnabled True if the backend must be enabled for writes
01093    * @throws SQLException if an error occurs
01094    */
01095   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01096       throws SQLException
01097   {
01098     // Create a worker thread and add it to the list
01099     BackendWorkerThread thread = new BackendWorkerThread(db, this);
01100     try
01101     {
01102       backendThreadsRWLock.acquireWrite();
01103     }
01104     catch (InterruptedException e)
01105     {
01106       String msg = Translate.get(
01107           "loadbalancer.backendlist.acquire.writelock.failed", e);
01108       logger.error(msg);
01109       throw new SQLException(msg);
01110     }
01111     backendThreads.add(thread);
01112     backendThreadsRWLock.releaseWrite();
01113     thread.start();
01114     logger.info(Translate.get("loadbalancer.backend.workerthread.add", db
01115         .getName()));
01116 
01117     if (!db.isInitialized())
01118       db.initializeConnections();
01119     db.enableRead();
01120     if (writeEnabled)
01121       db.enableWrite();
01122   }
01123 
01124   /**
01125    * Disables a backend that was previously enabled.
01126    * <p>
01127    * Ask the corresponding connection manager to finalize the connections if
01128    * needed.
01129    * <p>
01130    * No sanity checks are performed by this function.
01131    * 
01132    * @param db the database backend to disable
01133    * @throws SQLException if an error occurs
01134    */
01135   public synchronized void disableBackend(DatabaseBackend db)
01136       throws SQLException
01137   {
01138     int nbOfThreads = backendThreads.size();
01139 
01140     // Find the right thread
01141     for (int i = 0; i < nbOfThreads; i++)
01142     {
01143       BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
01144       if (thread.getBackend().equals(db))
01145       {
01146         logger.info(Translate.get("loadbalancer.backend.workerthread.remove",
01147             db.getName()));
01148 
01149         // Remove it from the backendThread list
01150         try
01151         {
01152           backendThreadsRWLock.acquireWrite();
01153         }
01154         catch (InterruptedException e)
01155         {
01156           String msg = Translate.get(
01157               "loadbalancer.backendlist.acquire.writelock.failed", e);
01158           logger.error(msg);
01159           throw new SQLException(msg);
01160         }
01161         backendThreads.remove(thread);
01162         backendThreadsRWLock.releaseWrite();
01163 
01164         synchronized (thread)
01165         {
01166           // Kill the thread
01167           thread.addPriorityTask(new KillThreadTask(1, 1));
01168           thread.notify();
01169         }
01170         break;
01171       }
01172     }
01173 
01174     db.disable();
01175     if (db.isInitialized())
01176       db.finalizeConnections();
01177   }
01178 
01179   /**
01180    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#setWeight(String,
01181    *      int)
01182    */
01183   public void setWeight(String name, int w) throws SQLException
01184   {
01185     throw new SQLException("Weight is not supported with this load balancer");
01186   }
01187 
01188   /*
01189    * Debug/Monitoring
01190    */
01191 
01192   /**
01193    * Get information about the Request load balancer
01194    * 
01195    * @return <code>String</code> containing information
01196    */
01197   public String getInformation()
01198   {
01199     return "RAIDb-0 Request load balancer\n";
01200   }
01201 
01202   /**
01203    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
01204    */
01205   public String getXmlImpl()
01206   {
01207     StringBuffer info = new StringBuffer();
01208     info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
01209     createTablePolicy.getXml();
01210     info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
01211     return info.toString();
01212   }
01213 
01214 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1