src/org/objectweb/cjdbc/controller/scheduler/AbstractScheduler.java

説明を見る。
00001 00025 package org.objectweb.cjdbc.controller.scheduler; 00026 00027 import java.sql.SQLException; 00028 00029 import org.objectweb.cjdbc.common.exceptions.RollbackException; 00030 import org.objectweb.cjdbc.common.i18n.Translate; 00031 import org.objectweb.cjdbc.common.log.Trace; 00032 import org.objectweb.cjdbc.common.sql.AbstractRequest; 00033 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00034 import org.objectweb.cjdbc.common.sql.SelectRequest; 00035 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema; 00036 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00037 import org.objectweb.cjdbc.common.xml.XmlComponent; 00038 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00039 00050 public abstract class AbstractScheduler implements XmlComponent 00051 { 00052 00053 // 00054 // How the code is organized ? 00055 // 00056 // 1. Member variables 00057 // 2. Constructor 00058 // 3. Getter/Setter (possibly in alphabetical order) 00059 // 4. Request handling 00060 // 5. Transaction management 00061 // 6. Checkpoint management 00062 // 7. Debug/Monitoring 00063 // 00064 00065 protected int raidbLevel; 00066 protected int parsingGranularity; 00067 00068 // Transaction management 00069 private long tid; 00070 private boolean suspendedTransactions = false; 00071 private int pendingTransactions; 00072 private Object transactionSync = new Object(); 00073 private Object endOfCurrentTransactions = new Object(); 00074 00075 // Writes management 00076 private boolean suspendedWrites = false; 00077 private int pendingWrites; 00078 private Object writesSync = new Object(); 00079 private Object endOfCurrentWrites = new Object(); 00080 00081 protected static Trace logger = Trace 00082 .getLogger("org.objectweb.cjdbc.controller.scheduler"); 00083 00084 // Monitoring values 00085 private int numberRead = 0; 00086 private int numberWrite = 0; 00087 00088 // 00089 // Constructor 00090 // 00091 00099 public AbstractScheduler(int raidbLevel, int parsingGranularity) 00100 { 00101 this.raidbLevel = raidbLevel; 00102 this.parsingGranularity = parsingGranularity; 00103 this.tid = 0; 00104 this.pendingTransactions = 0; 00105 this.pendingWrites = 0; 00106 } 00107 00108 // 00109 // Getter/Setter methods 00110 // 00111 00118 public final void initializeTransactionId(long transactionId) 00119 { 00120 this.tid = transactionId; 00121 } 00122 00128 public final int getParsingGranularity() 00129 { 00130 return parsingGranularity; 00131 } 00132 00138 public final void setParsingGranularity(int parsingGranularity) 00139 { 00140 this.parsingGranularity = parsingGranularity; 00141 } 00142 00148 public final int getPendingWrites() 00149 { 00150 return pendingWrites; 00151 } 00152 00158 public final int getRAIDbLevel() 00159 { 00160 return raidbLevel; 00161 } 00162 00168 public final void setRAIDbLevel(int raidbLevel) 00169 { 00170 this.raidbLevel = raidbLevel; 00171 } 00172 00181 public void setDatabaseSchema(DatabaseSchema dbs) 00182 { 00183 if (logger.isInfoEnabled()) 00184 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 00185 } 00186 00193 public void mergeDatabaseSchema(DatabaseSchema dbs) 00194 { 00195 logger.info(Translate.get("scheduler.doesnt.support.schemas")); 00196 } 00197 00198 // 00199 // Request Scheduling 00200 // 00201 00210 public abstract void scheduleReadRequest(SelectRequest request) 00211 throws SQLException; 00212 00218 public abstract void readCompletedNotify(SelectRequest request); 00219 00225 public final void readCompleted(SelectRequest request) 00226 { 00227 numberRead++; 00228 this.readCompletedNotify(request); 00229 } 00230 00243 public final void scheduleWriteRequest(AbstractWriteRequest request) 00244 throws SQLException, RollbackException 00245 { 00246 synchronized (writesSync) 00247 { 00248 if (suspendedWrites) 00249 { 00250 try 00251 { 00252 // Wait on writesSync 00253 int timeout = request.getTimeout(); 00254 if (timeout > 0) 00255 { 00256 long start = System.currentTimeMillis(); 00257 long lTimeout = timeout * 1000; 00258 writesSync.wait(lTimeout); 00259 long end = System.currentTimeMillis(); 00260 int remaining = (int) (lTimeout - (end - start)); 00261 if (remaining > 0) 00262 request.setTimeout(remaining); 00263 else 00264 { 00265 String msg = Translate.get("scheduler.request.timeout", 00266 new String[]{String.valueOf(request.getId()), 00267 String.valueOf(request.getTimeout())}); 00268 logger.warn(msg); 00269 throw new SQLException(msg); 00270 } 00271 } 00272 else 00273 this.writesSync.wait(); 00274 } 00275 catch (InterruptedException e) 00276 { 00277 String msg = Translate.get("scheduler.request.timeout.failed", e); 00278 logger.warn(msg); 00279 throw new SQLException(msg); 00280 } 00281 } 00282 pendingWrites++; 00283 } 00284 scheduleNonSuspendedWriteRequest(request); 00285 } 00286 00296 public abstract void scheduleNonSuspendedWriteRequest( 00297 AbstractWriteRequest request) throws SQLException, RollbackException; 00298 00311 public final void writeCompleted(AbstractWriteRequest request) 00312 { 00313 synchronized (writesSync) 00314 { 00315 pendingWrites--; 00316 00317 notifyWriteCompleted(request); 00318 00319 // It this is the last write to complete and writes are 00320 // suspended we have to notify suspendedWrites() 00321 if (suspendedWrites && (pendingWrites == 0)) 00322 { 00323 synchronized (endOfCurrentWrites) 00324 { 00325 endOfCurrentWrites.notifyAll(); 00326 } 00327 } 00328 } 00329 numberWrite++; 00330 } 00331 00339 public abstract void notifyWriteCompleted(AbstractWriteRequest request); 00340 00348 protected boolean hasSQLMacros(AbstractRequest request) 00349 { 00350 String lower = request.getSQL().toLowerCase(); 00351 if (lower.indexOf("now()") > 0) 00352 return true; 00353 if (lower.indexOf("rand()") > 0) 00354 return true; 00355 return false; 00356 } 00357 00358 // 00359 // Transaction management 00360 // 00361 00371 public final long begin(TransactionMarkerMetaData tm) throws SQLException 00372 { 00373 // Check if writes are suspended 00374 synchronized (writesSync) 00375 { 00376 if (suspendedWrites) 00377 { 00378 try 00379 { 00380 // Wait on writesSync 00381 long timeout = tm.getTimeout(); 00382 if (timeout > 0) 00383 { 00384 long start = System.currentTimeMillis(); 00385 writesSync.wait(timeout); 00386 long end = System.currentTimeMillis(); 00387 long remaining = timeout - (end - start); 00388 if (remaining > 0) 00389 tm.setTimeout(remaining); 00390 else 00391 { 00392 String msg = Translate.get("scheduler.begin.timeout.writeSync"); 00393 logger.warn(msg); 00394 throw new SQLException(msg); 00395 } 00396 } 00397 else 00398 writesSync.wait(); 00399 } 00400 catch (InterruptedException e) 00401 { 00402 String msg = Translate.get("scheduler.begin.timeout.writeSync") 00403 + " (" + e + ")"; 00404 logger.error(msg); 00405 throw new SQLException(msg); 00406 } 00407 } 00408 pendingWrites++; 00409 } 00410 00411 // Check if transactions are suspended 00412 synchronized (transactionSync) 00413 { 00414 if (suspendedTransactions) 00415 try 00416 { 00417 // Wait on transactionSync 00418 long timeout = tm.getTimeout(); 00419 if (timeout > 0) 00420 { 00421 long start = System.currentTimeMillis(); 00422 transactionSync.wait(timeout); 00423 long end = System.currentTimeMillis(); 00424 long remaining = timeout - (end - start); 00425 if (remaining > 0) 00426 tm.setTimeout(remaining); 00427 else 00428 { 00429 String msg = Translate 00430 .get("scheduler.begin.timeout.transactionSync"); 00431 logger.warn(msg); 00432 throw new SQLException(msg); 00433 } 00434 } 00435 else 00436 transactionSync.wait(); 00437 } 00438 catch (InterruptedException e) 00439 { 00440 String msg = Translate.get("scheduler.begin.timeout.transactionSync") 00441 + " (" + e + ")"; 00442 logger.error(msg); 00443 throw new SQLException(msg); 00444 } 00445 tid++; 00446 pendingTransactions++; 00447 return tid; 00448 } 00449 } 00450 00456 public final void beginCompleted(long transactionId) 00457 { 00458 // Take care of suspended write 00459 synchronized (writesSync) 00460 { 00461 pendingWrites--; 00462 00463 // It this is the last write to complete and writes are 00464 // suspended we have to notify suspendedWrites() 00465 if (suspendedWrites && (pendingWrites == 0)) 00466 { 00467 synchronized (endOfCurrentWrites) 00468 { 00469 endOfCurrentWrites.notifyAll(); 00470 } 00471 } 00472 } 00473 } 00474 00484 public final void commit(TransactionMarkerMetaData tm) throws SQLException 00485 { 00486 // Check if writes are suspended 00487 synchronized (writesSync) 00488 { 00489 if (suspendedWrites) 00490 { 00491 try 00492 { 00493 // Wait on writesSync 00494 long timeout = tm.getTimeout(); 00495 if (timeout > 0) 00496 { 00497 long start = System.currentTimeMillis(); 00498 writesSync.wait(timeout); 00499 long end = System.currentTimeMillis(); 00500 long remaining = timeout - (end - start); 00501 if (remaining > 0) 00502 tm.setTimeout(remaining); 00503 else 00504 { 00505 String msg = Translate.get("scheduler.commit.timeout.writeSync"); 00506 logger.warn(msg); 00507 throw new SQLException(msg); 00508 } 00509 } 00510 else 00511 writesSync.wait(); 00512 } 00513 catch (InterruptedException e) 00514 { 00515 String msg = Translate.get("scheduler.commit.timeout.writeSync") 00516 + " (" + e + ")"; 00517 logger.error(msg); 00518 throw new SQLException(msg); 00519 } 00520 } 00521 pendingWrites++; 00522 } 00523 commitTransaction(tm.getTransactionId()); 00524 } 00525 00532 protected abstract void commitTransaction(long transactionId); 00533 00539 public final void commitCompleted(long transactionId) 00540 { 00541 // Take care of suspended transactions 00542 synchronized (transactionSync) 00543 { 00544 pendingTransactions--; 00545 00546 // If it is the last pending transaction to complete and we 00547 // are waiting for pending transactions to complete, then wake 00548 // up suspendNewTransactionsForCheckpoint() 00549 if (suspendedTransactions && (pendingTransactions == 0)) 00550 { 00551 synchronized (endOfCurrentTransactions) 00552 { 00553 endOfCurrentTransactions.notifyAll(); 00554 } 00555 } 00556 } 00557 // Take care of suspended write 00558 synchronized (writesSync) 00559 { 00560 pendingWrites--; 00561 00562 // It this is the last write to complete and writes are 00563 // suspended we have to notify suspendedWrites() 00564 if (suspendedWrites && (pendingWrites == 0)) 00565 { 00566 synchronized (endOfCurrentWrites) 00567 { 00568 endOfCurrentWrites.notifyAll(); 00569 } 00570 } 00571 } 00572 } 00573 00583 public final void rollback(TransactionMarkerMetaData tm) throws SQLException 00584 { 00585 // Check if writes are suspended 00586 synchronized (writesSync) 00587 { 00588 if (suspendedWrites) 00589 { 00590 try 00591 { 00592 // Wait on writesSync 00593 long timeout = tm.getTimeout(); 00594 if (timeout > 0) 00595 { 00596 long start = System.currentTimeMillis(); 00597 writesSync.wait(timeout); 00598 long end = System.currentTimeMillis(); 00599 long remaining = timeout - (end - start); 00600 if (remaining > 0) 00601 tm.setTimeout(remaining); 00602 else 00603 { 00604 String msg = Translate 00605 .get("scheduler.rollback.timeout.writeSync"); 00606 logger.warn(msg); 00607 throw new SQLException(msg); 00608 } 00609 } 00610 else 00611 writesSync.wait(); 00612 } 00613 catch (InterruptedException e) 00614 { 00615 String msg = Translate.get("scheduler.rollback.timeout.writeSync") 00616 + " (" + e + ")"; 00617 logger.error(msg); 00618 throw new SQLException(msg); 00619 } 00620 } 00621 pendingWrites++; 00622 } 00623 rollbackTransaction(tm.getTransactionId()); 00624 } 00625 00632 protected abstract void rollbackTransaction(long transactionId); 00633 00639 public final void rollbackCompleted(long transactionId) 00640 { 00641 // Take care of suspended transactions 00642 synchronized (transactionSync) 00643 { 00644 pendingTransactions--; 00645 00646 // If it is the last pending transaction to complete and we 00647 // are waiting for pending transactions to complete, then wake 00648 // up suspendNewTransactionsForCheckpoint() 00649 if (suspendedTransactions && (pendingTransactions == 0)) 00650 { 00651 synchronized (endOfCurrentTransactions) 00652 { 00653 endOfCurrentTransactions.notifyAll(); 00654 } 00655 } 00656 } 00657 // Take care of suspended write 00658 synchronized (writesSync) 00659 { 00660 pendingWrites--; 00661 00662 // It this is the last write to complete and writes are 00663 // suspended we have to notify suspendedWrites() 00664 if (suspendedWrites && (pendingWrites == 0)) 00665 { 00666 synchronized (endOfCurrentWrites) 00667 { 00668 endOfCurrentWrites.notifyAll(); 00669 } 00670 } 00671 } 00672 } 00673 00674 // 00675 // Checkpoint management 00676 // 00677 00688 public final void suspendNewTransactionsForCheckpoint() throws SQLException 00689 { 00690 synchronized (transactionSync) 00691 { 00692 suspendedTransactions = true; 00693 if (pendingTransactions == 0) 00694 return; 00695 } 00696 00697 synchronized (endOfCurrentTransactions) 00698 { 00699 // Here we have a potential synchronization problem since the last 00700 // transaction completion could have happened before we entered this 00701 // synchronized block. Therefore we recheck if there is effectively 00702 // still pending transactions. If this is not the case, we don't have 00703 // to sleep and we can immediately return. 00704 if (pendingTransactions == 0) 00705 return; 00706 00707 // Wait for pending transactions to end 00708 try 00709 { 00710 endOfCurrentTransactions.wait(); 00711 } 00712 catch (InterruptedException e) 00713 { 00714 String msg = Translate.get("scheduler.suspend.transaction.failed", e); 00715 logger.error(msg); 00716 throw new SQLException(msg); 00717 } 00718 } 00719 } 00720 00727 public final void resumeNewTransactions() 00728 { 00729 synchronized (transactionSync) 00730 { 00731 suspendedTransactions = false; 00732 // Wake up all pending begin statements 00733 transactionSync.notifyAll(); 00734 } 00735 } 00736 00746 public void suspendWrites() throws SQLException 00747 { 00748 synchronized (writesSync) 00749 { 00750 suspendedWrites = true; 00751 if (pendingWrites == 0) 00752 return; 00753 } 00754 00755 synchronized (endOfCurrentWrites) 00756 { 00757 // Here we have a potential synchronization problem since the last 00758 // write completion could have happened before we entered this 00759 // synchronized block. Therefore we recheck if there is effectively 00760 // still pending writes. If this is not the case, we don't have 00761 // to sleep and we can immediately return. 00762 if (pendingWrites == 0) 00763 return; 00764 00765 // Wait for pending transactions to end 00766 try 00767 { 00768 endOfCurrentWrites.wait(); 00769 } 00770 catch (InterruptedException e) 00771 { 00772 String msg = Translate.get("scheduler.suspend.writes.failed", e); 00773 logger.error(msg); 00774 throw new SQLException(msg); 00775 } 00776 } 00777 } 00778 00785 public void resumeWrites() 00786 { 00787 synchronized (writesSync) 00788 { 00789 suspendedWrites = false; 00790 // Wake up all waiting writes 00791 writesSync.notifyAll(); 00792 } 00793 } 00794 00795 // 00796 // Debug/Monitoring 00797 // 00798 00799 protected abstract String getXmlImpl(); 00800 00806 public String getXml() 00807 { 00808 StringBuffer info = new StringBuffer(); 00809 info.append("<" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 00810 info.append(this.getXmlImpl()); 00811 info.append("</" + DatabasesXmlTags.ELT_RequestScheduler + ">"); 00812 return info.toString(); 00813 } 00814 00820 public String[] getSchedulerData() 00821 { 00822 String[] data = new String[7]; 00823 data[0] = "" + numberRead; 00824 data[1] = "" + numberWrite; 00825 data[2] = "" + pendingTransactions; 00826 data[3] = "" + pendingWrites; 00827 data[4] = "" + numberRead + numberWrite; 00828 data[5] = (suspendedTransactions) ? "1" : "0"; 00829 data[6] = (suspendedWrites) ? "1" : "0"; 00830 return data; 00831 } 00832 00836 public int getNumberRead() 00837 { 00838 return numberRead; 00839 } 00840 00844 public int getNumberWrite() 00845 { 00846 return numberWrite; 00847 } 00848 00852 public int getPendingTransactions() 00853 { 00854 return pendingTransactions; 00855 } 00856 }

CJDBCversion1.0.4に対してTue Oct 12 15:16:03 2004に生成されました。 doxygen 1.3.8