Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

ParallelDB.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  *
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  *
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): Jaco Swart.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer.paralleldb;
00026 
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.Hashtable;
00030 
00031 import javax.management.NotCompliantMBeanException;
00032 
00033 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00034 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00035 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00036 import org.objectweb.cjdbc.common.i18n.Translate;
00037 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00038 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00039 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.common.sql.UnknownRequest;
00043 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00044 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00045 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00046 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00047 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00048 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00049 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00050 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00053 
00054 /**
00055  * This class defines a ParallelDB
00056  *
00057  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00058  * @version 1.0
00059  */
00060 /**
00061  * These are generic functions for all ParallelDB load balancers.
00062  * <p>
00063  * Read and write queries are load balanced on the backends without any
00064  * replication (assuming that the underlying parallel database takes care of
00065  * data replication). The load balancers provide failover for reads and writes.
00066  * 
00067  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00068  * @author <a href="mailto:jaco.swart@iblocks.co.uk">Jaco Swart </a>
00069  * @version 1.0
00070  */
00071 public abstract class ParallelDB extends AbstractLoadBalancer
00072 {
00073   //transaction id -> DatabaseBackend
00074   private Hashtable backendPerTransactionId;
00075 
00076   /**
00077    * Creates a new <code>ParallelDB</code> load balancer with NO_PARSING and a
00078    * SingleDB RAIDb level.
00079    * 
00080    * @param vdb the virtual database this load balancer belongs to.
00081    * @throws SQLException if an error occurs
00082    * @throws NotCompliantMBeanException if the MBean is not JMX compliant
00083    */
00084   public ParallelDB(VirtualDatabase vdb) throws SQLException,
00085       NotCompliantMBeanException
00086   {
00087     super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING);
00088   }
00089 
00090   /**
00091    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadRequest(SelectRequest,
00092    *      MetadataCache)
00093    */
00094   public ControllerResultSet execReadRequest(SelectRequest request,
00095       MetadataCache metadataCache) throws SQLException
00096   {
00097     DatabaseBackend backend;
00098     if (request.isAutoCommit())
00099       backend = chooseBackendForReadRequest(request);
00100     else
00101       backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00102           .getTransactionId()));
00103 
00104     if (backend == null)
00105       throw new SQLException(Translate.get(
00106           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00107               .getSQLShortFormLength())));
00108 
00109     ControllerResultSet rs = null;
00110     // Execute the request on the chosen backend
00111     try
00112     {
00113       rs = executeReadRequestOnBackend(request, backend, metadataCache);
00114     }
00115     catch (UnreachableBackendException urbe)
00116     {
00117       // Try to execute query on different backend
00118       return execReadRequest(request, metadataCache);
00119     }
00120     catch (SQLException se)
00121     {
00122       String msg = Translate.get("loadbalancer.request.failed", new String[]{
00123           String.valueOf(request.getId()), se.getMessage()});
00124       if (logger.isInfoEnabled())
00125         logger.info(msg);
00126       throw new SQLException(msg);
00127     }
00128     catch (RuntimeException e)
00129     {
00130       String msg = Translate.get("loadbalancer.request.failed.on.backend",
00131           new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00132               backend.getName(), e.getMessage()});
00133       logger.error(msg, e);
00134       throw new SQLException(msg);
00135     }
00136 
00137     return rs;
00138   }
00139 
00140   /**
00141    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadOnlyReadStoredProcedure(StoredProcedure,
00142    *      MetadataCache)
00143    */
00144   public ControllerResultSet execReadOnlyReadStoredProcedure(
00145       StoredProcedure proc, MetadataCache metadataCache) throws SQLException
00146   {
00147     return execReadStoredProcedure(proc, metadataCache);
00148   }
00149 
00150   /**
00151    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execReadStoredProcedure(StoredProcedure,
00152    *      MetadataCache)
00153    */
00154   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00155       MetadataCache metadataCache) throws SQLException
00156   {
00157     DatabaseBackend backend;
00158     if (proc.isAutoCommit())
00159       backend = chooseBackendForReadRequest(proc);
00160     else
00161       backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc
00162           .getTransactionId()));
00163 
00164     if (backend == null)
00165       throw new SQLException(Translate.get(
00166           "loadbalancer.storedprocedure.no.backend.found", proc
00167               .getSQLShortForm(vdb.getSQLShortFormLength())));
00168 
00169     ControllerResultSet rs = null;
00170     // Execute the request on the chosen backend
00171     try
00172     {
00173       rs = executeReadStoredProcedureOnBackend(proc, backend, metadataCache);
00174     }
00175     catch (UnreachableBackendException urbe)
00176     {
00177       // Try to execute query on different backend
00178       return execReadStoredProcedure(proc, metadataCache);
00179     }
00180     catch (SQLException se)
00181     {
00182       String msg = Translate.get("loadbalancer.storedprocedure.failed",
00183           new String[]{String.valueOf(proc.getId()), se.getMessage()});
00184       if (logger.isInfoEnabled())
00185         logger.info(msg);
00186       throw new SQLException(msg);
00187     }
00188     catch (RuntimeException e)
00189     {
00190       String msg = Translate.get(
00191           "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00192               proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00193               backend.getName(), e.getMessage()});
00194       logger.error(msg, e);
00195       throw new SQLException(msg);
00196     }
00197 
00198     return rs;
00199   }
00200 
00201   /**
00202    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteRequest(org.objectweb.cjdbc.common.sql.AbstractWriteRequest)
00203    */
00204   public int execWriteRequest(AbstractWriteRequest request)
00205       throws AllBackendsFailedException, SQLException
00206   {
00207     DatabaseBackend backend;
00208     if (request.isAutoCommit())
00209       backend = chooseBackendForWriteRequest(request);
00210     else
00211       backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00212           .getTransactionId()));
00213 
00214     if (backend == null)
00215       throw new SQLException(Translate.get(
00216           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00217               .getSQLShortFormLength())));
00218 
00219     int result;
00220     // Execute the request on the chosen backend
00221     try
00222     {
00223       result = executeWriteRequestOnBackend(request, backend);
00224     }
00225     catch (UnreachableBackendException urbe)
00226     {
00227       // Try to execute query on different backend
00228       return execWriteRequest(request);
00229     }
00230     catch (SQLException se)
00231     {
00232       String msg = Translate.get("loadbalancer.request.failed", new String[]{
00233           String.valueOf(request.getId()), se.getMessage()});
00234       if (logger.isInfoEnabled())
00235         logger.info(msg);
00236       throw new SQLException(msg);
00237     }
00238     catch (RuntimeException e)
00239     {
00240       String msg = Translate.get("loadbalancer.request.failed.on.backend",
00241           new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00242               backend.getName(), e.getMessage()});
00243       logger.error(msg, e);
00244       throw new SQLException(msg);
00245     }
00246 
00247     return result;
00248   }
00249 
00250   /**
00251    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteRequestWithKeys(AbstractWriteRequest,
00252    *      MetadataCache)
00253    */
00254   public ControllerResultSet execWriteRequestWithKeys(
00255       AbstractWriteRequest request, MetadataCache metadataCache)
00256       throws AllBackendsFailedException, SQLException
00257   {
00258     DatabaseBackend backend;
00259     if (request.isAutoCommit())
00260       backend = chooseBackendForWriteRequest(request);
00261     else
00262       backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00263           .getTransactionId()));
00264 
00265     if (backend == null)
00266       throw new SQLException(Translate.get(
00267           "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00268               .getSQLShortFormLength())));
00269 
00270     ControllerResultSet rs;
00271     // Execute the request on the chosen backend
00272     try
00273     {
00274       rs = executeWriteRequestWithKeysOnBackend(request, backend, metadataCache);
00275     }
00276     catch (UnreachableBackendException urbe)
00277     {
00278       // Try to execute query on different backend
00279       return execWriteRequestWithKeys(request, metadataCache);
00280     }
00281     catch (SQLException se)
00282     {
00283       String msg = Translate.get("loadbalancer.request.failed", new String[]{
00284           String.valueOf(request.getId()), se.getMessage()});
00285       if (logger.isInfoEnabled())
00286         logger.info(msg);
00287       throw new SQLException(msg);
00288     }
00289     catch (RuntimeException e)
00290     {
00291       String msg = Translate.get("loadbalancer.request.failed.on.backend",
00292           new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00293               backend.getName(), e.getMessage()});
00294       logger.error(msg, e);
00295       throw new SQLException(msg);
00296     }
00297 
00298     return rs;
00299   }
00300 
00301   /**
00302    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
00303    */
00304   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00305   {
00306     DatabaseBackend backend;
00307     if (proc.isAutoCommit())
00308       backend = chooseBackendForReadRequest(proc);
00309     else
00310       backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc
00311           .getTransactionId()));
00312 
00313     if (backend == null)
00314       throw new SQLException(Translate.get(
00315           "loadbalancer.storedprocedure.no.backend.found", proc
00316               .getSQLShortForm(vdb.getSQLShortFormLength())));
00317 
00318     int result;
00319     // Execute the request on the chosen backend
00320     try
00321     {
00322       result = executeWriteStoredProcedureOnBackend(proc, backend);
00323     }
00324     catch (UnreachableBackendException urbe)
00325     {
00326       // Try to execute query on different backend
00327       return execWriteStoredProcedure(proc);
00328     }
00329     catch (SQLException se)
00330     {
00331       String msg = Translate.get("loadbalancer.storedprocedure.failed",
00332           new String[]{String.valueOf(proc.getId()), se.getMessage()});
00333       if (logger.isInfoEnabled())
00334         logger.info(msg);
00335       throw new SQLException(msg);
00336     }
00337     catch (RuntimeException e)
00338     {
00339       String msg = Translate.get(
00340           "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00341               proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00342               backend.getName(), e.getMessage()});
00343       logger.error(msg, e);
00344       throw new SQLException(msg);
00345     }
00346 
00347     return result;
00348   }
00349 
00350   /**
00351    * Execute a read request on the selected backend.
00352    * 
00353    * @param request the request to execute
00354    * @param backend the backend that will execute the request
00355    * @param metadataCache MetadataCache (null if none)
00356    * @return the ControllerResultSet
00357    * @throws SQLException if an error occurs
00358    */
00359   private ControllerResultSet executeReadRequestOnBackend(
00360       SelectRequest request, DatabaseBackend backend,
00361       MetadataCache metadataCache) throws SQLException,
00362       UnreachableBackendException
00363   {
00364     // Ok, we have a backend, let's execute the request
00365     AbstractConnectionManager cm = backend.getConnectionManager(request
00366         .getLogin());
00367 
00368     // Sanity check
00369     if (cm == null)
00370     {
00371       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00372           new String[]{request.getLogin(), backend.getName()});
00373       logger.error(msg);
00374       throw new SQLException(msg);
00375     }
00376 
00377     // Execute the query
00378     if (request.isAutoCommit())
00379     {
00380       ControllerResultSet rs = null;
00381       boolean badConnection;
00382       do
00383       {
00384         badConnection = false;
00385         // Use a connection just for this request
00386         Connection c = null;
00387         try
00388         {
00389           c = cm.getConnection();
00390         }
00391         catch (UnreachableBackendException e1)
00392         {
00393           logger.error(Translate.get(
00394               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00395           disableBackend(backend);
00396           throw new UnreachableBackendException(Translate.get(
00397               "loadbalancer.backend.unreacheable", backend.getName()));
00398         }
00399 
00400         // Sanity check
00401         if (c == null)
00402           throw new SQLException(Translate.get(
00403               "loadbalancer.backend.no.connection", backend.getName()));
00404 
00405         // Execute Query
00406         try
00407         {
00408           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00409           cm.releaseConnection(c);
00410         }
00411         catch (SQLException e)
00412         {
00413           cm.releaseConnection(c);
00414           throw new SQLException(Translate.get(
00415               "loadbalancer.request.failed.on.backend", new String[]{
00416                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00417                   backend.getName(), e.getMessage()}));
00418         }
00419         catch (BadConnectionException e)
00420         { // Get rid of the bad connection
00421           cm.deleteConnection(c);
00422           badConnection = true;
00423         }
00424       }
00425       while (badConnection);
00426       if (logger.isDebugEnabled())
00427         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00428             String.valueOf(request.getId()), backend.getName()}));
00429       return rs;
00430     }
00431     else
00432     { // Inside a transaction
00433       Connection c;
00434       long tid = request.getTransactionId();
00435       Long lTid = new Long(tid);
00436 
00437       try
00438       {
00439         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00440       }
00441       catch (UnreachableBackendException e1)
00442       {
00443         logger.error(Translate.get(
00444             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00445         disableBackend(backend);
00446         throw new SQLException(Translate.get(
00447             "loadbalancer.backend.unreacheable", backend.getName()));
00448       }
00449       catch (NoTransactionStartWhenDisablingException e)
00450       {
00451         String msg = Translate.get("loadbalancer.backend.is.disabling",
00452             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00453                 backend.getName()});
00454         logger.error(msg);
00455         throw new SQLException(msg);
00456       }
00457 
00458       // Sanity check
00459       if (c == null)
00460         throw new SQLException(Translate.get(
00461             "loadbalancer.unable.retrieve.connection", new String[]{
00462                 String.valueOf(tid), backend.getName()}));
00463 
00464       // Execute Query
00465       ControllerResultSet rs = null;
00466       try
00467       {
00468         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00469       }
00470       catch (SQLException e)
00471       {
00472         throw new SQLException(Translate.get(
00473             "loadbalancer.request.failed.on.backend", new String[]{
00474                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00475                 backend.getName(), e.getMessage()}));
00476       }
00477       catch (BadConnectionException e)
00478       { // Connection failed, so did the transaction
00479         // Disable the backend.
00480         cm.deleteConnection(tid);
00481         String msg = Translate.get(
00482             "loadbalancer.backend.disabling.connection.failure", backend
00483                 .getName());
00484         logger.error(msg);
00485         disableBackend(backend);
00486         throw new SQLException(msg);
00487       }
00488       if (logger.isDebugEnabled())
00489         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00490             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00491                 backend.getName()}));
00492       return rs;
00493     }
00494   }
00495 
00496   /**
00497    * Execute a stored procedure on the selected backend.
00498    * 
00499    * @param proc the stored procedure to execute
00500    * @param backend the backend that will execute the request
00501    * @param metadataCache MetadataCache (null if none)
00502    * @return the ControllerResultSet
00503    * @throws SQLException if an error occurs
00504    */
00505   private ControllerResultSet executeReadStoredProcedureOnBackend(
00506       StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00507       throws SQLException, UnreachableBackendException
00508   {
00509     // Ok, we have a backend, let's execute the request
00510     AbstractConnectionManager cm = backend
00511         .getConnectionManager(proc.getLogin());
00512 
00513     // Sanity check
00514     if (cm == null)
00515     {
00516       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00517           new String[]{proc.getLogin(), backend.getName()});
00518       logger.error(msg);
00519       throw new SQLException(msg);
00520     }
00521 
00522     // Execute the query
00523     if (proc.isAutoCommit())
00524     {
00525       // Use a connection just for this request
00526       Connection c = null;
00527       try
00528       {
00529         c = cm.getConnection();
00530       }
00531       catch (UnreachableBackendException e1)
00532       {
00533         logger.error(Translate.get(
00534             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00535         disableBackend(backend);
00536         throw new UnreachableBackendException(Translate.get(
00537             "loadbalancer.backend.unreacheable", backend.getName()));
00538       }
00539 
00540       // Sanity check
00541       if (c == null)
00542         throw new UnreachableBackendException(Translate.get(
00543             "loadbalancer.backend.no.connection", backend.getName()));
00544 
00545       // Execute Query
00546       ControllerResultSet rs = null;
00547       try
00548       {
00549         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00550             backend, c, metadataCache);
00551       }
00552       catch (Exception e)
00553       {
00554         throw new SQLException(Translate.get(
00555             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00556                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00557                 backend.getName(), e.getMessage()}));
00558       }
00559       finally
00560       {
00561         cm.releaseConnection(c);
00562       }
00563       if (logger.isDebugEnabled())
00564         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00565             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00566       return rs;
00567     }
00568     else
00569     { // Inside a transaction
00570       Connection c;
00571       long tid = proc.getTransactionId();
00572       Long lTid = new Long(tid);
00573 
00574       try
00575       {
00576         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00577       }
00578       catch (UnreachableBackendException e1)
00579       {
00580         logger.error(Translate.get(
00581             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00582         disableBackend(backend);
00583         throw new SQLException(Translate.get(
00584             "loadbalancer.backend.unreacheable", backend.getName()));
00585       }
00586       catch (NoTransactionStartWhenDisablingException e)
00587       {
00588         String msg = Translate.get("loadbalancer.backend.is.disabling",
00589             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00590                 backend.getName()});
00591         logger.error(msg);
00592         throw new SQLException(msg);
00593       }
00594 
00595       // Sanity check
00596       if (c == null)
00597         throw new SQLException(Translate.get(
00598             "loadbalancer.unable.retrieve.connection", new String[]{
00599                 String.valueOf(tid), backend.getName()}));
00600 
00601       // Execute Query
00602       ControllerResultSet rs;
00603       try
00604       {
00605         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00606             backend, c, metadataCache);
00607       }
00608       catch (Exception e)
00609       {
00610         throw new SQLException(Translate.get(
00611             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00612                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00613                 backend.getName(), e.getMessage()}));
00614       }
00615       if (logger.isDebugEnabled())
00616         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00617             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00618                 backend.getName()}));
00619       return rs;
00620     }
00621   }
00622 
00623   /**
00624    * Execute a write request on the selected backend.
00625    * 
00626    * @param request the request to execute
00627    * @param backend the backend that will execute the request
00628    * @return the number of modified rows
00629    * @throws SQLException if an error occurs
00630    */
00631   private int executeWriteRequestOnBackend(AbstractWriteRequest request,
00632       DatabaseBackend backend) throws SQLException, UnreachableBackendException
00633   {
00634     if (backend == null)
00635       throw new SQLException(Translate.get(
00636           "loadbalancer.execute.no.backend.available", request.getId()));
00637 
00638     try
00639     {
00640       AbstractConnectionManager cm = backend.getConnectionManager(request
00641           .getLogin());
00642       if (request.isAutoCommit())
00643       { // Use a connection just for this request
00644         Connection c = null;
00645         try
00646         {
00647           c = cm.getConnection();
00648         }
00649         catch (UnreachableBackendException e1)
00650         {
00651           String backendName = backend.getName();
00652           logger.error(Translate.get(
00653               "loadbalancer.backend.disabling.unreachable", backendName));
00654           disableBackend(backend);
00655           throw new UnreachableBackendException(Translate.get(
00656               "loadbalancer.backend.unreacheable", backendName));
00657         }
00658 
00659         // Sanity check
00660         if (c == null)
00661           throw new UnreachableBackendException(Translate.get(
00662               "loadbalancer.backend.no.connection", backend.getName()));
00663 
00664         // Execute Query
00665         int result;
00666         try
00667         {
00668           result = executeUpdateRequestOnBackend(request, backend, c);
00669         }
00670         catch (Exception e)
00671         {
00672           throw new SQLException(Translate.get(
00673               "loadbalancer.request.failed.on.backend", new String[]{
00674                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00675                   backend.getName(), e.getMessage()}));
00676         }
00677         finally
00678         {
00679           cm.releaseConnection(c);
00680         }
00681         return result;
00682       }
00683       else
00684       { // Re-use the connection used by this transaction
00685         Connection c = cm.retrieveConnection(request.getTransactionId());
00686 
00687         // Sanity check
00688         if (c == null)
00689           throw new SQLException(Translate.get(
00690               "loadbalancer.unable.retrieve.connection",
00691               new String[]{String.valueOf(request.getTransactionId()),
00692                   backend.getName()}));
00693 
00694         // Execute Query
00695         int result;
00696         try
00697         {
00698           result = executeUpdateRequestOnBackend(request, backend, c);
00699           return result;
00700         }
00701         catch (Exception e)
00702         {
00703           throw new SQLException(Translate.get(
00704               "loadbalancer.request.failed.on.backend", new String[]{
00705                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00706                   backend.getName(), e.getMessage()}));
00707         }
00708       }
00709     }
00710     catch (RuntimeException e)
00711     {
00712       String msg = Translate.get("loadbalancer.request.failed.on.backend",
00713           new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00714               backend.getName(), e.getMessage()});
00715       logger.fatal(msg, e);
00716       throw new SQLException(msg);
00717     }
00718   }
00719 
00720   /**
00721    * Execute a write request on the selected backend and return the
00722    * autogenerated keys.
00723    * 
00724    * @param request the request to execute
00725    * @param backend the backend that will execute the request
00726    * @param metadataCache MetadataCache (null if none)
00727    * @return the ResultSet containing the auto-generated keys
00728    * @throws SQLException if an error occurs
00729    */
00730   private ControllerResultSet executeWriteRequestWithKeysOnBackend(
00731       AbstractWriteRequest request, DatabaseBackend backend,
00732       MetadataCache metadataCache) throws SQLException,
00733       UnreachableBackendException
00734   {
00735     if (backend == null)
00736       throw new SQLException(Translate.get(
00737           "loadbalancer.execute.no.backend.available", request.getId()));
00738 
00739     if (!backend.getDriverCompliance().supportGetGeneratedKeys())
00740       throw new SQLException(Translate.get(
00741           "loadbalancer.backend.autogeneratedkeys.unsupported", backend
00742               .getName()));
00743 
00744     try
00745     {
00746       AbstractConnectionManager cm = backend.getConnectionManager(request
00747           .getLogin());
00748       if (request.isAutoCommit())
00749       { // Use a connection just for this request
00750         Connection c = null;
00751         try
00752         {
00753           c = cm.getConnection();
00754         }
00755         catch (UnreachableBackendException e1)
00756         {
00757           String backendName = backend.getName();
00758           logger.error(Translate.get(
00759               "loadbalancer.backend.disabling.unreachable", backendName));
00760           disableBackend(backend);
00761           throw new UnreachableBackendException(Translate.get(
00762               "loadbalancer.backend.unreacheable", backendName));
00763         }
00764 
00765         // Sanity check
00766         if (c == null)
00767           throw new UnreachableBackendException(Translate.get(
00768               "loadbalancer.backend.no.connection", backend.getName()));
00769 
00770         // Execute Query
00771         ControllerResultSet result;
00772         try
00773         {
00774           result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00775               metadataCache);
00776         }
00777         catch (Exception e)
00778         {
00779           throw new SQLException(Translate.get(
00780               "loadbalancer.request.failed.on.backend", new String[]{
00781                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00782                   backend.getName(), e.getMessage()}));
00783         }
00784         finally
00785         {
00786           cm.releaseConnection(c);
00787         }
00788         return result;
00789       }
00790       else
00791       { // Re-use the connection used by this transaction
00792         Connection c = cm.retrieveConnection(request.getTransactionId());
00793 
00794         // Sanity check
00795         if (c == null)
00796           throw new SQLException(Translate.get(
00797               "loadbalancer.unable.retrieve.connection",
00798               new String[]{String.valueOf(request.getTransactionId()),
00799                   backend.getName()}));
00800 
00801         // Execute Query
00802         try
00803         {
00804           return executeUpdateRequestOnBackendWithKeys(request, backend, c,
00805               metadataCache);
00806         }
00807         catch (Exception e)
00808         {
00809           throw new SQLException(Translate.get(
00810               "loadbalancer.request.failed.on.backend", new String[]{
00811                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00812                   backend.getName(), e.getMessage()}));
00813         }
00814       }
00815     }
00816     catch (RuntimeException e)
00817     {
00818       String msg = Translate
00819           .get("loadbalancer.request.failed", new String[]{
00820               request.getSQLShortForm(vdb.getSQLShortFormLength()),
00821               e.getMessage()});
00822       logger.fatal(msg, e);
00823       throw new SQLException(msg);
00824     }
00825   }
00826 
00827   /**
00828    * Executes a write stored procedure on the given backend.
00829    * <p>
00830    * In autocommit mode a connection is taken from the connection manager for
00831    * this call only and released afterwards. Inside a transaction the
00832    * connection is obtained from the backend through
00833    * <code>getConnectionForTransactionAndLazyBeginIfNeeded</code> and is not
00834    * released here.
00835    * <p>
00836    * When the backend is reported unreachable while the connection is
00837    * obtained, the backend is disabled before the error is propagated.
00838    * 
00839    * @param proc the stored procedure to execute
00840    * @param backend the backend that will execute the request
00841    * @return the int returned by
00842    *         <code>AbstractLoadBalancer.executeWriteStoredProcedureOnBackend</code>
00843    * @throws SQLException if no connection manager or transaction connection is
00844    *           found, if the backend is unreachable or disabling inside a
00845    *           transaction, or if the execution fails (the original exception
00846    *           is attached as the cause)
00847    * @throws UnreachableBackendException in autocommit mode, if no connection
00848    *           can be obtained from the connection manager
00849    */
00850   private int executeWriteStoredProcedureOnBackend(StoredProcedure proc,
00851       DatabaseBackend backend) throws SQLException, UnreachableBackendException
00852   {
00853     AbstractConnectionManager cm = backend
00854         .getConnectionManager(proc.getLogin());
00855 
00856     // Sanity check
00857     if (cm == null)
00858     {
00859       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00860           new String[]{proc.getLogin(), backend.getName()});
00861       logger.error(msg);
00862       throw new SQLException(msg);
00863     }
00864 
00865     if (proc.isAutoCommit())
00866     {
00867       // Use a connection just for this request
00868       Connection c = null;
00869       try
00870       {
00871         c = cm.getConnection();
00872       }
00873       catch (UnreachableBackendException e1)
00874       {
00875         logger.error(Translate.get(
00876             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00877         disableBackend(backend);
00878         throw new UnreachableBackendException(Translate.get(
00879             "loadbalancer.backend.unreacheable", backend.getName()));
00880       }
00881 
00882       // Sanity check
00883       if (c == null)
00884         throw new UnreachableBackendException(Translate.get(
00885             "loadbalancer.backend.no.connection", backend.getName()));
00886 
00887       int result;
00888       try
00889       {
00890         // Warning! No way to detect if schema has been modified unless
00891         // we ask the backend again using DatabaseMetaData.getTables().
00892         result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00893             proc, backend, c);
00894       }
00895       catch (Exception e)
00896       {
00897         throw newStoredProcedureFailure(proc, backend, e);
00898       }
00899       finally
00900       {
00901         // The connection was borrowed for this call only
00902         cm.releaseConnection(c);
00903       }
00904       if (logger.isDebugEnabled())
00905         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00906             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00907       return result;
00908     }
00909     else
00910     { // Inside a transaction
00911       Connection c;
00912       long tid = proc.getTransactionId();
00913       Long lTid = new Long(tid);
00914 
00915       try
00916       {
00917         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00918       }
00919       catch (UnreachableBackendException e1)
00920       {
00921         logger.error(Translate.get(
00922             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00923         disableBackend(backend);
00924         throw new SQLException(Translate.get(
00925             "loadbalancer.backend.unreacheable", backend.getName()));
00926       }
00927       catch (NoTransactionStartWhenDisablingException e)
00928       {
00929         String msg = Translate.get("loadbalancer.backend.is.disabling",
00930             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00931                 backend.getName()});
00932         logger.error(msg);
00933         throw new SQLException(msg);
00934       }
00935 
00936       // Sanity check
00937       if (c == null)
00938         throw new SQLException(Translate.get(
00939             "loadbalancer.unable.retrieve.connection", new String[]{
00940                 String.valueOf(tid), backend.getName()}));
00941 
00942       int result;
00943       try
00944       {
00945         // Warning! No way to detect if schema has been modified unless
00946         // we ask the backend again using DatabaseMetaData.getTables().
00947         result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00948             proc, backend, c);
00949       }
00950       catch (Exception e)
00951       {
00952         throw newStoredProcedureFailure(proc, backend, e);
00953       }
00954       if (logger.isDebugEnabled())
00955         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00956             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00957                 backend.getName()}));
00958       return result;
00959     }
00960   }
00961 
00962   /**
00963    * Builds the SQLException reported when a stored procedure fails on a
00964    * backend, keeping the original exception as its cause.
00965    * 
00966    * @param proc the stored procedure that failed
00967    * @param backend the backend the execution failed on
00968    * @param cause the exception raised by the execution
00969    * @return the exception to throw
00970    */
00971   private SQLException newStoredProcedureFailure(StoredProcedure proc,
00972       DatabaseBackend backend, Exception cause)
00973   {
00974     SQLException failure = new SQLException(Translate.get(
00975         "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00976             proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00977             backend.getName(), cause.getMessage()}));
00978     // initCause also works on JDKs without SQLException(String, Throwable)
00979     failure.initCause(cause);
00980     return failure;
00981   }
00957 
00958   //
00959   // Transaction Management
00960   //
00961 
00962   /**
00963    * Starts a transaction. A backend is chosen with the read load balancing
00964    * policy, recorded for the transaction id and asked to start the
00965    * transaction.
00966    * 
00967    * @param tm marker of the transaction to start (only its id is read)
00968    * @throws SQLException if a transaction with this id is already recorded or
00969    *           if one is raised while choosing the backend
00970    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#begin(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
00971    */
00972   public void begin(TransactionMarkerMetaData tm) throws SQLException
00973   {
00974     Long transactionKey = new Long(tm.getTransactionId());
00975     boolean alreadyStarted = backendPerTransactionId
00976         .containsKey(transactionKey);
00977     if (alreadyStarted)
00978     {
00979       throw new SQLException(Translate.get(
00980           "loadbalancer.transaction.already.started", transactionKey
00981               .toString()));
00982     }
00983 
00984     // A placeholder request stands in for the "begin" when picking a backend
00985     UnknownRequest beginMarker = new UnknownRequest("begin", false, 0, "\n");
00986     DatabaseBackend chosen = chooseBackendForReadRequest(beginMarker);
00987     backendPerTransactionId.put(transactionKey, chosen);
00988     chosen.startTransaction(transactionKey);
00989   }
00980 
00981   /**
00982    * Commits a transaction on the backend recorded for its id, then puts the
00983    * connection back in autocommit mode. The record is removed first; once a
00984    * connection has been retrieved it is released and the transaction is
00985    * stopped on the backend whether or not the commit succeeds.
00986    * 
00987    * @param tm marker of the transaction to commit (id and login are read)
00988    * @throws SQLException if no backend is recorded for the transaction, if no
00989    *           connection can be retrieved for it, or if the commit fails (the
00990    *           original exception is attached as the cause)
00991    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#commit(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
00992    */
00993   public void commit(TransactionMarkerMetaData tm) throws SQLException
00994   {
00995     long tid = tm.getTransactionId();
00996     Long lTid = new Long(tid);
00997     DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
00998 
00999     // Unknown transaction id: report it instead of failing with a
01000     // NullPointerException below (no translation key is known for this case)
01001     if (db == null)
01002       throw new SQLException("No backend is registered for transaction "
01003           + tid);
01004 
01005     // A missing connection manager is handled like a missing connection
01006     AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
01007     Connection c = (cm == null) ? null : cm.retrieveConnection(tid);
01008 
01009     // Sanity check
01010     if (c == null)
01011     { // Bad connection
01012       db.stopTransaction(lTid);
01013 
01014       throw new SQLException(Translate.get(
01015           "loadbalancer.unable.retrieve.connection", new String[]{
01016               String.valueOf(tid), db.getName()}));
01017     }
01018 
01019     // Execute Query
01020     try
01021     {
01022       c.commit();
01023       c.setAutoCommit(true);
01024     }
01025     catch (Exception e)
01026     {
01027       String msg = Translate.get("loadbalancer.commit.failed", new String[]{
01028           String.valueOf(tid), db.getName(), e.getMessage()});
01029       logger.error(msg);
01030       SQLException failure = new SQLException(msg);
01031       // initCause also works on JDKs without SQLException(String, Throwable)
01032       failure.initCause(e);
01033       throw failure;
01034     }
01035     finally
01036     {
01037       // cm cannot be null here: a connection was retrieved from it
01038       cm.releaseConnection(tid);
01039       db.stopTransaction(lTid);
01040     }
01041   }
01022 
01023   /**
01024    * Rolls back a transaction on the backend recorded for its id, then puts
01025    * the connection back in autocommit mode. The record is removed first; once
01026    * a connection has been retrieved it is released and the transaction is
01027    * stopped on the backend whether or not the rollback succeeds.
01028    * 
01029    * @param tm marker of the transaction to roll back (id and login are read)
01030    * @throws SQLException if no backend is recorded for the transaction, if no
01031    *           connection can be retrieved for it, or if the rollback fails
01032    *           (the original exception is attached as the cause)
01033    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#rollback(org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData)
01034    */
01035   public void rollback(TransactionMarkerMetaData tm) throws SQLException
01036   {
01037     long tid = tm.getTransactionId();
01038     Long lTid = new Long(tid);
01039     DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
01040 
01041     // Unknown transaction id: report it instead of failing with a
01042     // NullPointerException below (no translation key is known for this case)
01043     if (db == null)
01044       throw new SQLException("No backend is registered for transaction "
01045           + tid);
01046 
01047     // A missing connection manager is handled like a missing connection
01048     AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
01049     Connection c = (cm == null) ? null : cm.retrieveConnection(tid);
01050 
01051     // Sanity check
01052     if (c == null)
01053     { // Bad connection
01054       db.stopTransaction(lTid);
01055 
01056       throw new SQLException(Translate.get(
01057           "loadbalancer.unable.retrieve.connection", new String[]{
01058               String.valueOf(tid), db.getName()}));
01059     }
01060 
01061     // Execute Query
01062     try
01063     {
01064       c.rollback();
01065       c.setAutoCommit(true);
01066     }
01067     catch (Exception e)
01068     {
01069       String msg = Translate.get("loadbalancer.rollback.failed", new String[]{
01070           String.valueOf(tid), db.getName(), e.getMessage()});
01071       logger.error(msg);
01072       SQLException failure = new SQLException(msg);
01073       // initCause also works on JDKs without SQLException(String, Throwable)
01074       failure.initCause(e);
01075       throw failure;
01076     }
01077     finally
01078     {
01079       // cm cannot be null here: a connection was retrieved from it
01080       cm.releaseConnection(tid);
01081       db.stopTransaction(lTid);
01082     }
01083   }
01064 
01065   /**
01066    * Puts a backend into service. The operation is logged, the backend
01067    * connections are initialized when the backend reports that it is not
01068    * initialized yet, then reads are enabled and, on request, writes as well.
01069    * <p>
01070    * No sanity checks are performed by this function.
01071    * 
01072    * @param db the database backend to enable
01073    * @param writeEnabled True if the backend must be enabled for writes
01074    * @throws SQLException if an error occurs
01075    */
01076   public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01077       throws SQLException
01078   {
01079     String backendName = db.getName();
01080     logger.info(Translate.get("loadbalancer.backend.enabling", backendName));
01081 
01082     boolean alreadyInitialized = db.isInitialized();
01083     if (!alreadyInitialized)
01084     {
01085       db.initializeConnections();
01086     }
01087 
01088     // Reads are always enabled, writes only when asked for
01089     db.enableRead();
01090     if (writeEnabled)
01091     {
01092       db.enableWrite();
01093     }
01094   }
01085 
01086   /**
01087    * Takes a backend out of service. The operation is logged, the backend is
01088    * disabled and, if it reports being initialized, its connections are
01089    * finalized.
01090    * <p>
01091    * No sanity checks are performed by this function.
01092    * 
01093    * @param db the database backend to disable
01094    * @throws SQLException if an error occurs
01095    */
01096   public void disableBackend(DatabaseBackend db) throws SQLException
01097   {
01098     String backendName = db.getName();
01099     logger.info(Translate.get("loadbalancer.backend.disabling", backendName));
01100 
01101     db.disable();
01102 
01103     // Only an initialized backend has connections to finalize
01104     boolean hasConnections = db.isInitialized();
01105     if (hasConnections)
01106     {
01107       db.finalizeConnections();
01108     }
01109   }
01102 
01103   /**
01104    * Wraps the XML returned by getParallelDBXml() in an opening and a closing
01105    * ParallelDB element tag.
01106    * 
01107    * @return the XML of this load balancer
01108    * @see org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer#getXmlImpl
01109    */
01110   public String getXmlImpl()
01111   {
01112     StringBuffer xml = new StringBuffer();
01113     xml.append("<").append(DatabasesXmlTags.ELT_ParallelDB).append(">");
01114     xml.append(getParallelDBXml());
01115     xml.append("</").append(DatabasesXmlTags.ELT_ParallelDB).append(">");
01116     return xml.toString();
01117   }
01114 
01115   /**
01116    * Returns the implementation specific XML that getXmlImpl() places between
01117    * the opening and closing ParallelDB element tags.
01118    * @return content of ParallelDB xml
01119    */
01120   public abstract String getParallelDBXml();
01121 
01122   /**
01123    * Chooses the backend that will execute a read request, using the load
01124    * balancing algorithm of the implementation. begin() also calls it, with a
01125    * placeholder "begin" request, to pick the backend of a new transaction.
01126    * @param request request to execute
01127    * @return the chosen backend
01128    * @throws SQLException if an error occurs
01129    */
01130   public abstract DatabaseBackend chooseBackendForReadRequest(
01131       AbstractRequest request) throws SQLException;
01132 
01133   /**
01134    * Chooses the backend that will execute a write request, using the load
01135    * balancing algorithm of the implementation.
01136    * 
01137    * @param request write request to execute
01138    * @return the chosen backend
01139    * @throws SQLException if an error occurs while choosing the backend
01140    */
01141   public abstract DatabaseBackend chooseBackendForWriteRequest(
01142       AbstractWriteRequest request) throws SQLException;
01143 
01144 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1