
00001 00025 package org.objectweb.cjdbc.controller.scheduler.raidb1; 00026 00027 import java.sql.SQLException; 00028 00029 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00030 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 00031 import org.objectweb.cjdbc.common.sql.SelectRequest; 00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00033 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 00034 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler; 00035 00045 public class RAIDb1QueryLevelScheduler extends AbstractScheduler 00046 { 00047 00048 // 00049 // How the code is organized ? 00050 // 00051 // 1. Member variables 00052 // 2. Constructor 00053 // 3. Request handling 00054 // 4. Transaction management 00055 // 5. Debug/Monitoring 00056 // 00057 00058 private long requestId; 00059 private int pendingReads; 00060 00061 // We have to distinguish read and write to wake up only 00062 // waiting reads or writes according to the situation 00063 private Object readSync; // to synchronize on reads completion 00064 private Object writeSync; // to synchronize on writes completion 00065 00066 // 00067 // Constructor 00068 // 00069 00073 public RAIDb1QueryLevelScheduler() 00074 { 00075 super(RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING); 00076 requestId = 0; 00077 pendingReads = 0; 00078 readSync = new Object(); 00079 writeSync = new Object(); 00080 } 00081 00082 // 00083 // Request Handling 00084 // 00085 00089 public void scheduleReadRequest(SelectRequest request) throws SQLException 00090 { 00091 // Now deal with synchronization 00092 synchronized (this.writeSync) 00093 { 00094 if (getPendingWrites() == 0) 00095 { // No writes pending, go ahead ! 00096 synchronized (this.readSync) 00097 { 00098 request.setId(requestId++); 00099 pendingReads++; 00100 if (logger.isDebugEnabled()) 00101 logger.debug("Request " 00102 + request.getId() 00103 + (request.isAutoCommit() ? "" : " transaction " 00104 + request.getTransactionId()) + " scheduled for read (" 00105 + pendingReads + " pending reads)"); 00106 return; 00107 } 00108 } 00109 00110 // Wait for the writes completion 00111 try 00112 { 00113 if (logger.isDebugEnabled()) 00114 logger.debug("Request " + request.getId() + " waiting for " 00115 + getPendingWrites() + " pending writes)"); 00116 00117 int timeout = request.getTimeout(); 00118 if (timeout > 0) 00119 { 00120 long start = System.currentTimeMillis(); 00121 // Convert seconds to milliseconds for wait call 00122 long lTimeout = timeout * 1000; 00123 this.writeSync.wait(lTimeout); 00124 long end = System.currentTimeMillis(); 00125 int remaining = (int) (lTimeout - (end - start)); 00126 if (remaining > 0) 00127 request.setTimeout(remaining); 00128 else 00129 { 00130 String msg = "Timeout (" + request.getTimeout() + ") for request: " 00131 + request.getId(); 00132 logger.warn(msg); 00133 throw new SQLException(msg); 00134 } 00135 } 00136 else 00137 this.writeSync.wait(); 00138 00139 synchronized (this.readSync) 00140 { 00141 request.setId(requestId++); 00142 pendingReads++; 00143 if (logger.isDebugEnabled()) 00144 logger.debug("Request " + request.getId() + " scheduled for read (" 00145 + pendingReads + " pending reads)"); 00146 return; // Ok, write completed before timeout 00147 } 00148 } 00149 catch (InterruptedException e) 00150 { 00151 // Timeout 00152 if (logger.isWarnEnabled()) 00153 logger.warn("Request " + request.getId() + " timed out (" 00154 + request.getTimeout() + " s)"); 00155 throw new SQLException("Timeout (" + request.getTimeout() 00156 + ") for request: " + request.getId()); 00157 } 00158 } 00159 } 00160 00164 public final void readCompletedNotify(SelectRequest request) 00165 { 00166 synchronized (this.readSync) 00167 { 00168 pendingReads--; 00169 if (logger.isDebugEnabled()) 00170 logger.debug("Read request " + request.getId() + " completed - " 00171 + pendingReads + " pending reads"); 00172 if (pendingReads == 0) 00173 { 00174 if (logger.isDebugEnabled()) 00175 logger.debug("Last read completed, notifying writes"); 00176 readSync.notifyAll(); // Wakes up any waiting write query 00177 } 00178 } 00179 } 00180 00184 public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request) 00185 throws SQLException 00186 { 00187 // We have to take the locks in the same order as reads else 00188 // we could have a deadlock 00189 synchronized (this.writeSync) 00190 { 00191 synchronized (this.readSync) 00192 { 00193 if (pendingReads == 0) 00194 { // No read pending, go ahead 00195 request.setId(requestId++); 00196 if (logger.isDebugEnabled()) 00197 logger.debug("Request " 00198 + request.getId() 00199 + (request.isAutoCommit() ? "" : " transaction " 00200 + request.getTransactionId()) + " scheduled for write (" 00201 + getPendingWrites() + " pending writes)"); 00202 return; 00203 } 00204 } 00205 } 00206 00207 waitForReadCompletion(request); 00208 scheduleNonSuspendedWriteRequest(request); 00209 } 00210 00217 private void waitForReadCompletion(AbstractWriteRequest request) 00218 throws SQLException 00219 { 00220 synchronized (this.readSync) 00221 { 00222 // Wait for the reads completion 00223 try 00224 { 00225 if (logger.isDebugEnabled()) 00226 logger.debug("Request " + request.getId() + " waiting for " 00227 + pendingReads + " pending reads)"); 00228 00229 int timeout = request.getTimeout(); 00230 if (timeout > 0) 00231 { 00232 long start = System.currentTimeMillis(); 00233 // Convert seconds to milliseconds for wait call 00234 long lTimeout = timeout * 1000; 00235 this.readSync.wait(lTimeout); 00236 long end = System.currentTimeMillis(); 00237 int remaining = (int) (lTimeout - (end - start)); 00238 if (remaining > 0) 00239 request.setTimeout(remaining); 00240 else 00241 { 00242 String msg = "Timeout (" + request.getTimeout() + ") for request: " 00243 + request.getId(); 00244 logger.warn(msg); 00245 throw new SQLException(msg); 00246 } 00247 } 00248 else 00249 this.readSync.wait(); 00250 } 00251 catch (InterruptedException e) 00252 { 00253 // Timeout 00254 if (logger.isWarnEnabled()) 00255 logger.warn("Request " + request.getId() + " timed out (" 00256 + request.getTimeout() + " ms)"); 00257 throw new SQLException("Timeout (" + request.getTimeout() 00258 + ") for request: " + request.getId()); 00259 } 00260 } 00261 } 00262 00266 public final synchronized void notifyWriteCompleted( 00267 AbstractWriteRequest request) 00268 { 00269 synchronized (this.writeSync) 00270 { 00271 if (logger.isDebugEnabled()) 00272 logger.debug("Request " + request.getId() + " completed - " 00273 + getPendingWrites() + " pending writes"); 00274 if (getPendingWrites() == 0) 00275 { 00276 if (logger.isDebugEnabled()) 00277 logger.debug("Last write completed, notifying reads"); 00278 writeSync.notifyAll(); // Wakes up all waiting read queries 00279 } 00280 } 00281 } 00282 00283 // 00284 // Transaction Management 00285 // 00286 00290 protected final void commitTransaction(long transactionId) 00291 { 00292 } 00293 00297 protected final void rollbackTransaction(long transactionId) 00298 { 00299 } 00300 00301 // 00302 // Debug/Monitoring 00303 // 00307 public String getXmlImpl() 00308 { 00309 return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " " 00310 + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query 00311 + "\"/>"; 00312 } 00313 00314 }

CJDBCversion1.0.4に対してTue Oct 12 15:16:03 2004に生成されました。 doxygen 1.3.8