00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.scheduler;
00026
00027 import java.sql.SQLException;
00028
00029 import org.objectweb.cjdbc.common.exceptions.RollbackException;
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00033 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00034 import org.objectweb.cjdbc.common.sql.SelectRequest;
00035 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00036 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00037 import org.objectweb.cjdbc.common.xml.XmlComponent;
00038 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 public abstract class AbstractScheduler implements XmlComponent
00051 {
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 protected int raidbLevel;
00066 protected int parsingGranularity;
00067
00068
00069 private long tid;
00070 private boolean suspendedTransactions = false;
00071 private int pendingTransactions;
00072 private Object transactionSync = new Object();
00073 private Object endOfCurrentTransactions = new Object();
00074
00075
00076 private boolean suspendedWrites = false;
00077 private int pendingWrites;
00078 private Object writesSync = new Object();
00079 private Object endOfCurrentWrites = new Object();
00080
00081 protected static Trace logger = Trace
00082 .getLogger("org.objectweb.cjdbc.controller.scheduler");
00083
00084
00085 private int numberRead = 0;
00086 private int numberWrite = 0;
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099 public AbstractScheduler(int raidbLevel, int parsingGranularity)
00100 {
00101 this.raidbLevel = raidbLevel;
00102 this.parsingGranularity = parsingGranularity;
00103 this.tid = 0;
00104 this.pendingTransactions = 0;
00105 this.pendingWrites = 0;
00106 }
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118 public final void initializeTransactionId(long transactionId)
00119 {
00120 this.tid = transactionId;
00121 }
00122
00123
00124
00125
00126
00127
00128 public final int getParsingGranularity()
00129 {
00130 return parsingGranularity;
00131 }
00132
00133
00134
00135
00136
00137
00138 public final void setParsingGranularity(int parsingGranularity)
00139 {
00140 this.parsingGranularity = parsingGranularity;
00141 }
00142
00143
00144
00145
00146
00147
00148 public final int getPendingWrites()
00149 {
00150 return pendingWrites;
00151 }
00152
00153
00154
00155
00156
00157
00158 public final int getRAIDbLevel()
00159 {
00160 return raidbLevel;
00161 }
00162
00163
00164
00165
00166
00167
00168 public final void setRAIDbLevel(int raidbLevel)
00169 {
00170 this.raidbLevel = raidbLevel;
00171 }
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181 public void setDatabaseSchema(DatabaseSchema dbs)
00182 {
00183 if (logger.isInfoEnabled())
00184 logger.info(Translate.get("scheduler.doesnt.support.schemas"));
00185 }
00186
00187
00188
00189
00190
00191
00192
00193 public void mergeDatabaseSchema(DatabaseSchema dbs)
00194 {
00195 logger.info(Translate.get("scheduler.doesnt.support.schemas"));
00196 }
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210 public abstract void scheduleReadRequest(SelectRequest request)
00211 throws SQLException;
00212
00213
00214
00215
00216
00217
00218 public abstract void readCompletedNotify(SelectRequest request);
00219
00220
00221
00222
00223
00224
00225 public final void readCompleted(SelectRequest request)
00226 {
00227 numberRead++;
00228 this.readCompletedNotify(request);
00229 }
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243 public final void scheduleWriteRequest(AbstractWriteRequest request)
00244 throws SQLException, RollbackException
00245 {
00246 synchronized (writesSync)
00247 {
00248 if (suspendedWrites)
00249 {
00250 try
00251 {
00252
00253 int timeout = request.getTimeout();
00254 if (timeout > 0)
00255 {
00256 long start = System.currentTimeMillis();
00257 long lTimeout = timeout * 1000;
00258 writesSync.wait(lTimeout);
00259 long end = System.currentTimeMillis();
00260 int remaining = (int) (lTimeout - (end - start));
00261 if (remaining > 0)
00262 request.setTimeout(remaining);
00263 else
00264 {
00265 String msg = Translate.get("scheduler.request.timeout",
00266 new String[]{String.valueOf(request.getId()),
00267 String.valueOf(request.getTimeout())});
00268 logger.warn(msg);
00269 throw new SQLException(msg);
00270 }
00271 }
00272 else
00273 this.writesSync.wait();
00274 }
00275 catch (InterruptedException e)
00276 {
00277 String msg = Translate.get("scheduler.request.timeout.failed", e);
00278 logger.warn(msg);
00279 throw new SQLException(msg);
00280 }
00281 }
00282 pendingWrites++;
00283 }
00284 scheduleNonSuspendedWriteRequest(request);
00285 }
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296 public abstract void scheduleNonSuspendedWriteRequest(
00297 AbstractWriteRequest request) throws SQLException, RollbackException;
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311 public final void writeCompleted(AbstractWriteRequest request)
00312 {
00313 synchronized (writesSync)
00314 {
00315 pendingWrites--;
00316
00317 notifyWriteCompleted(request);
00318
00319
00320
00321 if (suspendedWrites && (pendingWrites == 0))
00322 {
00323 synchronized (endOfCurrentWrites)
00324 {
00325 endOfCurrentWrites.notifyAll();
00326 }
00327 }
00328 }
00329 numberWrite++;
00330 }
00331
00332
00333
00334
00335
00336
00337
00338
00339 public abstract void notifyWriteCompleted(AbstractWriteRequest request);
00340
00341
00342
00343
00344
00345
00346
00347
00348 protected boolean hasSQLMacros(AbstractRequest request)
00349 {
00350 String lower = request.getSQL().toLowerCase();
00351 if (lower.indexOf("now()") > 0)
00352 return true;
00353 if (lower.indexOf("rand()") > 0)
00354 return true;
00355 return false;
00356 }
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 public final long begin(TransactionMarkerMetaData tm) throws SQLException
00372 {
00373
00374 synchronized (writesSync)
00375 {
00376 if (suspendedWrites)
00377 {
00378 try
00379 {
00380
00381 long timeout = tm.getTimeout();
00382 if (timeout > 0)
00383 {
00384 long start = System.currentTimeMillis();
00385 writesSync.wait(timeout);
00386 long end = System.currentTimeMillis();
00387 long remaining = timeout - (end - start);
00388 if (remaining > 0)
00389 tm.setTimeout(remaining);
00390 else
00391 {
00392 String msg = Translate.get("scheduler.begin.timeout.writeSync");
00393 logger.warn(msg);
00394 throw new SQLException(msg);
00395 }
00396 }
00397 else
00398 writesSync.wait();
00399 }
00400 catch (InterruptedException e)
00401 {
00402 String msg = Translate.get("scheduler.begin.timeout.writeSync")
00403 + " (" + e + ")";
00404 logger.error(msg);
00405 throw new SQLException(msg);
00406 }
00407 }
00408 pendingWrites++;
00409 }
00410
00411
00412 synchronized (transactionSync)
00413 {
00414 if (suspendedTransactions)
00415 try
00416 {
00417
00418 long timeout = tm.getTimeout();
00419 if (timeout > 0)
00420 {
00421 long start = System.currentTimeMillis();
00422 transactionSync.wait(timeout);
00423 long end = System.currentTimeMillis();
00424 long remaining = timeout - (end - start);
00425 if (remaining > 0)
00426 tm.setTimeout(remaining);
00427 else
00428 {
00429 String msg = Translate
00430 .get("scheduler.begin.timeout.transactionSync");
00431 logger.warn(msg);
00432 throw new SQLException(msg);
00433 }
00434 }
00435 else
00436 transactionSync.wait();
00437 }
00438 catch (InterruptedException e)
00439 {
00440 String msg = Translate.get("scheduler.begin.timeout.transactionSync")
00441 + " (" + e + ")";
00442 logger.error(msg);
00443 throw new SQLException(msg);
00444 }
00445 tid++;
00446 pendingTransactions++;
00447 return tid;
00448 }
00449 }
00450
00451
00452
00453
00454
00455
00456 public final void beginCompleted(long transactionId)
00457 {
00458
00459 synchronized (writesSync)
00460 {
00461 pendingWrites--;
00462
00463
00464
00465 if (suspendedWrites && (pendingWrites == 0))
00466 {
00467 synchronized (endOfCurrentWrites)
00468 {
00469 endOfCurrentWrites.notifyAll();
00470 }
00471 }
00472 }
00473 }
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 public final void commit(TransactionMarkerMetaData tm) throws SQLException
00485 {
00486
00487 synchronized (writesSync)
00488 {
00489 if (suspendedWrites)
00490 {
00491 try
00492 {
00493
00494 long timeout = tm.getTimeout();
00495 if (timeout > 0)
00496 {
00497 long start = System.currentTimeMillis();
00498 writesSync.wait(timeout);
00499 long end = System.currentTimeMillis();
00500 long remaining = timeout - (end - start);
00501 if (remaining > 0)
00502 tm.setTimeout(remaining);
00503 else
00504 {
00505 String msg = Translate.get("scheduler.commit.timeout.writeSync");
00506 logger.warn(msg);
00507 throw new SQLException(msg);
00508 }
00509 }
00510 else
00511 writesSync.wait();
00512 }
00513 catch (InterruptedException e)
00514 {
00515 String msg = Translate.get("scheduler.commit.timeout.writeSync")
00516 + " (" + e + ")";
00517 logger.error(msg);
00518 throw new SQLException(msg);
00519 }
00520 }
00521 pendingWrites++;
00522 }
00523 commitTransaction(tm.getTransactionId());
00524 }
00525
00526
00527
00528
00529
00530
00531
00532 protected abstract void commitTransaction(long transactionId);
00533
00534
00535
00536
00537
00538
00539 public final void commitCompleted(long transactionId)
00540 {
00541
00542 synchronized (transactionSync)
00543 {
00544 pendingTransactions--;
00545
00546
00547
00548
00549 if (suspendedTransactions && (pendingTransactions == 0))
00550 {
00551 synchronized (endOfCurrentTransactions)
00552 {
00553 endOfCurrentTransactions.notifyAll();
00554 }
00555 }
00556 }
00557
00558 synchronized (writesSync)
00559 {
00560 pendingWrites--;
00561
00562
00563
00564 if (suspendedWrites && (pendingWrites == 0))
00565 {
00566 synchronized (endOfCurrentWrites)
00567 {
00568 endOfCurrentWrites.notifyAll();
00569 }
00570 }
00571 }
00572 }
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583 public final void rollback(TransactionMarkerMetaData tm) throws SQLException
00584 {
00585
00586 synchronized (writesSync)
00587 {
00588 if (suspendedWrites)
00589 {
00590 try
00591 {
00592
00593 long timeout = tm.getTimeout();
00594 if (timeout > 0)
00595 {
00596 long start = System.currentTimeMillis();
00597 writesSync.wait(timeout);
00598 long end = System.currentTimeMillis();
00599 long remaining = timeout - (end - start);
00600 if (remaining > 0)
00601 tm.setTimeout(remaining);
00602 else
00603 {
00604 String msg = Translate
00605 .get("scheduler.rollback.timeout.writeSync");
00606 logger.warn(msg);
00607 throw new SQLException(msg);
00608 }
00609 }
00610 else
00611 writesSync.wait();
00612 }
00613 catch (InterruptedException e)
00614 {
00615 String msg = Translate.get("scheduler.rollback.timeout.writeSync")
00616 + " (" + e + ")";
00617 logger.error(msg);
00618 throw new SQLException(msg);
00619 }
00620 }
00621 pendingWrites++;
00622 }
00623 rollbackTransaction(tm.getTransactionId());
00624 }
00625
00626
00627
00628
00629
00630
00631
00632 protected abstract void rollbackTransaction(long transactionId);
00633
00634
00635
00636
00637
00638
00639 public final void rollbackCompleted(long transactionId)
00640 {
00641
00642 synchronized (transactionSync)
00643 {
00644 pendingTransactions--;
00645
00646
00647
00648
00649 if (suspendedTransactions && (pendingTransactions == 0))
00650 {
00651 synchronized (endOfCurrentTransactions)
00652 {
00653 endOfCurrentTransactions.notifyAll();
00654 }
00655 }
00656 }
00657
00658 synchronized (writesSync)
00659 {
00660 pendingWrites--;
00661
00662
00663
00664 if (suspendedWrites && (pendingWrites == 0))
00665 {
00666 synchronized (endOfCurrentWrites)
00667 {
00668 endOfCurrentWrites.notifyAll();
00669 }
00670 }
00671 }
00672 }
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688 public final void suspendNewTransactionsForCheckpoint() throws SQLException
00689 {
00690 synchronized (transactionSync)
00691 {
00692 suspendedTransactions = true;
00693 if (pendingTransactions == 0)
00694 return;
00695 }
00696
00697 synchronized (endOfCurrentTransactions)
00698 {
00699
00700
00701
00702
00703
00704 if (pendingTransactions == 0)
00705 return;
00706
00707
00708 try
00709 {
00710 endOfCurrentTransactions.wait();
00711 }
00712 catch (InterruptedException e)
00713 {
00714 String msg = Translate.get("scheduler.suspend.transaction.failed", e);
00715 logger.error(msg);
00716 throw new SQLException(msg);
00717 }
00718 }
00719 }
00720
00721
00722
00723
00724
00725
00726
00727 public final void resumeNewTransactions()
00728 {
00729 synchronized (transactionSync)
00730 {
00731 suspendedTransactions = false;
00732
00733 transactionSync.notifyAll();
00734 }
00735 }
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746 public void suspendWrites() throws SQLException
00747 {
00748 synchronized (writesSync)
00749 {
00750 suspendedWrites = true;
00751 if (pendingWrites == 0)
00752 return;
00753 }
00754
00755 synchronized (endOfCurrentWrites)
00756 {
00757
00758
00759
00760
00761
00762 if (pendingWrites == 0)
00763 return;
00764
00765
00766 try
00767 {
00768 endOfCurrentWrites.wait();
00769 }
00770 catch (InterruptedException e)
00771 {
00772 String msg = Translate.get("scheduler.suspend.writes.failed", e);
00773 logger.error(msg);
00774 throw new SQLException(msg);
00775 }
00776 }
00777 }
00778
00779
00780
00781
00782
00783
00784
00785 public void resumeWrites()
00786 {
00787 synchronized (writesSync)
00788 {
00789 suspendedWrites = false;
00790
00791 writesSync.notifyAll();
00792 }
00793 }
00794
00795
00796
00797
00798
00799 protected abstract String getXmlImpl();
00800
00801
00802
00803
00804
00805
00806 public String getXml()
00807 {
00808 StringBuffer info = new StringBuffer();
00809 info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">");
00810 info.append(this.getXmlImpl());
00811 info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">");
00812 return info.toString();
00813 }
00814
00815
00816
00817
00818
00819
00820 public String[] getSchedulerData()
00821 {
00822 String[] data = new String[7];
00823 data[0] = "" + numberRead;
00824 data[1] = "" + numberWrite;
00825 data[2] = "" + pendingTransactions;
00826 data[3] = "" + pendingWrites;
00827 data[4] = "" + numberRead + numberWrite;
00828 data[5] = (suspendedTransactions) ? "1" : "0";
00829 data[6] = (suspendedWrites) ? "1" : "0";
00830 return data;
00831 }
00832
00833
00834
00835
00836 public int getNumberRead()
00837 {
00838 return numberRead;
00839 }
00840
00841
00842
00843
00844 public int getNumberWrite()
00845 {
00846 return numberWrite;
00847 }
00848
00849
00850
00851
00852 public int getPendingTransactions()
00853 {
00854 return pendingTransactions;
00855 }
00856
00857
00858
00859 public boolean isSuspendedTransactions()
00860 {
00861 return suspendedTransactions;
00862 }
00863
00864
00865
00866 public boolean isSuspendedWrites()
00867 {
00868 return suspendedWrites;
00869 }
00870 }