Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb1OptimisticTransactionLevelScheduler.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): _________________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.scheduler.raidb1;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.cjdbc.common.exceptions.RollbackException;
00031 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00032 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00033 import org.objectweb.cjdbc.common.sql.SelectRequest;
00034 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00035 import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
00036 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00037 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00038 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00039 import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema;
00040 import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseTable;
00041 import org.objectweb.cjdbc.controller.scheduler.schema.TransactionExclusiveLock;
00042 
00043 /**
00044  * This scheduler provides transaction level scheduling for RAIDb-1
00045  * controllers. Each write takes a lock on the table it affects. All following
00046  * writes are blocked until the transaction of the first write completes. This
00047  * scheduler automatically detects simple deadlocks and rollbacks the
00048  * transaction inducing the deadlock. Note that transitive deadlocks (involving
00049  * more than 2 tables are not detected).
00050  * 
00051  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00052  * @version 1.0
00053  */
00054 public class RAIDb1OptimisticTransactionLevelScheduler
00055     extends
00056       AbstractScheduler
00057 {
00058 
00059   //
00060   // How the code is organized ?
00061   //
00062   // 1. Member variables
00063   // 2. Constructor
00064   // 3. Request handling
00065   // 4. Transaction management
00066   // 5. Debug/Monitoring
00067   //
00068 
00069   private long                    requestId;
00070   private SchedulerDatabaseSchema schedulerDatabaseSchema = null;
00071 
00072   //
00073   // Constructor
00074   //
00075 
00076   /**
00077    * Creates a new Optimistic Transaction Level Scheduler
00078    */
00079   public RAIDb1OptimisticTransactionLevelScheduler()
00080   {
00081     super(RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
00082     requestId = 0;
00083   }
00084 
00085   //
00086   // Request Handling
00087   //
00088 
00089   /**
00090    * Sets the <code>DatabaseSchema</code> of the current virtual database.
00091    * This is only needed by some schedulers that will have to define their own
00092    * scheduler schema
00093    * 
00094    * @param dbs a <code>DatabaseSchema</code> value
00095    * @see org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema
00096    */
00097   public synchronized void setDatabaseSchema(DatabaseSchema dbs)
00098   {
00099     if (schedulerDatabaseSchema == null)
00100     {
00101       logger.info("Setting new database schema");
00102       schedulerDatabaseSchema = new SchedulerDatabaseSchema(dbs);
00103     }
00104     else
00105     { // Schema is updated, compute the diff !
00106       SchedulerDatabaseSchema newSchema = new SchedulerDatabaseSchema(dbs);
00107       ArrayList tables = schedulerDatabaseSchema.getTables();
00108       ArrayList newTables = newSchema.getTables();
00109       if (newTables == null)
00110       { // New schema is empty (no backend is active anymore)
00111         logger.info("Removing all tables.");
00112         schedulerDatabaseSchema = null;
00113         return;
00114       }
00115 
00116       // Remove extra-tables
00117       for (int i = 0; i < tables.size(); i++)
00118       {
00119         SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
00120         if (!newSchema.hasTable(t.getName()))
00121         {
00122           schedulerDatabaseSchema.removeTable(t);
00123           if (logger.isInfoEnabled())
00124             logger.info("Removing table " + t.getName());
00125         }
00126       }
00127 
00128       // Add missing tables
00129       int size = newTables.size();
00130       for (int i = 0; i < size; i++)
00131       {
00132         SchedulerDatabaseTable t = (SchedulerDatabaseTable) newTables.get(i);
00133         if (!schedulerDatabaseSchema.hasTable(t.getName()))
00134         {
00135           schedulerDatabaseSchema.addTable(t);
00136           if (logger.isInfoEnabled())
00137             logger.info("Adding table " + t.getName());
00138         }
00139       }
00140     }
00141   }
00142 
00143   /**
00144    * Merge the given <code>DatabaseSchema</code> with the current one.
00145    * 
00146    * @param dbs a <code>DatabaseSchema</code> value
00147    * @see org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema
00148    */
00149   public void mergeDatabaseSchema(DatabaseSchema dbs)
00150   {
00151     try
00152     {
00153       logger.info("Merging new database schema");
00154       schedulerDatabaseSchema.mergeSchema(new SchedulerDatabaseSchema(dbs));
00155     }
00156     catch (Exception e)
00157     {
00158       logger.error("Error while merging new database schema", e);
00159     }
00160   }
00161 
00162   /**
00163    * Additionally to scheduling the request, this method replaces the SQL Date
00164    * macros such as now() with the current date.
00165    * 
00166    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#scheduleReadRequest(SelectRequest)
00167    */
00168   public final void scheduleReadRequest(SelectRequest request)
00169       throws SQLException
00170   {
00171     synchronized (this)
00172     {
00173       request.setId(requestId++);
00174     }
00175   }
00176 
00177   /**
00178    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#readCompletedNotify(SelectRequest)
00179    */
00180   public final void readCompletedNotify(SelectRequest request)
00181   {
00182   }
00183 
00184   /**
00185    * Additionally to scheduling the request, this method replaces the SQL Date
00186    * macros such as now() with the current date.
00187    * 
00188    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#scheduleWriteRequest(AbstractWriteRequest)
00189    */
00190   public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
00191       throws SQLException, RollbackException
00192   {
00193     if (request.isCreate())
00194     {
00195       synchronized (this)
00196       {
00197         request.setId(requestId++);
00198       }
00199       return;
00200     }
00201 
00202     SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
00203         .getTableName());
00204     if (t == null)
00205     {
00206       String msg = "No table found for request " + request.getId();
00207       logger.error(msg);
00208       throw new SQLException(msg);
00209     }
00210 
00211     // Deadlock detection
00212     TransactionExclusiveLock tableLock = t.getLock();
00213     if (!request.isAutoCommit())
00214     {
00215       synchronized (this)
00216       {
00217         if (tableLock.isLocked())
00218         { // Is the lock owner blocked by a lock we already own?
00219           long owner = tableLock.getLocker();
00220           long us = request.getTransactionId();
00221           if (owner != us)
00222           { // Parse all tables
00223             ArrayList tables = schedulerDatabaseSchema.getTables();
00224             ArrayList weAreblocking = new ArrayList();
00225             int size = tables.size();
00226             for (int i = 0; i < size; i++)
00227             {
00228               SchedulerDatabaseTable table = (SchedulerDatabaseTable) tables
00229                   .get(i);
00230               if (table == null)
00231                 continue;
00232               TransactionExclusiveLock lock = table.getLock();
00233               // Are we the lock owner ?
00234               if (lock.isLocked())
00235               {
00236                 if (lock.getLocker() == us)
00237                 {
00238                   // Is 'owner' in the list of the blocked transactions?
00239                   if (lock.isWaiting(owner))
00240                   { // Deadlock detected, we must rollback
00241                     releaseLocks(us);
00242                     throw new RollbackException(
00243                         "Deadlock detected, rollbacking transaction " + us);
00244                   }
00245                   else
00246                     weAreblocking.addAll(lock.getWaitingList());
00247                 }
00248               }
00249             }
00250           }
00251           else
00252           { // We are the lock owner and already synchronized on this
00253             // Assign the request id and exit
00254             request.setId(requestId++);
00255             return;
00256           }
00257         }
00258         else
00259         { // Lock is free, take it in the synchronized block
00260           acquireLockAndSetRequestId(request, tableLock);
00261           return;
00262         }
00263       }
00264     }
00265 
00266     acquireLockAndSetRequestId(request, tableLock);
00267   }
00268 
00269   private void acquireLockAndSetRequestId(AbstractWriteRequest request,
00270       TransactionExclusiveLock tableLock) throws SQLException
00271   {
00272     // Acquire the lock
00273     if (tableLock.acquire(request))
00274     {
00275       synchronized (this)
00276       {
00277         request.setId(requestId++);
00278       }
00279       if (logger.isDebugEnabled())
00280         logger.debug("Request " + request.getId() + " scheduled for write ("
00281             + getPendingWrites() + " pending writes)");
00282     }
00283     else
00284     {
00285       if (logger.isWarnEnabled())
00286         logger.warn("Request " + request.getId() + " timed out ("
00287             + request.getTimeout() + " ms)");
00288       throw new SQLException("Timeout (" + request.getTimeout()
00289           + ") for request: " + request.getId());
00290     }
00291   }
00292 
00293   /**
00294    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#notifyWriteCompleted(AbstractWriteRequest)
00295    */
00296   public final void notifyWriteCompleted(AbstractWriteRequest request)
00297   {
00298     if (request.isCreate())
00299     { // Add table to schema
00300       if (logger.isDebugEnabled())
00301         logger.debug("Adding table '" + request.getTableName()
00302             + "' to scheduler schema");
00303       synchronized (this)
00304       {
00305         schedulerDatabaseSchema.addTable(new SchedulerDatabaseTable(
00306             new DatabaseTable(request.getTableName())));
00307       }
00308     }
00309     else if (request.isDrop())
00310     { // Drop table from schema
00311       if (logger.isDebugEnabled())
00312         logger.debug("Removing table '" + request.getTableName()
00313             + "' to scheduler schema");
00314       synchronized (this)
00315       {
00316         schedulerDatabaseSchema.removeTable(schedulerDatabaseSchema
00317             .getTable(request.getTableName()));
00318       }
00319       return;
00320     }
00321 
00322     // Requests outside transaction delimiters must release the lock
00323     // as soon as they have executed
00324     if (request.isAutoCommit())
00325     {
00326       SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
00327           .getTableName());
00328       if (t == null)
00329       {
00330         String msg = "No table found to release lock for request "
00331             + request.getId();
00332         logger.error(msg);
00333       }
00334       else
00335         t.getLock().release();
00336     }
00337   }
00338 
00339   //
00340   // Transaction Management
00341   //
00342 
00343   /**
00344    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#commitTransaction(long)
00345    */
00346   protected final void commitTransaction(long transactionId)
00347   {
00348     releaseLocks(transactionId);
00349   }
00350 
00351   /**
00352    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#rollbackTransaction(long)
00353    */
00354   protected final void rollbackTransaction(long transactionId)
00355   {
00356     releaseLocks(transactionId);
00357   }
00358 
00359   /**
00360    * Release all locks we may own on tables.
00361    * 
00362    * @param transactionId id of the transaction that releases the locks
00363    */
00364   private synchronized void releaseLocks(long transactionId)
00365   {
00366     ArrayList tables = schedulerDatabaseSchema.getTables();
00367     int size = tables.size();
00368     for (int i = 0; i < size; i++)
00369     {
00370       SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
00371       if (t == null)
00372         continue;
00373       TransactionExclusiveLock lock = t.getLock();
00374       // Are we the lock owner ?
00375       if (lock.isLocked())
00376         if (lock.getLocker() == transactionId)
00377           lock.release();
00378     }
00379   }
00380 
00381   //
00382   // Debug/Monitoring
00383   //
00384 
00385   /**
00386    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#getXmlImpl()
00387    */
00388   public String getXmlImpl()
00389   {
00390     return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
00391         + DatabasesXmlTags.ATT_level + "=\""
00392         + DatabasesXmlTags.VAL_optimisticTransaction + "\"/>";
00393   }
00394 
00395 }

Generated on Mon Apr 11 22:01:33 2005 for C-JDBC by  doxygen 1.3.9.1