Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

DistributedRequestManager.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): ______________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 import java.util.Vector;
00030 
00031 import javax.management.NotCompliantMBeanException;
00032 
00033 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00034 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
00035 import org.objectweb.cjdbc.common.i18n.Translate;
00036 import org.objectweb.cjdbc.common.log.Trace;
00037 import org.objectweb.cjdbc.common.shared.BackendInfo;
00038 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00039 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00043 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00044 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00045 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00046 import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
00047 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00048 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00049 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00050 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00053 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
00054 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
00055 
00056 /**
00057  * This class defines a Distributed Request Manager.
00058  * <p>
00059  * The DRM is composed of a Request Scheduler, an optional Query Cache, a
00060  * Load Balancer and an optional Recovery Log. Unlike a non-distributed Request
00061  * Manager, this implementation is responsible for synchronizing the different
00062  * controllers' components (schedulers, ...). Functions that are RAIDb level
00063  * dependent are implemented in sub-classes.
00064  * 
00065  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00066  * @version 1.0
00067  */
00068 public abstract class DistributedRequestManager extends RequestManager
00069 {
00070   protected DistributedVirtualDatabase dvdb; // distributed virtual database given to the constructor
00071   /** Requests registered as having failed on all backends, until their completion is notified */
00072   private Vector                       failedOnAllBackends;
00073   /** Controller identifier as stored by setControllerId; begin() ORs it into transaction ids */
00074   private long                         controllerId;
00075   /** Ids (as Long) of transactions that executed at least one write; accessed under synchronized (writeTransactions) */
00076   private ArrayList                    writeTransactions;
00077 
00078   /**
00079    * Creates a <code>DistributedRequestManager</code>, handing every component
00080    * to the <code>RequestManager</code> super constructor.
00081    * 
00082    * @param vdb the distributed virtual database this request manager belongs to
00083    * @param scheduler the Request Scheduler to use
00084    * @param cache the Query Cache implementation handed to the superclass
00085    * @param loadBalancer the Request Load Balancer to use
00086    * @param recoveryLog the Recovery Log to use
00087    * @param beginTimeout timeout in seconds for begin
00088    * @param commitTimeout timeout in seconds for commit
00089    * @param rollbackTimeout timeout in seconds for rollback
00090    * @throws SQLException if an error occurs
00091    * @throws NotCompliantMBeanException if this class is not a compliant JMX
00092    *           MBean
00093    */
00094   public DistributedRequestManager(DistributedVirtualDatabase vdb,
00095       AbstractScheduler scheduler, AbstractResultCache cache,
00096       AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00097       long beginTimeout, long commitTimeout, long rollbackTimeout)
00098       throws SQLException, NotCompliantMBeanException
00099   {
00100     super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00101         commitTimeout, rollbackTimeout);
00102     this.writeTransactions = new ArrayList();
00103     this.failedOnAllBackends = new Vector();
00104     this.dvdb = vdb;
00105   }
00106 
00107   /**
00108    * Gives the identifier of this controller in the form stored by
00109    * <code>setControllerId</code>.
00110    * @return the current value of the controllerId field
00111    */
00112   public long getControllerId()
00113   {
00114     return this.controllerId;
00115   }
00116 
00117   /**
00118    * Exposes the logger used by this request manager.
00119    * 
00120    * @return the <code>Trace</code> instance this class logs to
00121    */
00122   public Trace getLogger()
00123   {
00124     return this.logger;
00125   }
00126 
00127   /**
00128    * Gives access to the distributed virtual database of this manager.
00129    * 
00130    * @return the dvdb field, typed as a plain <code>VirtualDatabase</code>
00131    */
00132   public VirtualDatabase getVirtualDatabase()
00133   {
00134     return this.dvdb;
00135   }
00136 
00137   //
00138   // Database Backends management
00139   //
00140 
00141   /**
00142    * Enables a backend on this controller. When the group has other members,
00143    * an <code>EnableBackend</code> message describing the backend is first
00144    * multicast to them without waiting for any answer; the backend is then
00145    * enabled on the local load balancer.
00146    * 
00147    * @param db The database backend to enable
00148    * @throws SQLException if notifying the other controllers fails (the load
00149    *           balancer call is outside the try block and may throw as well)
00150    */
00151   public void enableBackend(DatabaseBackend db) throws SQLException
00152   {
00153     boolean hasOtherControllers = dvdb.getAllMemberButUs().size() > 0;
00154     if (hasOtherControllers)
00155     {
00156       logger.debug(Translate
00157           .get("virtualdatabase.distributed.enable.backend.check"));
00158       // Any failure of the notification below is reported as a SQLException
00159       try
00160       {
00161         // Tell the other controllers which backend is being enabled here;
00162         // the call does not wait for any answer (WAIT_NONE).
00163         dvdb.getMulticastRequestAdapter().multicastMessage(
00164             dvdb.getAllMemberButUs(), new EnableBackend(new BackendInfo(db)),
00165             MulticastRequestAdapter.WAIT_NONE,
00166             CJDBCGroupMessage.defaultCastTimeOut);
00167       }
00168       catch (Exception failure)
00169       {
00170         String errorText = "Error while enabling backend " + db.getName();
00171         logger.error(errorText, failure);
00172         throw new SQLException(errorText + "(" + failure + ")");
00173       }
00174     }
00175     // Local activation happens whether or not other controllers exist
00176     loadBalancer.enableBackend(db, true);
00177     vdb.logger.info("Database backend " + db.getName() + " enabled");
00178   }
00179 
00180   /**
00181    * Records a request as having failed on all backends.
00182    * 
00183    * @param request the request that failed
00184    */
00185   public void addFailedOnAllBackends(AbstractRequest request)
00186   {
00187     this.failedOnAllBackends.add(request);
00188   }
00189 
00190   /**
00191    * Finishes the handling of a request previously registered with
00192    * <code>addFailedOnAllBackends</code>. On success all local backends are
00193    * disabled; otherwise the scheduler is told that the write has completed.
00194    * @param request request that completed
00195    * @param success true if completion is successful
00196    */
00197   public void completeFailedOnAllBackends(AbstractRequest request,
00198       boolean success)
00199   {
00200     boolean wasRegistered = failedOnAllBackends.remove(request);
00201     if (!wasRegistered)
00202     {
00203       logger.warn("Unable to find request "
00204           + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00205           + " in list of requests that failed on all backends.");
00206       return;
00207     }
00208     if (!success)
00209     {
00210       // Postponed notification (see ExecWriteRequest / ExecWriteRequestWithKeys)
00211       scheduler.notifyWriteCompleted((AbstractWriteRequest) request);
00212       return;
00213     }
00214     // Succeeded on other controllers: every local backend is invalidated
00215     logger.error("Request "
00216         + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00217         + " failed on all local backends but succeeded on other controllers. Disabling all local backends.");
00218     try
00219     {
00220       dvdb.disableAllBackends();
00221     }
00222     catch (VirtualDatabaseException disableFailure)
00223     {
00224       logger.error("An error occured while disabling all backends", disableFailure);
00225     }
00226   }
00227 
00228   //
00229   // Transaction management
00230   //
00231 
00232   /**
00233    * Sets the controller identifier value (this id must be unique). It is kept
00234    * in the 2 leftmost bytes of a long, ready to be OR-ed with a transaction id.
00235    * @param id The controllerId to set; above 0xffff a RuntimeException is thrown
00236    */
00237   public void setControllerId(int id)
00238   {
00239     if (id > 0xffff)
00240     {
00241       String msg = "Out of range controller id (" + id + ")";
00242       logger.error(msg);
00243       throw new RuntimeException(msg);
00244     }
00245     this.controllerId = (0xffffL & id) << 48; // long math; 48 bits = the 6 transaction-id bytes
00246   }
00247 
00248   /** Begins a transaction whose id combines the scheduler id with controllerId.
00249    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#begin(java.lang.String)
00250    */
00251   public long begin(String login) throws SQLException
00252   {
00253     try
00254     {
00255       TransactionMarkerMetaData marker = new TransactionMarkerMetaData(0,
00256           beginTimeout, login);
00257 
00258       // Wait for the scheduler to authorize the transaction to start
00259       long schedulerTid = scheduler.begin(marker);
00260 
00261       // Keep the 6 right-most bytes of the scheduler id and OR in the
00262       // controllerId field to build the id used from now on
00263       long tid = (schedulerTid & 0x0000ffffffffffffL) | controllerId;
00264       marker.setTransactionId(tid);
00265 
00266       if (logger.isDebugEnabled())
00267       {
00268         logger.debug(Translate.get("transaction.begin", String.valueOf(tid)));
00269       }
00270 
00271       try
00272       {
00273         // Load balancer first, then the recovery log when there is one
00274         loadBalancer.begin(marker);
00275         if (recoveryLog != null)
00276         {
00277           recoveryLog.begin(marker);
00278         }
00279       }
00280       finally
00281       {
00282         // Executed on success and on failure alike; a SQLException thrown
00283         // above propagates once this block has run
00284         scheduler.beginCompleted(tid);
00285       }
00286 
00287       // Only reached when no exception was thrown above
00288       tidLoginTable.put(new Long(tid), marker);
00289       return tid;
00290     }
00291     catch (RuntimeException unexpected)
00292     {
00293       // Unchecked failures are logged as fatal and rethrown as a
00294       // SQLException carrying only the original message
00295       logger.fatal(Translate.get(
00296           "fatal.runtime.exception.requestmanager.begin", unexpected));
00297       throw new SQLException(unexpected.getMessage());
00298     }
00299   }
00300 
00301   /** Write transactions commit through distributedCommit, read-only ones locally.
00302    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#commit(long)
00303    */
00304   public void commit(long transactionId) throws SQLException
00305   {
00306     boolean hasWritten;
00307     synchronized (writeTransactions)
00308     {
00309       // remove() tells whether the id was listed and drops it from the list
00310       hasWritten = writeTransactions.remove(new Long(transactionId));
00311     }
00312     if (!hasWritten)
00313       // read-only transaction, it is local
00314       super.commit(transactionId);
00315     else
00316       distributedCommit(transactionId);
00317   }
00318 
00319   /** Write transactions roll back through distributedRollback, read-only ones locally.
00320    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#rollback(long)
00321    */
00322   public void rollback(long transactionId) throws SQLException
00323   {
00324     boolean hasWritten;
00325     synchronized (writeTransactions)
00326     {
00327       // remove() tells whether the id was listed and drops it from the list
00328       hasWritten = writeTransactions.remove(new Long(transactionId));
00329     }
00330     if (!hasWritten)
00331       // read-only transaction, it is local
00332       super.rollback(transactionId);
00333     else
00334       distributedRollback(transactionId);
00335   }
00336 
00337   /** Makes sure the request's transaction is started locally, then schedules the write.
00338    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#scheduleExecWriteRequest(org.objectweb.cjdbc.common.sql.AbstractWriteRequest)
00339    */
00340   public void scheduleExecWriteRequest(AbstractWriteRequest request)
00341       throws SQLException
00342   {
00343     this.lazyTransactionStart(request); // must precede the scheduling below
00344     super.scheduleExecWriteRequest(request);
00345   }
00346 
00347   /**
00348    * Lazily begins, on this controller, the transaction a non-autocommit
00349    * request belongs to when that transaction is not yet known locally.
00350    * 
00351    * @param request query to execute
00352    * @throws SQLException if an error occurs
00353    */
00354   public void lazyTransactionStart(AbstractRequest request) throws SQLException
00355   {
00356     // Nothing is started for autocommit requests
00357     if (request.isAutoCommit())
00358       return;
00359 
00360     long tid = request.getTransactionId();
00361     // NOTE(review): this mask keeps the 6 transaction-id bytes, not the
00362     // controller-id bytes; presumably the intent was to recognize ids
00363     // issued by this controller - TODO confirm
00364     if ((tid & 0x0000ffffffffffffL) == controllerId)
00365       return;
00366 
00367     // Nothing to do when the transaction is already registered locally
00368     if (tidLoginTable.containsKey(new Long(tid)))
00369       return;
00370 
00371     // Unknown transaction: begin it here under the id it already carries
00372     try
00373     {
00374       TransactionMarkerMetaData marker = new TransactionMarkerMetaData(0,
00375           beginTimeout, request.getLogin());
00376       marker.setTransactionId(tid);
00377 
00378       if (logger.isDebugEnabled())
00379       {
00380         logger.debug(Translate.get("transaction.begin.lazy", String
00381             .valueOf(tid)));
00382       }
00383 
00384       try
00385       {
00386         // Scheduler, then load balancer, then recovery log when there is one
00387         scheduler.begin(marker);
00388         loadBalancer.begin(marker);
00389         if (recoveryLog != null)
00390         {
00391           recoveryLog.begin(marker);
00392         }
00393       }
00394       finally
00395       {
00396         // Notify scheduler for completion in any case
00397         scheduler.beginCompleted(tid);
00398       }
00399 
00400       tidLoginTable.put(new Long(tid), marker);
00401     }
00402     catch (RuntimeException unexpected)
00403     {
00404       // Unchecked failures become a SQLException with the same message
00405       logger.fatal(Translate.get(
00406           "fatal.runtime.exception.requestmanager.begin", unexpected));
00407       throw new SQLException(unexpected.getMessage());
00408     }
00409   }
00410 
00411   /** Reads locally and falls back to execRemoteReadRequest on NoMoreBackendException.
00412    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#execReadRequest(org.objectweb.cjdbc.common.sql.SelectRequest)
00413    */
00414   public ControllerResultSet execReadRequest(SelectRequest request)
00415       throws SQLException
00416   {
00417     try
00418     {
00419       return super.execReadRequest(request);
00420     }
00421     catch (NoMoreBackendException noLocalBackend)
00422     {
00423       // At this point, we don't have any backend available to execute this
00424       // query on this controller. Let's forward the query to another
00425       // controller. Any other SQLException is not caught here and simply
00426       // propagates to the caller.
00427       return execRemoteReadRequest(controllerId, request);
00428     }
00429   }
00430 
00431   /**
00432    * Execute a read request on a remote controller because the local controller
00433    * does not have any backend available to execute it (usually it already
00434    * failed locally).
00435    * @param controllerUniqueId unique identifier of the controller that receives
00436    *          this query from the client
00437    * @param request the request to execute
00438    * @return the query ResultSet
00439    * @throws SQLException if an error occurs
00440    */
00441   public abstract ControllerResultSet execRemoteReadRequest(
00442       long controllerUniqueId, SelectRequest request) throws SQLException;
00443 
00444   /** Lists the transaction (if any) as a write transaction, then runs the distributed write.
00445    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#execWriteRequest(org.objectweb.cjdbc.common.sql.AbstractWriteRequest)
00446    */
00447   public int execWriteRequest(AbstractWriteRequest request) throws SQLException
00448   {
00449     if (request.isAutoCommit())
00450       return execDistributedWriteRequest(request);
00451     // Add this transaction to the list of write transactions (once only)
00452     Long key = new Long(request.getTransactionId());
00453     synchronized (writeTransactions)
00454     {
00455       if (!writeTransactions.contains(key))
00456         writeTransactions.add(key);
00457     }
00458     return execDistributedWriteRequest(request);
00459   }
00460 
00461   /** Lists the transaction (if any) as a write transaction, then runs the distributed write.
00462    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#execWriteRequestWithKeys(org.objectweb.cjdbc.common.sql.AbstractWriteRequest)
00463    */
00464   public ControllerResultSet execWriteRequestWithKeys(
00465       AbstractWriteRequest request) throws SQLException
00466   {
00467     if (request.isAutoCommit())
00468       return execDistributedWriteRequestWithKeys(request);
00469     // Add this transaction to the list of write transactions (once only)
00470     Long key = new Long(request.getTransactionId());
00471     synchronized (writeTransactions)
00472     {
00473       if (!writeTransactions.contains(key))
00474         writeTransactions.add(key);
00475     }
00476     return execDistributedWriteRequestWithKeys(request);
00477   }
00478 
00479   /** Read-only procedures run locally; others are listed as writes and run distributed.
00480    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#execReadStoredProcedure(StoredProcedure)
00481    */
00482   public ControllerResultSet execReadStoredProcedure(StoredProcedure proc)
00483       throws SQLException
00484   {
00485     boolean localOnly = proc.isReadOnly();
00486     if (localOnly)
00487       // If connection is read-only, we don't broadcast
00488       return execDistributedReadStoredProcedureLocally(proc);
00489     if (!proc.isAutoCommit())
00490     { // Add this transaction to the list of write transactions (once only)
00491       Long key = new Long(proc.getTransactionId());
00492       synchronized (writeTransactions)
00493       {
00494         if (!writeTransactions.contains(key))
00495           writeTransactions.add(key);
00496       }
00497     }
00498     return execDistributedReadStoredProcedure(proc);
00499   }
00500 
00501   /** Lists the transaction (if any) as a write transaction, then runs the distributed call.
00502    * @see org.objectweb.cjdbc.controller.requestmanager.RequestManager#execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)
00503    */
00504   public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00505   {
00506     if (proc.isAutoCommit())
00507       return execDistributedWriteStoredProcedure(proc);
00508     // Add this transaction to the list of write transactions (once only)
00509     Long key = new Long(proc.getTransactionId());
00510     synchronized (writeTransactions)
00511     {
00512       if (!writeTransactions.contains(key))
00513         writeTransactions.add(key);
00514     }
00515     return execDistributedWriteStoredProcedure(proc);
00516   }
00517 
00518   //
00519   // RAIDb level specific methods
00520   //
00521 
00522   /**
00523    * Distributed implementation of a commit. Within this class, commit() calls
00524    * it for transactions listed as having executed a write.
00525    * @param transactionId id of the committing transaction
00526    * @throws SQLException if an error occurs
00527    */
00528   public abstract void distributedCommit(long transactionId)
00529       throws SQLException;
00530 
00531   /**
00532    * Distributed implementation of a rollback. Within this class, rollback()
00533    * calls it for transactions listed as having executed a write.
00534    * @param transactionId id of the transaction being rolled back
00535    * @throws SQLException if an error occurs
00536    */
00537   public abstract void distributedRollback(long transactionId)
00538       throws SQLException;
00539 
00540   /**
00541    * Distributed implementation of a write request execution; execWriteRequest
00542    * delegates to it.
00543    * @param request request to execute
00544    * @return number of modified rows
00545    * @throws SQLException if an error occurs
00546    */
00547   public abstract int execDistributedWriteRequest(AbstractWriteRequest request)
00548       throws SQLException;
00549 
00550   /**
00551    * Distributed implementation of a write request execution that returns
00552    * auto-generated keys; execWriteRequestWithKeys delegates to it.
00553    * 
00554    * @param request request to execute
00555    * @return ResultSet containing the auto-generated keys.
00556    * @throws SQLException if an error occurs
00557    */
00558   public abstract ControllerResultSet execDistributedWriteRequestWithKeys(
00559       AbstractWriteRequest request) throws SQLException;
00560 
00561   /**
00562    * Distributed implementation of a read stored procedure execution; used by
00563    * execReadStoredProcedure when the procedure is not flagged read-only.
00564    * @param proc stored procedure to execute
00565    * @return ResultSet corresponding to this stored procedure execution
00566    * @throws SQLException if an error occurs
00567    */
00568   public abstract ControllerResultSet execDistributedReadStoredProcedure(
00569       StoredProcedure proc) throws SQLException;
00570 
00571   /**
00572    * Distributed implementation of a write stored procedure execution;
00573    * execWriteStoredProcedure delegates to it.
00574    * @param proc stored procedure to execute
00575    * @return number of modified rows
00576    * @throws SQLException if an error occurs
00577    */
00578   public abstract int execDistributedWriteStoredProcedure(StoredProcedure proc)
00579       throws SQLException;
00580 
00581   /**
00582    * Runs the stored procedure with the <code>RequestManager</code>
00583    * implementation, i.e. without the dispatching done by this class.
00584    * @param proc stored procedure to execute
00585    * @return ResultSet corresponding to this stored procedure execution
00586    * @throws SQLException if an error occurs
00587    */
00588   public ControllerResultSet execDistributedReadStoredProcedureLocally(
00589       StoredProcedure proc) throws SQLException
00590   {
00591     ControllerResultSet localResult = super.execReadStoredProcedure(proc);
00592     return localResult;
00593   }
00594 
00595   /**
00596    * Runs the stored procedure with the <code>RequestManager</code>
00597    * implementation, i.e. without the dispatching done by this class.
00598    * @param proc stored procedure to execute
00599    * @return number of modified rows
00600    * @throws SQLException if an error occurs
00601    */
00602   public int execDistributedWriteStoredProcedureLocally(StoredProcedure proc)
00603       throws SQLException
00604   {
00605     int updateCount = super.execWriteStoredProcedure(proc);
00606     return updateCount;
00607   }
00608 
00609 }

Generated on Mon Apr 11 22:01:31 2005 for C-JDBC by  doxygen 1.3.9.1