00001
00025
package org.objectweb.cjdbc.controller.scheduler.raidb1;
00026
00027
import java.sql.SQLException;
00028
00029
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00030
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00031
import org.objectweb.cjdbc.common.sql.SelectRequest;
00032
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00034
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00035
00045 public class RAIDb1QueryLevelScheduler extends AbstractScheduler
00046 {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 private long requestId;
00059 private int pendingReads;
00060
00061
00062
00063 private Object
readSync;
00064 private Object
writeSync;
00065
00066
00067
00068
00069
00073 public RAIDb1QueryLevelScheduler()
00074 {
00075 super(
RAIDbLevels.RAIDb1,
ParsingGranularities.NO_PARSING);
00076
requestId = 0;
00077
pendingReads = 0;
00078
readSync =
new Object();
00079
writeSync =
new Object();
00080 }
00081
00082
00083
00084
00085
00089 public void scheduleReadRequest(
SelectRequest request)
throws SQLException
00090 {
00091
00092
synchronized (
this.writeSync)
00093 {
00094
if (
getPendingWrites() == 0)
00095 {
00096
synchronized (
this.readSync)
00097 {
00098 request.setId(
requestId++);
00099
pendingReads++;
00100
if (logger.
isDebugEnabled())
00101 logger.
debug(
"Request "
00102 + request.getId()
00103 + (request.isAutoCommit() ?
"" :
" transaction "
00104 + request.getTransactionId()) +
" scheduled for read ("
00105 +
pendingReads +
" pending reads)");
00106
return;
00107 }
00108 }
00109
00110
00111
try
00112 {
00113
if (logger.
isDebugEnabled())
00114 logger.
debug(
"Request " + request.getId() +
" waiting for "
00115 +
getPendingWrites() +
" pending writes)");
00116
00117
int timeout = request.getTimeout();
00118
if (timeout > 0)
00119 {
00120
long start = System.currentTimeMillis();
00121
00122
long lTimeout = timeout * 1000;
00123
this.writeSync.wait(lTimeout);
00124
long end = System.currentTimeMillis();
00125
int remaining = (
int) (lTimeout - (end - start));
00126
if (remaining > 0)
00127 request.setTimeout(remaining);
00128
else
00129 {
00130 String msg =
"Timeout (" + request.getTimeout() +
") for request: "
00131 + request.getId();
00132 logger.
warn(msg);
00133
throw new SQLException(msg);
00134 }
00135 }
00136
else
00137
this.writeSync.wait();
00138
00139
synchronized (
this.readSync)
00140 {
00141 request.setId(
requestId++);
00142
pendingReads++;
00143
if (logger.
isDebugEnabled())
00144 logger.
debug(
"Request " + request.getId() +
" scheduled for read ("
00145 +
pendingReads +
" pending reads)");
00146
return;
00147 }
00148 }
00149
catch (InterruptedException e)
00150 {
00151
00152
if (logger.
isWarnEnabled())
00153 logger.
warn(
"Request " + request.getId() +
" timed out ("
00154 + request.getTimeout() +
" s)");
00155
throw new SQLException(
"Timeout (" + request.getTimeout()
00156 +
") for request: " + request.getId());
00157 }
00158 }
00159 }
00160
00164 public final void readCompletedNotify(
SelectRequest request)
00165 {
00166
synchronized (
this.readSync)
00167 {
00168
pendingReads--;
00169
if (logger.
isDebugEnabled())
00170 logger.
debug(
"Read request " + request.
getId() +
" completed - "
00171 +
pendingReads +
" pending reads");
00172
if (
pendingReads == 0)
00173 {
00174
if (logger.
isDebugEnabled())
00175 logger.
debug(
"Last read completed, notifying writes");
00176
readSync.notifyAll();
00177 }
00178 }
00179 }
00180
00184 public void scheduleNonSuspendedWriteRequest(
AbstractWriteRequest request)
00185
throws SQLException
00186 {
00187
00188
00189
synchronized (
this.writeSync)
00190 {
00191
synchronized (
this.readSync)
00192 {
00193
if (
pendingReads == 0)
00194 {
00195 request.setId(
requestId++);
00196
if (logger.
isDebugEnabled())
00197 logger.
debug(
"Request "
00198 + request.getId()
00199 + (request.isAutoCommit() ?
"" :
" transaction "
00200 + request.getTransactionId()) +
" scheduled for write ("
00201 +
getPendingWrites() +
" pending writes)");
00202
return;
00203 }
00204 }
00205 }
00206
00207
waitForReadCompletion(request);
00208
scheduleNonSuspendedWriteRequest(request);
00209 }
00210
00217 private void waitForReadCompletion(
AbstractWriteRequest request)
00218
throws SQLException
00219 {
00220
synchronized (
this.readSync)
00221 {
00222
00223
try
00224 {
00225
if (logger.
isDebugEnabled())
00226 logger.
debug(
"Request " + request.getId() +
" waiting for "
00227 +
pendingReads +
" pending reads)");
00228
00229
int timeout = request.getTimeout();
00230
if (timeout > 0)
00231 {
00232
long start = System.currentTimeMillis();
00233
00234
long lTimeout = timeout * 1000;
00235
this.readSync.wait(lTimeout);
00236
long end = System.currentTimeMillis();
00237
int remaining = (
int) (lTimeout - (end - start));
00238
if (remaining > 0)
00239 request.setTimeout(remaining);
00240
else
00241 {
00242 String msg =
"Timeout (" + request.getTimeout() +
") for request: "
00243 + request.getId();
00244 logger.
warn(msg);
00245
throw new SQLException(msg);
00246 }
00247 }
00248
else
00249
this.readSync.wait();
00250 }
00251
catch (InterruptedException e)
00252 {
00253
00254
if (logger.
isWarnEnabled())
00255 logger.
warn(
"Request " + request.getId() +
" timed out ("
00256 + request.getTimeout() +
" ms)");
00257
throw new SQLException(
"Timeout (" + request.getTimeout()
00258 +
") for request: " + request.getId());
00259 }
00260 }
00261 }
00262
00266 public final synchronized void notifyWriteCompleted(
00267
AbstractWriteRequest request)
00268 {
00269
synchronized (
this.writeSync)
00270 {
00271
if (logger.
isDebugEnabled())
00272 logger.
debug(
"Request " + request.
getId() +
" completed - "
00273 +
getPendingWrites() +
" pending writes");
00274
if (
getPendingWrites() == 0)
00275 {
00276
if (logger.
isDebugEnabled())
00277 logger.
debug(
"Last write completed, notifying reads");
00278
writeSync.notifyAll();
00279 }
00280 }
00281 }
00282
00283
00284
00285
00286
00290 protected final void commitTransaction(
long transactionId)
00291 {
00292 }
00293
00297 protected final void rollbackTransaction(
long transactionId)
00298 {
00299 }
00300
00301
00302
00303
00307 public String
getXmlImpl()
00308 {
00309
return "<" +
DatabasesXmlTags.ELT_RAIDb1Scheduler +
" "
00310 +
DatabasesXmlTags.ATT_level +
"=\"" +
DatabasesXmlTags.VAL_query
00311 +
"\"/>";
00312 }
00313
00314 }