00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.scheduler.raidb2;
00026
00027 import java.sql.SQLException;
00028
00029 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00030 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00031 import org.objectweb.cjdbc.common.sql.SelectRequest;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00034 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 public class RAIDb2QueryLevelScheduler extends AbstractScheduler
00046 {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 private long requestId;
00059 private int pendingReads;
00060
00061
00062
00063 private Object readSync;
00064 private Object writeSync;
00065
00066
00067
00068
00069
00070
00071
00072
00073 public RAIDb2QueryLevelScheduler()
00074 {
00075 super(RAIDbLevels.RAIDb2, ParsingGranularities.NO_PARSING);
00076 requestId = 0;
00077 pendingReads = 0;
00078 readSync = new Object();
00079 writeSync = new Object();
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00089 public void scheduleReadRequest(SelectRequest request) throws SQLException
00090 {
00091
00092 synchronized (this.writeSync)
00093 {
00094 if (getPendingWrites() == 0)
00095 {
00096 synchronized (this.readSync)
00097 {
00098 request.setId(requestId++);
00099 pendingReads++;
00100 if (logger.isDebugEnabled())
00101 logger.debug("Request " + request.getId() + " scheduled for read ("
00102 + pendingReads + " pending reads)");
00103 return;
00104 }
00105 }
00106
00107
00108 try
00109 {
00110 if (logger.isDebugEnabled())
00111 logger.debug("Request " + request.getId() + " waiting for "
00112 + getPendingWrites() + " pending writes)");
00113
00114 int timeout = request.getTimeout();
00115 if (timeout > 0)
00116 {
00117 long start = System.currentTimeMillis();
00118
00119 long lTimeout = timeout * 1000;
00120 this.writeSync.wait(lTimeout);
00121 long end = System.currentTimeMillis();
00122 int remaining = (int) (lTimeout - (end - start));
00123 if (remaining > 0)
00124 request.setTimeout(remaining);
00125 else
00126 {
00127 String msg = "Timeout (" + request.getTimeout() + ") for request: "
00128 + request.getId();
00129 logger.warn(msg);
00130 throw new SQLException(msg);
00131 }
00132 }
00133 else
00134 this.writeSync.wait();
00135
00136 synchronized (this.readSync)
00137 {
00138 request.setId(requestId++);
00139 pendingReads++;
00140 if (logger.isDebugEnabled())
00141 logger.debug("Request " + request.getId() + " scheduled for read ("
00142 + pendingReads + " pending reads)");
00143 return;
00144 }
00145 }
00146 catch (InterruptedException e)
00147 {
00148
00149 if (logger.isWarnEnabled())
00150 logger.warn("Request " + request.getId() + " timed out ("
00151 + request.getTimeout() + " s)");
00152 throw new SQLException("Timeout (" + request.getTimeout()
00153 + ") for request: " + request.getId());
00154 }
00155 }
00156 }
00157
00158
00159
00160
00161 public final void readCompletedNotify(SelectRequest request)
00162 {
00163 synchronized (this.readSync)
00164 {
00165 pendingReads--;
00166 if (logger.isDebugEnabled())
00167 logger.debug("Request " + request.getId() + " completed");
00168 if (pendingReads == 0)
00169 {
00170 if (logger.isDebugEnabled())
00171 logger.debug("Last read completed, notifying writes");
00172 readSync.notifyAll();
00173 }
00174 }
00175 }
00176
00177
00178
00179
00180 public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
00181 throws SQLException
00182 {
00183
00184
00185 synchronized (this.writeSync)
00186 {
00187 synchronized (this.readSync)
00188 {
00189 if (pendingReads == 0)
00190 {
00191 request.setId(requestId++);
00192 if (logger.isDebugEnabled())
00193 logger.debug("Request " + request.getId()
00194 + " scheduled for write (" + getPendingWrites()
00195 + " pending writes)");
00196 return;
00197 }
00198 }
00199 }
00200
00201 waitForReadCompletion(request);
00202 scheduleNonSuspendedWriteRequest(request);
00203 }
00204
00205
00206
00207
00208
00209
00210
00211 private void waitForReadCompletion(AbstractWriteRequest request)
00212 throws SQLException
00213 {
00214 synchronized (this.readSync)
00215 {
00216
00217 try
00218 {
00219 if (logger.isDebugEnabled())
00220 logger.debug("Request " + request.getId() + " waiting for "
00221 + pendingReads + " pending reads)");
00222
00223 int timeout = request.getTimeout();
00224 if (timeout > 0)
00225 {
00226 long start = System.currentTimeMillis();
00227
00228 long lTimeout = timeout * 1000;
00229 this.readSync.wait(lTimeout);
00230 long end = System.currentTimeMillis();
00231 int remaining = (int) (lTimeout - (end - start));
00232 if (remaining > 0)
00233 request.setTimeout(remaining);
00234 else
00235 {
00236 String msg = "Timeout (" + request.getTimeout() + ") for request: "
00237 + request.getId();
00238 logger.warn(msg);
00239 throw new SQLException(msg);
00240 }
00241 }
00242 else
00243 this.readSync.wait();
00244 }
00245 catch (InterruptedException e)
00246 {
00247
00248 if (logger.isWarnEnabled())
00249 logger.warn("Request " + request.getId() + " timed out ("
00250 + request.getTimeout() + " ms)");
00251 throw new SQLException("Timeout (" + request.getTimeout()
00252 + ") for request: " + request.getId());
00253 }
00254 }
00255 }
00256
00257
00258
00259
00260 public final synchronized void notifyWriteCompleted(
00261 AbstractWriteRequest request)
00262 {
00263 synchronized (this.writeSync)
00264 {
00265 if (logger.isDebugEnabled())
00266 logger.debug("Request " + request.getId() + " completed");
00267 if (getPendingWrites() == 0)
00268 {
00269 if (logger.isDebugEnabled())
00270 logger.debug("Last write completed, notifying reads");
00271 writeSync.notifyAll();
00272 }
00273 }
00274 }
00275
00276
00277
00278
00279
00280
00281
00282
00283 protected final void commitTransaction(long transactionId)
00284 {
00285 }
00286
00287
00288
00289
00290 protected final void rollbackTransaction(long transactionId)
00291 {
00292 }
00293
00294
00295
00296
00297
00298
00299
00300 public String getXmlImpl()
00301 {
00302 StringBuffer info = new StringBuffer();
00303 info.append("<" + DatabasesXmlTags.ELT_RAIDb2Scheduler + " "
00304 + DatabasesXmlTags.ATT_level + "=\"" + DatabasesXmlTags.VAL_query
00305 + "\"/>");
00306 info.append(System.getProperty("line.separator"));
00307 return info.toString();
00308 }
00309
00310 }