00001
00025
package org.objectweb.cjdbc.controller.scheduler.raidb2;
00026
00027
import java.sql.SQLException;
00028
00029
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00030
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00031
import org.objectweb.cjdbc.common.sql.SelectRequest;
00032
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00034
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00035
00045 public class RAIDb2QueryLevelScheduler extends AbstractScheduler
00046 {
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058 private long requestId;
00059 private int pendingReads;
00060
00061
00062
00063 private Object
readSync;
00064 private Object
writeSync;
00065
00066
00067
00068
00069
00073 public RAIDb2QueryLevelScheduler()
00074 {
00075 super(
RAIDbLevels.RAIDb2,
ParsingGranularities.NO_PARSING);
00076
requestId = 0;
00077
pendingReads = 0;
00078
readSync =
new Object();
00079
writeSync =
new Object();
00080 }
00081
00082
00083
00084
00085
00089 public void scheduleReadRequest(
SelectRequest request)
throws SQLException
00090 {
00091
00092
synchronized (
this.writeSync)
00093 {
00094
if (
getPendingWrites() == 0)
00095 {
00096
synchronized (
this.readSync)
00097 {
00098 request.setId(
requestId++);
00099
pendingReads++;
00100
if (logger.
isDebugEnabled())
00101 logger.
debug(
"Request " + request.getId() +
" scheduled for read ("
00102 +
pendingReads +
" pending reads)");
00103
return;
00104 }
00105 }
00106
00107
00108
try
00109 {
00110
if (logger.
isDebugEnabled())
00111 logger.
debug(
"Request " + request.getId() +
" waiting for "
00112 +
getPendingWrites() +
" pending writes)");
00113
00114
int timeout = request.getTimeout();
00115
if (timeout > 0)
00116 {
00117
long start = System.currentTimeMillis();
00118
00119
long lTimeout = timeout * 1000;
00120
this.writeSync.wait(lTimeout);
00121
long end = System.currentTimeMillis();
00122
int remaining = (
int) (lTimeout - (end - start));
00123
if (remaining > 0)
00124 request.setTimeout(remaining);
00125
else
00126 {
00127 String msg =
"Timeout (" + request.getTimeout() +
") for request: "
00128 + request.getId();
00129 logger.
warn(msg);
00130
throw new SQLException(msg);
00131 }
00132 }
00133
else
00134
this.writeSync.wait();
00135
00136
synchronized (
this.readSync)
00137 {
00138 request.setId(
requestId++);
00139
pendingReads++;
00140
if (logger.
isDebugEnabled())
00141 logger.
debug(
"Request " + request.getId() +
" scheduled for read ("
00142 +
pendingReads +
" pending reads)");
00143
return;
00144 }
00145 }
00146
catch (InterruptedException e)
00147 {
00148
00149
if (logger.
isWarnEnabled())
00150 logger.
warn(
"Request " + request.getId() +
" timed out ("
00151 + request.getTimeout() +
" s)");
00152
throw new SQLException(
"Timeout (" + request.getTimeout()
00153 +
") for request: " + request.getId());
00154 }
00155 }
00156 }
00157
00161 public final void readCompletedNotify(
SelectRequest request)
00162 {
00163
synchronized (
this.readSync)
00164 {
00165
pendingReads--;
00166
if (logger.
isDebugEnabled())
00167 logger.
debug(
"Request " + request.
getId() +
" completed");
00168
if (
pendingReads == 0)
00169 {
00170
if (logger.
isDebugEnabled())
00171 logger.
debug(
"Last read completed, notifying writes");
00172
readSync.notifyAll();
00173 }
00174 }
00175 }
00176
00180 public void scheduleNonSuspendedWriteRequest(
AbstractWriteRequest request)
00181
throws SQLException
00182 {
00183
00184
00185
synchronized (
this.writeSync)
00186 {
00187
synchronized (
this.readSync)
00188 {
00189
if (
pendingReads == 0)
00190 {
00191 request.setId(
requestId++);
00192
if (logger.
isDebugEnabled())
00193 logger.
debug(
"Request " + request.getId()
00194 +
" scheduled for write (" +
getPendingWrites()
00195 +
" pending writes)");
00196
return;
00197 }
00198 }
00199 }
00200
00201
waitForReadCompletion(request);
00202
scheduleNonSuspendedWriteRequest(request);
00203 }
00204
00211 private void waitForReadCompletion(
AbstractWriteRequest request)
00212
throws SQLException
00213 {
00214
synchronized (
this.readSync)
00215 {
00216
00217
try
00218 {
00219
if (logger.
isDebugEnabled())
00220 logger.
debug(
"Request " + request.getId() +
" waiting for "
00221 +
pendingReads +
" pending reads)");
00222
00223
int timeout = request.getTimeout();
00224
if (timeout > 0)
00225 {
00226
long start = System.currentTimeMillis();
00227
00228
long lTimeout = timeout * 1000;
00229
this.readSync.wait(lTimeout);
00230
long end = System.currentTimeMillis();
00231
int remaining = (
int) (lTimeout - (end - start));
00232
if (remaining > 0)
00233 request.setTimeout(remaining);
00234
else
00235 {
00236 String msg =
"Timeout (" + request.getTimeout() +
") for request: "
00237 + request.getId();
00238 logger.
warn(msg);
00239
throw new SQLException(msg);
00240 }
00241 }
00242
else
00243
this.readSync.wait();
00244 }
00245
catch (InterruptedException e)
00246 {
00247
00248
if (logger.
isWarnEnabled())
00249 logger.
warn(
"Request " + request.getId() +
" timed out ("
00250 + request.getTimeout() +
" ms)");
00251
throw new SQLException(
"Timeout (" + request.getTimeout()
00252 +
") for request: " + request.getId());
00253 }
00254 }
00255 }
00256
00260 public final synchronized void notifyWriteCompleted(
00261
AbstractWriteRequest request)
00262 {
00263
synchronized (
this.writeSync)
00264 {
00265
if (logger.
isDebugEnabled())
00266 logger.
debug(
"Request " + request.
getId() +
" completed");
00267
if (
getPendingWrites() == 0)
00268 {
00269
if (logger.
isDebugEnabled())
00270 logger.
debug(
"Last write completed, notifying reads");
00271
writeSync.notifyAll();
00272 }
00273 }
00274 }
00275
00276
00277
00278
00279
00283 protected final void commitTransaction(
long transactionId)
00284 {
00285 }
00286
00290 protected final void rollbackTransaction(
long transactionId)
00291 {
00292 }
00293
00294
00295
00296
00300 public String
getXmlImpl()
00301 {
00302 StringBuffer info =
new StringBuffer();
00303 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb2Scheduler +
" "
00304 +
DatabasesXmlTags.ATT_level +
"=\"" +
DatabasesXmlTags.VAL_query
00305 +
"\"/>");
00306 info.append(System.getProperty(
"line.separator"));
00307
return info.toString();
00308 }
00309
00310 }