Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

RAIDb2QueryLevelScheduler.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): _________________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.scheduler.raidb2;
00026 
00027 import java.sql.SQLException;
00028 
00029 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00030 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00031 import org.objectweb.cjdbc.common.sql.SelectRequest;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00034 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00035 
00036 /**
00037  * This scheduler provides query level scheduling for RAIDb-2 controllers. Reads
00038  * can execute in parallel until a write comes in. Then the write waits for the
00039  * completion of the reads. Any new read is stacked after the write and they are
00040  * released together when the write has completed its execution.
00041  * 
00042  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00043  * @version 1.0
00044  */
00045 public class RAIDb2QueryLevelScheduler extends AbstractScheduler
00046 {
00047 
00048   //
00049   // How the code is organized ?
00050   //
00051   // 1. Member variables
00052   // 2. Constructor
00053   // 3. Request handling
00054   // 4. Transaction management
00055   // 5. Debug/Monitoring
00056   //
00057 
00058   private long   requestId;
00059   private int    pendingReads;
00060 
00061   // We have to distinguish read and write to wake up only
00062   // waiting reads or writes according to the situation
00063   private Object readSync;    // to synchronize on reads completion
00064   private Object writeSync;   // to synchronize on writes completion
00065 
00066   //
00067   // Constructor
00068   //
00069 
00070   /**
00071    * Creates a new Query Level Scheduler
00072    */
00073   public RAIDb2QueryLevelScheduler()
00074   {
00075     super(RAIDbLevels.RAIDb2, ParsingGranularities.NO_PARSING);
00076     requestId = 0;
00077     pendingReads = 0;
00078     readSync = new Object();
00079     writeSync = new Object();
00080   }
00081 
00082   //
00083   // Request Handling
00084   //
00085 
00086   /**
00087    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#scheduleReadRequest(SelectRequest)
00088    */
00089   public void scheduleReadRequest(SelectRequest request) throws SQLException
00090   {
00091     // Now deal with synchronization
00092     synchronized (this.writeSync)
00093     {
00094       if (getPendingWrites() == 0)
00095       { // No writes pending, go ahead !
00096         synchronized (this.readSync)
00097         {
00098           request.setId(requestId++);
00099           pendingReads++;
00100           if (logger.isDebugEnabled())
00101             logger.debug("Request " + request.getId() + " scheduled for read ("
00102                 + pendingReads + " pending reads)");
00103           return;
00104         }
00105       }
00106 
00107       // Wait for the writes completion
00108       try
00109       {
00110         if (logger.isDebugEnabled())
00111           logger.debug("Request " + request.getId() + " waiting for "
00112               + getPendingWrites() + " pending writes)");
00113 
00114         int timeout = request.getTimeout();
00115         if (timeout > 0)
00116         {
00117           long start = System.currentTimeMillis();
00118           // Convert seconds to milliseconds for wait call
00119           long lTimeout = timeout * 1000;
00120           this.writeSync.wait(lTimeout);
00121           long end = System.currentTimeMillis();
00122           int remaining = (int) (lTimeout - (end - start));
00123           if (remaining > 0)
00124             request.setTimeout(remaining);
00125           else
00126           {
00127             String msg = "Timeout (" + request.getTimeout() + ") for request: "
00128                 + request.getId();
00129             logger.warn(msg);
00130             throw new SQLException(msg);
00131           }
00132         }
00133         else
00134           this.writeSync.wait();
00135 
00136         synchronized (this.readSync)
00137         {
00138           request.setId(requestId++);
00139           pendingReads++;
00140           if (logger.isDebugEnabled())
00141             logger.debug("Request " + request.getId() + " scheduled for read ("
00142                 + pendingReads + " pending reads)");
00143           return; // Ok, write completed before timeout
00144         }
00145       }
00146       catch (InterruptedException e)
00147       {
00148         // Timeout
00149         if (logger.isWarnEnabled())
00150           logger.warn("Request " + request.getId() + " timed out ("
00151               + request.getTimeout() + " s)");
00152         throw new SQLException("Timeout (" + request.getTimeout()
00153             + ") for request: " + request.getId());
00154       }
00155     }
00156   }
00157 
00158   /**
00159    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#readCompletedNotify(SelectRequest)
00160    */
00161   public final void readCompletedNotify(SelectRequest request)
00162   {
00163     synchronized (this.readSync)
00164     {
00165       pendingReads--;
00166       if (logger.isDebugEnabled())
00167         logger.debug("Request " + request.getId() + " completed");
00168       if (pendingReads == 0)
00169       {
00170         if (logger.isDebugEnabled())
00171           logger.debug("Last read completed, notifying writes");
00172         readSync.notifyAll(); // Wakes up any waiting write query
00173       }
00174     }
00175   }
00176 
00177   /**
00178    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#scheduleWriteRequest(AbstractWriteRequest)
00179    */
00180   public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
00181       throws SQLException
00182   {
00183     // We have to take the locks in the same order as reads else
00184     // we could have a deadlock
00185     synchronized (this.writeSync)
00186     {
00187       synchronized (this.readSync)
00188       {
00189         if (pendingReads == 0)
00190         { // No read pending, go ahead
00191           request.setId(requestId++);
00192           if (logger.isDebugEnabled())
00193             logger.debug("Request " + request.getId()
00194                 + " scheduled for write (" + getPendingWrites()
00195                 + " pending writes)");
00196           return;
00197         }
00198       }
00199     }
00200 
00201     waitForReadCompletion(request);
00202     scheduleNonSuspendedWriteRequest(request);
00203   }
00204 
00205   /**
00206    * Wait for the reads completion. Synchronizes on this.readSync.
00207    * 
00208    * @param request the request that is being scheduled
00209    * @throws SQLException if an error occurs
00210    */
00211   private void waitForReadCompletion(AbstractWriteRequest request)
00212       throws SQLException
00213   {
00214     synchronized (this.readSync)
00215     {
00216       // Wait for the reads completion
00217       try
00218       {
00219         if (logger.isDebugEnabled())
00220           logger.debug("Request " + request.getId() + " waiting for "
00221               + pendingReads + " pending reads)");
00222 
00223         int timeout = request.getTimeout();
00224         if (timeout > 0)
00225         {
00226           long start = System.currentTimeMillis();
00227           // Convert seconds to milliseconds for wait call
00228           long lTimeout = timeout * 1000;
00229           this.readSync.wait(lTimeout);
00230           long end = System.currentTimeMillis();
00231           int remaining = (int) (lTimeout - (end - start));
00232           if (remaining > 0)
00233             request.setTimeout(remaining);
00234           else
00235           {
00236             String msg = "Timeout (" + request.getTimeout() + ") for request: "
00237                 + request.getId();
00238             logger.warn(msg);
00239             throw new SQLException(msg);
00240           }
00241         }
00242         else
00243           this.readSync.wait();
00244       }
00245       catch (InterruptedException e)
00246       {
00247         // Timeout
00248         if (logger.isWarnEnabled())
00249           logger.warn("Request " + request.getId() + " timed out ("
00250               + request.getTimeout() + " ms)");
00251         throw new SQLException("Timeout (" + request.getTimeout()
00252             + ") for request: " + request.getId());
00253       }
00254     }
00255   }
00256 
00257   /**
00258    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#notifyWriteCompleted(AbstractWriteRequest)
00259    */
00260   public final synchronized void notifyWriteCompleted(
00261       AbstractWriteRequest request)
00262   {
00263     synchronized (this.writeSync)
00264     {
00265       if (logger.isDebugEnabled())
00266         logger.debug("Request " + request.getId() + " completed");
00267       if (getPendingWrites() == 0)
00268       {
00269         if (logger.isDebugEnabled())
00270           logger.debug("Last write completed, notifying reads");
00271         writeSync.notifyAll(); // Wakes up all waiting read queries
00272       }
00273     }
00274   }
00275 
00276   //
00277   // Transaction Management
00278   //
00279 
00280   /**
00281    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#commitTransaction(long)
00282    */
00283   protected final void commitTransaction(long transactionId)
00284   {
00285   }
00286 
00287   /**
00288    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#rollbackTransaction(long)
00289    */
00290   protected final void rollbackTransaction(long transactionId)
00291   {
00292   }
00293 
00294   //
00295   // Debug/Monitoring
00296   //
00297   /**
00298    * @see org.objectweb.cjdbc.controller.scheduler.AbstractScheduler#getXmlImpl()
00299    */
00300   public String getXmlImpl()
00301   {
00302     StringBuffer info = new StringBuffer();
00303     info.append("<" + DatabasesXmlTags.ELT_RAIDb2Scheduler + " "
00304         + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query
00305         + "\"/>");
00306     info.append(System.getProperty("line.separator"));
00307     return info.toString();
00308   }
00309 
00310 }

Generated on Mon Apr 11 22:01:34 2005 for C-JDBC by  doxygen 1.3.9.1