00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.scheduler.raidb1;
00026
00027 import java.sql.SQLException;
00028
00029 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00030 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00031 import org.objectweb.cjdbc.common.sql.SelectRequest;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00034 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 public class RAIDb1QueryLevelScheduler extends AbstractScheduler
00046 {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 private long requestId;
00059 private int pendingReads;
00060
00061
00062
00063 private Object readSync;
00064 private Object writeSync;
00065
00066
00067
00068
00069
00070
00071
00072
00073 public RAIDb1QueryLevelScheduler()
00074 {
00075 super(RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
00076 requestId = 0;
00077 pendingReads = 0;
00078 readSync = new Object();
00079 writeSync = new Object();
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00089 public void scheduleReadRequest(SelectRequest request) throws SQLException
00090 {
00091
00092 synchronized (this.writeSync)
00093 {
00094 if (getPendingWrites() == 0)
00095 {
00096 synchronized (this.readSync)
00097 {
00098 request.setId(requestId++);
00099 pendingReads++;
00100 if (logger.isDebugEnabled())
00101 logger.debug("Request "
00102 + request.getId()
00103 + (request.isAutoCommit() ? "" : " transaction "
00104 + request.getTransactionId()) + " scheduled for read ("
00105 + pendingReads + " pending reads)");
00106 return;
00107 }
00108 }
00109
00110
00111 try
00112 {
00113 if (logger.isDebugEnabled())
00114 logger.debug("Request " + request.getId() + " waiting for "
00115 + getPendingWrites() + " pending writes)");
00116
00117 int timeout = request.getTimeout();
00118 if (timeout > 0)
00119 {
00120 long start = System.currentTimeMillis();
00121
00122 long lTimeout = timeout * 1000;
00123 this.writeSync.wait(lTimeout);
00124 long end = System.currentTimeMillis();
00125 int remaining = (int) (lTimeout - (end - start));
00126 if (remaining > 0)
00127 request.setTimeout(remaining);
00128 else
00129 {
00130 String msg = "Timeout (" + request.getTimeout() + ") for request: "
00131 + request.getId();
00132 logger.warn(msg);
00133 throw new SQLException(msg);
00134 }
00135 }
00136 else
00137 this.writeSync.wait();
00138
00139 synchronized (this.readSync)
00140 {
00141 request.setId(requestId++);
00142 pendingReads++;
00143 if (logger.isDebugEnabled())
00144 logger.debug("Request " + request.getId() + " scheduled for read ("
00145 + pendingReads + " pending reads)");
00146 return;
00147 }
00148 }
00149 catch (InterruptedException e)
00150 {
00151
00152 if (logger.isWarnEnabled())
00153 logger.warn("Request " + request.getId() + " timed out ("
00154 + request.getTimeout() + " s)");
00155 throw new SQLException("Timeout (" + request.getTimeout()
00156 + ") for request: " + request.getId());
00157 }
00158 }
00159 }
00160
00161
00162
00163
00164 public final void readCompletedNotify(SelectRequest request)
00165 {
00166 synchronized (this.readSync)
00167 {
00168 pendingReads--;
00169 if (logger.isDebugEnabled())
00170 logger.debug("Read request " + request.getId() + " completed - "
00171 + pendingReads + " pending reads");
00172 if (pendingReads == 0)
00173 {
00174 if (logger.isDebugEnabled())
00175 logger.debug("Last read completed, notifying writes");
00176 readSync.notifyAll();
00177 }
00178 }
00179 }
00180
00181
00182
00183
00184 public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
00185 throws SQLException
00186 {
00187
00188
00189 synchronized (this.writeSync)
00190 {
00191 synchronized (this.readSync)
00192 {
00193 if (pendingReads == 0)
00194 {
00195 request.setId(requestId++);
00196 if (logger.isDebugEnabled())
00197 logger.debug("Request "
00198 + request.getId()
00199 + (request.isAutoCommit() ? "" : " transaction "
00200 + request.getTransactionId()) + " scheduled for write ("
00201 + getPendingWrites() + " pending writes)");
00202 return;
00203 }
00204 }
00205 }
00206
00207 waitForReadCompletion(request);
00208 scheduleNonSuspendedWriteRequest(request);
00209 }
00210
00211
00212
00213
00214
00215
00216
00217 private void waitForReadCompletion(AbstractWriteRequest request)
00218 throws SQLException
00219 {
00220 synchronized (this.readSync)
00221 {
00222
00223 try
00224 {
00225 if (logger.isDebugEnabled())
00226 logger.debug("Request " + request.getId() + " waiting for "
00227 + pendingReads + " pending reads)");
00228
00229 int timeout = request.getTimeout();
00230 if (timeout > 0)
00231 {
00232 long start = System.currentTimeMillis();
00233
00234 long lTimeout = timeout * 1000;
00235 this.readSync.wait(lTimeout);
00236 long end = System.currentTimeMillis();
00237 int remaining = (int) (lTimeout - (end - start));
00238 if (remaining > 0)
00239 request.setTimeout(remaining);
00240 else
00241 {
00242 String msg = "Timeout (" + request.getTimeout() + ") for request: "
00243 + request.getId();
00244 logger.warn(msg);
00245 throw new SQLException(msg);
00246 }
00247 }
00248 else
00249 this.readSync.wait();
00250 }
00251 catch (InterruptedException e)
00252 {
00253
00254 if (logger.isWarnEnabled())
00255 logger.warn("Request " + request.getId() + " timed out ("
00256 + request.getTimeout() + " ms)");
00257 throw new SQLException("Timeout (" + request.getTimeout()
00258 + ") for request: " + request.getId());
00259 }
00260 }
00261 }
00262
00263
00264
00265
00266 public final synchronized void notifyWriteCompleted(
00267 AbstractWriteRequest request)
00268 {
00269 synchronized (this.writeSync)
00270 {
00271 if (logger.isDebugEnabled())
00272 logger.debug("Request " + request.getId() + " completed - "
00273 + getPendingWrites() + " pending writes");
00274 if (getPendingWrites() == 0)
00275 {
00276 if (logger.isDebugEnabled())
00277 logger.debug("Last write completed, notifying reads");
00278 writeSync.notifyAll();
00279 }
00280 }
00281 }
00282
00283
00284
00285
00286
00287
00288
00289
00290 protected final void commitTransaction(long transactionId)
00291 {
00292 }
00293
00294
00295
00296
00297 protected final void rollbackTransaction(long transactionId)
00298 {
00299 }
00300
00301
00302
00303
00304
00305
00306
00307 public String getXmlImpl()
00308 {
00309 return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
00310 + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query
00311 + "\"/>";
00312 }
00313
00314 }