00001
00025 package org.objectweb.cjdbc.controller.scheduler;
00026
00027
import java.sql.SQLException;
00028
00029
import org.objectweb.cjdbc.common.exceptions.RollbackException;
00030
import org.objectweb.cjdbc.common.i18n.Translate;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00033
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00034
import org.objectweb.cjdbc.common.sql.SelectRequest;
00035
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00036
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00037
import org.objectweb.cjdbc.common.xml.XmlComponent;
00038
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00039
00050 public abstract class AbstractScheduler implements XmlComponent
00051 {
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 protected int raidbLevel;
00066 protected int parsingGranularity;
00067
00068
00069 private long tid;
00070 private boolean suspendedTransactions =
false;
00071 private int pendingTransactions;
00072 private Object
transactionSync =
new Object();
00073 private Object
endOfCurrentTransactions =
new Object();
00074
00075
00076 private boolean suspendedWrites =
false;
00077 private int pendingWrites;
00078 private Object
writesSync =
new Object();
00079 private Object
endOfCurrentWrites =
new Object();
00080
00081 protected static Trace
logger = Trace
00082 .getLogger(
"org.objectweb.cjdbc.controller.scheduler");
00083
00084
00085 private int numberRead = 0;
00086 private int numberWrite = 0;
00087
00088
00089
00090
00091
/**
 * Creates a scheduler with the given configuration. The transaction id
 * counter and both pending counters start at zero.
 *
 * @param raidbLevel RAIDb level stored in this scheduler
 * @param parsingGranularity parsing granularity stored in this scheduler
 */
public AbstractScheduler(int raidbLevel, int parsingGranularity)
{
  // Reset the counters first, then record the caller-supplied settings.
  pendingWrites = 0;
  pendingTransactions = 0;
  tid = 0L;
  this.parsingGranularity = parsingGranularity;
  this.raidbLevel = raidbLevel;
}
00107
00108
00109
00110
00111
00118 public final void initializeTransactionId(
long transactionId)
00119 {
00120
this.tid = transactionId;
00121 }
00122
00128 public final int getParsingGranularity()
00129 {
00130
return parsingGranularity;
00131 }
00132
00138 public final void setParsingGranularity(
int parsingGranularity)
00139 {
00140
this.parsingGranularity = parsingGranularity;
00141 }
00142
00148 public final int getPendingWrites()
00149 {
00150
return pendingWrites;
00151 }
00152
00158 public final int getRAIDbLevel()
00159 {
00160
return raidbLevel;
00161 }
00162
00168 public final void setRAIDbLevel(
int raidbLevel)
00169 {
00170
this.raidbLevel = raidbLevel;
00171 }
00172
00181 public void setDatabaseSchema(
DatabaseSchema dbs)
00182 {
00183
if (
logger.isInfoEnabled())
00184
logger.info(
Translate.get(
"scheduler.doesnt.support.schemas"));
00185 }
00186
00193 public void mergeDatabaseSchema(
DatabaseSchema dbs)
00194 {
00195
logger.info(
Translate.get(
"scheduler.doesnt.support.schemas"));
00196 }
00197
00198
00199
00200
00201
00210
public abstract void scheduleReadRequest(
SelectRequest request)
00211
throws SQLException;
00212
00218
/**
 * Scheduler-specific hook invoked by readCompleted() after the read
 * counter has been incremented.
 *
 * @param request the select request that completed
 */
public abstract void readCompletedNotify(SelectRequest request);
00219
00225 public final void readCompleted(
SelectRequest request)
00226 {
00227
numberRead++;
00228
this.readCompletedNotify(request);
00229 }
00230
00243 public final void scheduleWriteRequest(
AbstractWriteRequest request)
00244
throws SQLException,
RollbackException
00245 {
00246
synchronized (
writesSync)
00247 {
00248
if (
suspendedWrites)
00249 {
00250
try
00251 {
00252
00253
int timeout = request.getTimeout();
00254
if (timeout > 0)
00255 {
00256
long start = System.currentTimeMillis();
00257
long lTimeout = timeout * 1000;
00258
writesSync.wait(lTimeout);
00259
long end = System.currentTimeMillis();
00260
int remaining = (
int) (lTimeout - (end - start));
00261
if (remaining > 0)
00262 request.setTimeout(remaining);
00263
else
00264 {
00265 String msg =
Translate.get(
"scheduler.request.timeout",
00266
new String[]{String.valueOf(request.getId()),
00267 String.valueOf(request.getTimeout())});
00268
logger.warn(msg);
00269
throw new SQLException(msg);
00270 }
00271 }
00272
else
00273
this.writesSync.wait();
00274 }
00275
catch (InterruptedException e)
00276 {
00277 String msg =
Translate.get(
"scheduler.request.timeout.failed", e);
00278
logger.warn(msg);
00279
throw new SQLException(msg);
00280 }
00281 }
00282
pendingWrites++;
00283 }
00284
scheduleNonSuspendedWriteRequest(request);
00285 }
00286
00296
public abstract void scheduleNonSuspendedWriteRequest(
00297 AbstractWriteRequest request)
throws SQLException, RollbackException;
00298
00311 public final void writeCompleted(
AbstractWriteRequest request)
00312 {
00313
synchronized (writesSync)
00314 {
00315 pendingWrites--;
00316
00317 notifyWriteCompleted(request);
00318
00319
00320
00321
if (suspendedWrites && (pendingWrites == 0))
00322 {
00323
synchronized (endOfCurrentWrites)
00324 {
00325 endOfCurrentWrites.notifyAll();
00326 }
00327 }
00328 }
00329 numberWrite++;
00330 }
00331
00339
/**
 * Scheduler-specific hook invoked by writeCompleted() while the writes
 * monitor is held, right after the pending-writes counter has been
 * decremented.
 *
 * @param request the write request that completed
 */
public abstract void notifyWriteCompleted(AbstractWriteRequest request);
00340
00348 protected boolean hasSQLMacros(
AbstractRequest request)
00349 {
00350 String lower = request.
getSQL().toLowerCase();
00351
if (lower.indexOf(
"now()") > 0)
00352
return true;
00353
if (lower.indexOf(
"rand()") > 0)
00354
return true;
00355
return false;
00356 }
00357
00358
00359
00360
00361
00371 public final long begin(
TransactionMarkerMetaData tm)
throws SQLException
00372 {
00373
00374
synchronized (writesSync)
00375 {
00376
if (suspendedWrites)
00377 {
00378
try
00379 {
00380
00381
long timeout = tm.getTimeout();
00382
if (timeout > 0)
00383 {
00384
long start = System.currentTimeMillis();
00385 writesSync.wait(timeout);
00386
long end = System.currentTimeMillis();
00387
long remaining = timeout - (end - start);
00388
if (remaining > 0)
00389 tm.setTimeout(remaining);
00390
else
00391 {
00392 String msg =
Translate.get(
"scheduler.begin.timeout.writeSync");
00393 logger.warn(msg);
00394
throw new SQLException(msg);
00395 }
00396 }
00397
else
00398 writesSync.wait();
00399 }
00400
catch (InterruptedException e)
00401 {
00402 String msg =
Translate.get(
"scheduler.begin.timeout.writeSync")
00403 +
" (" + e +
")";
00404 logger.error(msg);
00405
throw new SQLException(msg);
00406 }
00407 }
00408 pendingWrites++;
00409 }
00410
00411
00412
synchronized (transactionSync)
00413 {
00414
if (suspendedTransactions)
00415
try
00416 {
00417
00418
long timeout = tm.getTimeout();
00419
if (timeout > 0)
00420 {
00421
long start = System.currentTimeMillis();
00422 transactionSync.wait(timeout);
00423
long end = System.currentTimeMillis();
00424
long remaining = timeout - (end - start);
00425
if (remaining > 0)
00426 tm.setTimeout(remaining);
00427
else
00428 {
00429 String msg =
Translate
00430 .get(
"scheduler.begin.timeout.transactionSync");
00431 logger.warn(msg);
00432
throw new SQLException(msg);
00433 }
00434 }
00435
else
00436 transactionSync.wait();
00437 }
00438
catch (InterruptedException e)
00439 {
00440 String msg =
Translate.get(
"scheduler.begin.timeout.transactionSync")
00441 +
" (" + e +
")";
00442 logger.error(msg);
00443
throw new SQLException(msg);
00444 }
00445 tid++;
00446 pendingTransactions++;
00447
return tid;
00448 }
00449 }
00450
00456 public final void beginCompleted(
long transactionId)
00457 {
00458
00459
synchronized (writesSync)
00460 {
00461 pendingWrites--;
00462
00463
00464
00465
if (suspendedWrites && (pendingWrites == 0))
00466 {
00467
synchronized (endOfCurrentWrites)
00468 {
00469 endOfCurrentWrites.notifyAll();
00470 }
00471 }
00472 }
00473 }
00474
00484 public final void commit(
TransactionMarkerMetaData tm)
throws SQLException
00485 {
00486
00487
synchronized (writesSync)
00488 {
00489
if (suspendedWrites)
00490 {
00491
try
00492 {
00493
00494
long timeout = tm.getTimeout();
00495
if (timeout > 0)
00496 {
00497
long start = System.currentTimeMillis();
00498 writesSync.wait(timeout);
00499
long end = System.currentTimeMillis();
00500
long remaining = timeout - (end - start);
00501
if (remaining > 0)
00502 tm.setTimeout(remaining);
00503
else
00504 {
00505 String msg =
Translate.get(
"scheduler.commit.timeout.writeSync");
00506 logger.warn(msg);
00507
throw new SQLException(msg);
00508 }
00509 }
00510
else
00511 writesSync.wait();
00512 }
00513
catch (InterruptedException e)
00514 {
00515 String msg =
Translate.get(
"scheduler.commit.timeout.writeSync")
00516 +
" (" + e +
")";
00517 logger.error(msg);
00518
throw new SQLException(msg);
00519 }
00520 }
00521 pendingWrites++;
00522 }
00523 commitTransaction(tm.getTransactionId());
00524 }
00525
00532
/**
 * Scheduler-specific commit handling. Invoked by commit() after the
 * suspension gate has been passed and the commit has been counted as a
 * pending write; called without the writes monitor held.
 *
 * @param transactionId id of the transaction being committed
 */
protected abstract void commitTransaction(long transactionId);
00533
00539 public final void commitCompleted(
long transactionId)
00540 {
00541
00542
synchronized (transactionSync)
00543 {
00544 pendingTransactions--;
00545
00546
00547
00548
00549
if (suspendedTransactions && (pendingTransactions == 0))
00550 {
00551
synchronized (endOfCurrentTransactions)
00552 {
00553 endOfCurrentTransactions.notifyAll();
00554 }
00555 }
00556 }
00557
00558
synchronized (writesSync)
00559 {
00560 pendingWrites--;
00561
00562
00563
00564
if (suspendedWrites && (pendingWrites == 0))
00565 {
00566
synchronized (endOfCurrentWrites)
00567 {
00568 endOfCurrentWrites.notifyAll();
00569 }
00570 }
00571 }
00572 }
00573
00583 public final void rollback(
TransactionMarkerMetaData tm)
throws SQLException
00584 {
00585
00586
synchronized (writesSync)
00587 {
00588
if (suspendedWrites)
00589 {
00590
try
00591 {
00592
00593
long timeout = tm.getTimeout();
00594
if (timeout > 0)
00595 {
00596
long start = System.currentTimeMillis();
00597 writesSync.wait(timeout);
00598
long end = System.currentTimeMillis();
00599
long remaining = timeout - (end - start);
00600
if (remaining > 0)
00601 tm.setTimeout(remaining);
00602
else
00603 {
00604 String msg =
Translate
00605 .get(
"scheduler.rollback.timeout.writeSync");
00606 logger.warn(msg);
00607
throw new SQLException(msg);
00608 }
00609 }
00610
else
00611 writesSync.wait();
00612 }
00613
catch (InterruptedException e)
00614 {
00615 String msg =
Translate.get(
"scheduler.rollback.timeout.writeSync")
00616 +
" (" + e +
")";
00617 logger.error(msg);
00618
throw new SQLException(msg);
00619 }
00620 }
00621 pendingWrites++;
00622 }
00623 rollbackTransaction(tm.getTransactionId());
00624 }
00625
00632
/**
 * Scheduler-specific rollback handling. Invoked by rollback() after the
 * suspension gate has been passed and the rollback has been counted as a
 * pending write; called without the writes monitor held.
 *
 * @param transactionId id of the transaction being rolled back
 */
protected abstract void rollbackTransaction(long transactionId);
00633
00639 public final void rollbackCompleted(
long transactionId)
00640 {
00641
00642
synchronized (transactionSync)
00643 {
00644 pendingTransactions--;
00645
00646
00647
00648
00649
if (suspendedTransactions && (pendingTransactions == 0))
00650 {
00651
synchronized (endOfCurrentTransactions)
00652 {
00653 endOfCurrentTransactions.notifyAll();
00654 }
00655 }
00656 }
00657
00658
synchronized (writesSync)
00659 {
00660 pendingWrites--;
00661
00662
00663
00664
if (suspendedWrites && (pendingWrites == 0))
00665 {
00666
synchronized (endOfCurrentWrites)
00667 {
00668 endOfCurrentWrites.notifyAll();
00669 }
00670 }
00671 }
00672 }
00673
00674
00675
00676
00677
00688 public final void suspendNewTransactionsForCheckpoint() throws SQLException
00689 {
00690
synchronized (transactionSync)
00691 {
00692 suspendedTransactions =
true;
00693
if (pendingTransactions == 0)
00694
return;
00695 }
00696
00697
synchronized (endOfCurrentTransactions)
00698 {
00699
00700
00701
00702
00703
00704
if (pendingTransactions == 0)
00705
return;
00706
00707
00708
try
00709 {
00710 endOfCurrentTransactions.wait();
00711 }
00712
catch (InterruptedException e)
00713 {
00714 String msg =
Translate.get(
"scheduler.suspend.transaction.failed", e);
00715 logger.error(msg);
00716
throw new SQLException(msg);
00717 }
00718 }
00719 }
00720
00727 public final void resumeNewTransactions()
00728 {
00729
synchronized (transactionSync)
00730 {
00731 suspendedTransactions =
false;
00732
00733 transactionSync.notifyAll();
00734 }
00735 }
00736
00746 public void suspendWrites() throws SQLException
00747 {
00748
synchronized (writesSync)
00749 {
00750 suspendedWrites =
true;
00751
if (pendingWrites == 0)
00752
return;
00753 }
00754
00755
synchronized (endOfCurrentWrites)
00756 {
00757
00758
00759
00760
00761
00762
if (pendingWrites == 0)
00763
return;
00764
00765
00766
try
00767 {
00768 endOfCurrentWrites.wait();
00769 }
00770
catch (InterruptedException e)
00771 {
00772 String msg =
Translate.get(
"scheduler.suspend.writes.failed", e);
00773 logger.error(msg);
00774
throw new SQLException(msg);
00775 }
00776 }
00777 }
00778
00785 public void resumeWrites()
00786 {
00787
synchronized (writesSync)
00788 {
00789 suspendedWrites =
false;
00790
00791 writesSync.notifyAll();
00792 }
00793 }
00794
00795
00796
00797
00798
00799
/**
 * Returns the scheduler-specific XML fragment. getXml() places it between
 * the opening and closing request-scheduler tags.
 *
 * @return the XML fragment describing the concrete scheduler
 */
protected abstract String getXmlImpl();
00800
00806 public String getXml()
00807 {
00808 StringBuffer info =
new StringBuffer();
00809 info.append(
"<" +
DatabasesXmlTags.ELT_RequestScheduler +
">");
00810 info.append(
this.getXmlImpl());
00811 info.append(
"</" +
DatabasesXmlTags.ELT_RequestScheduler +
">");
00812
return info.toString();
00813 }
00814
00820 public String[] getSchedulerData()
00821 {
00822 String[] data =
new String[7];
00823 data[0] =
"" + numberRead;
00824 data[1] =
"" + numberWrite;
00825 data[2] =
"" + pendingTransactions;
00826 data[3] =
"" + pendingWrites;
00827 data[4] =
"" + numberRead + numberWrite;
00828 data[5] = (suspendedTransactions) ?
"1" :
"0";
00829 data[6] = (suspendedWrites) ?
"1" :
"0";
00830
return data;
00831 }
00832
00836 public int getNumberRead()
00837 {
00838
return numberRead;
00839 }
00840
00844 public int getNumberWrite()
00845 {
00846
return numberWrite;
00847 }
00848
00852 public int getPendingTransactions()
00853 {
00854
return pendingTransactions;
00855 }
00856 }