src/org/objectweb/cjdbc/controller/loadbalancer/raidb1/RAIDb1.java

Go to the documentation of this file.
00001 00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1; 00026 00027 import java.sql.Connection; 00028 import java.sql.SQLException; 00029 import java.util.ArrayList; 00030 00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 00032 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException; 00033 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 00034 import org.objectweb.cjdbc.common.i18n.Translate; 00035 import org.objectweb.cjdbc.common.log.Trace; 00036 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00037 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 00038 import org.objectweb.cjdbc.common.sql.SelectRequest; 00039 import org.objectweb.cjdbc.common.sql.StoredProcedure; 00040 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 00041 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00042 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 00043 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 00044 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 00045 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 00046 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 00047 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 00049 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 00050 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 00054 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 00055 import 
org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask; 00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 00057 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 00058 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00059 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 00060 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 00061 00072 public abstract class RAIDb1 extends AbstractLoadBalancer 00073 { 00074 /* 00075 * How the code is organized ? 1. Member variables 2. Constructor(s) 3. 00076 * Request handling 4. Transaction handling 5. Backend management 00077 */ 00078 00079 protected ArrayList backendBlockingThreads; 00080 protected ArrayList backendNonBlockingThreads; 00081 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 00082 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 00083 // Should we wait for all backends to commit before returning ? 
00084 protected WaitForCompletionPolicy waitForCompletionPolicy; 00085 00086 protected static Trace logger = Trace 00087 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1"); 00088 00089 /* 00090 * Constructors 00091 */ 00092 00102 public RAIDb1(VirtualDatabase vdb, 00103 WaitForCompletionPolicy waitForCompletionPolicy) throws SQLException 00104 { 00105 super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING); 00106 00107 this.waitForCompletionPolicy = waitForCompletionPolicy; 00108 backendBlockingThreads = new ArrayList(); 00109 backendNonBlockingThreads = new ArrayList(); 00110 } 00111 00112 /* 00113 * Request Handling 00114 */ 00115 00123 private int getNbToWait(int nbOfThreads) 00124 { 00125 int nbToWait; 00126 switch (waitForCompletionPolicy.getPolicy()) 00127 { 00128 case WaitForCompletionPolicy.FIRST : 00129 nbToWait = 1; 00130 break; 00131 case WaitForCompletionPolicy.MAJORITY : 00132 nbToWait = nbOfThreads / 2 + 1; 00133 break; 00134 default : 00135 logger 00136 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 00137 case WaitForCompletionPolicy.ALL : 00138 nbToWait = nbOfThreads; 00139 break; 00140 } 00141 return nbToWait; 00142 } 00143 00147 public abstract ControllerResultSet execReadRequest(SelectRequest request, 00148 MetadataCache metadataCache) throws SQLException; 00149 00159 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 00160 DatabaseBackend backend, MetadataCache metadataCache) 00161 throws SQLException, UnreachableBackendException 00162 { 00163 // Handle macros 00164 handleMacros(request); 00165 00166 // Ok, we have a backend, let's execute the request 00167 AbstractConnectionManager cm = backend.getConnectionManager(request 00168 .getLogin()); 00169 00170 // Sanity check 00171 if (cm == null) 00172 { 00173 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00174 new String[]{request.getLogin(), backend.getName()}); 00175 logger.error(msg); 00176 throw new 
SQLException(msg); 00177 } 00178 00179 // Execute the query 00180 if (request.isAutoCommit()) 00181 { 00182 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00183 // We could do something finer grain here by waiting 00184 // only for writes that depend on the tables we need 00185 // but is that really worth the overhead ? 00186 waitForAllWritesToComplete(backend); 00187 00188 ControllerResultSet rs = null; 00189 boolean badConnection; 00190 do 00191 { 00192 badConnection = false; 00193 // Use a connection just for this request 00194 Connection c = null; 00195 try 00196 { 00197 c = cm.getConnection(); 00198 } 00199 catch (UnreachableBackendException e1) 00200 { 00201 logger.error(Translate.get( 00202 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00203 backend.disable(); 00204 throw new UnreachableBackendException(Translate.get( 00205 "loadbalancer.backend.unreacheable", backend.getName())); 00206 } 00207 00208 // Sanity check 00209 if (c == null) 00210 throw new SQLException(Translate.get( 00211 "loadbalancer.backend.no.connection", backend.getName())); 00212 00213 // Execute Query 00214 try 00215 { 00216 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00217 cm.releaseConnection(c); 00218 } 00219 catch (SQLException e) 00220 { 00221 cm.releaseConnection(c); 00222 throw new SQLException(Translate.get( 00223 "loadbalancer.request.failed.on.backend", new String[]{ 00224 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00225 backend.getName(), e.getMessage()})); 00226 } 00227 catch (BadConnectionException e) 00228 { // Get rid of the bad connection 00229 cm.deleteConnection(c); 00230 badConnection = true; 00231 } 00232 } 00233 while (badConnection); 00234 if (logger.isDebugEnabled()) 00235 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00236 String.valueOf(request.getId()), backend.getName()})); 00237 return rs; 00238 } 00239 else 00240 { // Inside a transaction 00241 
Connection c; 00242 long tid = request.getTransactionId(); 00243 Long lTid = new Long(tid); 00244 00245 // Wait for previous writes to complete 00246 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00247 waitForAllWritesToComplete(backend, request.getTransactionId()); 00248 00249 if (!backend.isStartedTransaction(lTid)) 00250 { // transaction has not been started yet on this backend 00251 try 00252 { 00253 c = cm.getConnection(tid); 00254 } 00255 catch (UnreachableBackendException e1) 00256 { 00257 logger.error(Translate.get( 00258 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00259 backend.disable(); 00260 throw new SQLException(Translate.get( 00261 "loadbalancer.backend.unreacheable", backend.getName())); 00262 } 00263 00264 // Sanity check 00265 if (c == null) 00266 throw new SQLException(Translate.get( 00267 "loadbalancer.unable.get.connection", new String[]{ 00268 String.valueOf(tid), backend.getName()})); 00269 00270 // begin transaction 00271 backend.startTransaction(lTid); 00272 c.setAutoCommit(false); 00273 } 00274 else 00275 { // Re-use the connection used by this transaction 00276 c = cm.retrieveConnection(tid); 00277 00278 // Sanity check 00279 if (c == null) 00280 throw new SQLException(Translate.get( 00281 "loadbalancer.unable.retrieve.connection", new String[]{ 00282 String.valueOf(tid), backend.getName()})); 00283 } 00284 00285 // Execute Query 00286 ControllerResultSet rs = null; 00287 try 00288 { 00289 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00290 } 00291 catch (SQLException e) 00292 { 00293 throw new SQLException(Translate.get( 00294 "loadbalancer.request.failed.on.backend", new String[]{ 00295 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00296 backend.getName(), e.getMessage()})); 00297 } 00298 catch (BadConnectionException e) 00299 { // Connection failed, so did the transaction 00300 // Disable the backend. 
00301 cm.deleteConnection(tid); 00302 String msg = Translate.get( 00303 "loadbalancer.backend.disabling.connection.failure", backend 00304 .getName()); 00305 logger.error(msg); 00306 backend.disable(); 00307 throw new SQLException(msg); 00308 } 00309 if (logger.isDebugEnabled()) 00310 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00311 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00312 backend.getName()})); 00313 return rs; 00314 } 00315 } 00316 00327 public int execWriteRequest(AbstractWriteRequest request) 00328 throws AllBackendsFailedException, NoMoreBackendException,SQLException 00329 { 00330 return ((WriteRequestTask) execWriteRequest(request, false, null)) 00331 .getResult(); 00332 } 00333 00344 public ControllerResultSet execWriteRequestWithKeys( 00345 AbstractWriteRequest request, MetadataCache metadataCache) 00346 throws AllBackendsFailedException, SQLException 00347 { 00348 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 00349 metadataCache)).getResult(); 00350 } 00351 00364 private AbstractTask execWriteRequest(AbstractWriteRequest request, 00365 boolean useKeys, MetadataCache metadataCache) 00366 throws AllBackendsFailedException, NoMoreBackendException,SQLException 00367 { 00368 ArrayList backendThreads; 00369 ReadPrioritaryFIFOWriteLock lock; 00370 00371 // Handle macros 00372 handleMacros(request); 00373 00374 // Determine which list (blocking or not) to use 00375 if (request.mightBlock()) 00376 { // Blocking 00377 backendThreads = backendBlockingThreads; 00378 lock = backendBlockingThreadsRWLock; 00379 } 00380 else 00381 { // Non-blocking 00382 backendThreads = backendNonBlockingThreads; 00383 lock = backendNonBlockingThreadsRWLock; 00384 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00385 && (request.getTransactionId() != 0)) 00386 waitForAllWritesToComplete(request.getTransactionId()); 00387 } 00388 00389 try 00390 { 00391 lock.acquireRead(); 00392 } 00393 
catch (InterruptedException e) 00394 { 00395 String msg = Translate.get( 00396 "loadbalancer.backendlist.acquire.readlock.failed", e); 00397 logger.error(msg); 00398 throw new SQLException(msg); 00399 } 00400 00401 int nbOfThreads = backendThreads.size(); 00402 if(nbOfThreads==0) 00403 throw new NoMoreBackendException(Translate.get("loadbalancer.backendlist.empty")); 00404 00405 // Create the task 00406 AbstractTask task; 00407 if (useKeys) 00408 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 00409 nbOfThreads, request, metadataCache); 00410 else 00411 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 00412 request); 00413 00414 synchronized (task) 00415 { 00416 if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL) 00417 { // Post the task in each backendThread tasklist and wakeup the threads 00418 for (int i = 0; i < nbOfThreads; i++) 00419 { 00420 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00421 .get(i); 00422 synchronized (thread) 00423 { 00424 thread.addTask(task); 00425 thread.notify(); 00426 } 00427 } 00428 } 00429 else 00430 { 00431 // We have to first post the request on each backend before letting the 00432 // first backend to execute the request. Therefore we have 2 phases: 00433 // 1. post the task in each thread queue 00434 // 2. notify each thread to execute the query 00435 00436 // 1. Post the task 00437 if (request.mightBlock()) 00438 { 00439 for (int i = 0; i < nbOfThreads; i++) 00440 { 00441 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00442 .get(i); 00443 synchronized (thread) 00444 { 00445 thread.addTask(task, request.getTransactionId()); 00446 } 00447 } 00448 } 00449 else 00450 { 00451 for (int i = 0; i < nbOfThreads; i++) 00452 { 00453 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00454 .get(i); 00455 synchronized (thread) 00456 { 00457 thread.addTask(task); 00458 } 00459 } 00460 } 00461 00462 // 2. 
Start the task execution on each backend 00463 for (int i = 0; i < nbOfThreads; i++) 00464 { 00465 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00466 .get(i); 00467 synchronized (thread) 00468 { 00469 thread.notify(); 00470 } 00471 } 00472 } 00473 00474 lock.releaseRead(); 00475 00476 // Wait for completion (notified by the task) 00477 try 00478 { 00479 // Wait on task 00480 long timeout = request.getTimeout() * 1000; 00481 if (timeout > 0) 00482 { 00483 long start = System.currentTimeMillis(); 00484 task.wait(timeout); 00485 long end = System.currentTimeMillis(); 00486 long remaining = timeout - (end - start); 00487 if (remaining <= 0) 00488 { 00489 String msg = Translate.get("loadbalancer.request.timeout", 00490 new String[]{String.valueOf(request.getId()), 00491 String.valueOf(task.getSuccess()), 00492 String.valueOf(task.getFailed())}); 00493 00494 // Try to remove the request from the task list 00495 lock.acquireRead(); 00496 nbOfThreads = backendThreads.size(); 00497 for (int i = 0; i < nbOfThreads; i++) 00498 { 00499 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00500 .get(i); 00501 synchronized (thread) 00502 { 00503 thread.removeTask(task); 00504 } 00505 } 00506 lock.releaseRead(); 00507 00508 logger.warn(msg); 00509 throw new SQLException(msg); 00510 } 00511 // No need to update request timeout since the execution is finished 00512 } 00513 else 00514 task.wait(); 00515 } 00516 catch (InterruptedException e) 00517 { 00518 // Try to remove the request from the task list 00519 try 00520 { 00521 lock.acquireRead(); 00522 nbOfThreads = backendThreads.size(); 00523 } 00524 catch (InterruptedException ignore) 00525 { 00526 nbOfThreads = 0; // Give up 00527 } 00528 for (int i = 0; i < nbOfThreads; i++) 00529 { 00530 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00531 .get(i); 00532 synchronized (thread) 00533 { 00534 thread.removeTask(task); 00535 } 00536 } 00537 lock.releaseRead(); 00538 00539 throw 
new SQLException(Translate.get("loadbalancer.request.timeout", 00540 new String[]{String.valueOf(request.getId()), 00541 String.valueOf(task.getSuccess()), 00542 String.valueOf(task.getFailed())})); 00543 } 00544 00545 if (task.getSuccess() > 0) 00546 return task; 00547 else 00548 { // All tasks failed 00549 ArrayList exceptions = task.getExceptions(); 00550 if (exceptions == null) 00551 throw new AllBackendsFailedException(Translate.get( 00552 "loadbalancer.request.failed.all", request.getId())); 00553 else 00554 { 00555 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 00556 request.getId()) 00557 + "\n"; 00558 for (int i = 0; i < exceptions.size(); i++) 00559 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00560 logger.error(errorMsg); 00561 throw new SQLException(errorMsg); 00562 } 00563 } 00564 } 00565 } 00566 00576 protected ControllerResultSet executeStoredProcedureOnBackend( 00577 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 00578 throws SQLException, UnreachableBackendException 00579 { 00580 // Handle macros 00581 handleMacros(proc); 00582 00583 // Ok, we have a backend, let's execute the request 00584 AbstractConnectionManager cm = backend 00585 .getConnectionManager(proc.getLogin()); 00586 00587 // Sanity check 00588 if (cm == null) 00589 { 00590 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00591 new String[]{proc.getLogin(), backend.getName()}); 00592 logger.error(msg); 00593 throw new SQLException(msg); 00594 } 00595 00596 // Execute the query 00597 if (proc.isAutoCommit()) 00598 { 00599 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00600 // We could do something finer grain here by waiting 00601 // only for writes that depend on the tables we need 00602 // but is that really worth the overhead ? 
00603 waitForAllWritesToComplete(backend); 00604 00605 // Use a connection just for this request 00606 Connection c = null; 00607 try 00608 { 00609 c = cm.getConnection(); 00610 } 00611 catch (UnreachableBackendException e1) 00612 { 00613 logger.error(Translate.get( 00614 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00615 backend.disable(); 00616 throw new UnreachableBackendException(Translate.get( 00617 "loadbalancer.backend.unreacheable", backend.getName())); 00618 } 00619 00620 // Sanity check 00621 if (c == null) 00622 throw new UnreachableBackendException(Translate.get( 00623 "loadbalancer.backend.no.connection", backend.getName())); 00624 00625 // Execute Query 00626 ControllerResultSet rs = null; 00627 try 00628 { 00629 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00630 backend, c, metadataCache); 00631 } 00632 catch (Exception e) 00633 { 00634 throw new SQLException(Translate.get( 00635 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00636 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00637 backend.getName(), e.getMessage()})); 00638 } 00639 finally 00640 { 00641 cm.releaseConnection(c); 00642 } 00643 if (logger.isDebugEnabled()) 00644 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00645 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00646 return rs; 00647 } 00648 else 00649 { // Inside a transaction 00650 Connection c; 00651 long tid = proc.getTransactionId(); 00652 Long lTid = new Long(tid); 00653 00654 // Wait for previous writes to complete 00655 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00656 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00657 00658 if (!backend.isStartedTransaction(lTid)) 00659 { // transaction has not been started yet on this backend 00660 try 00661 { 00662 c = cm.getConnection(tid); 00663 } 00664 catch (UnreachableBackendException e1) 00665 { 00666 logger.error(Translate.get( 00667 
"loadbalancer.backend.disabling.unreachable", backend.getName())); 00668 backend.disable(); 00669 throw new SQLException(Translate.get( 00670 "loadbalancer.backend.unreacheable", backend.getName())); 00671 } 00672 00673 // Sanity check 00674 if (c == null) 00675 throw new SQLException(Translate.get( 00676 "loadbalancer.unable.get.connection", new String[]{ 00677 String.valueOf(tid), backend.getName()})); 00678 00679 // begin transaction 00680 backend.startTransaction(lTid); 00681 c.setAutoCommit(false); 00682 } 00683 else 00684 { // Re-use the connection used by this transaction 00685 c = cm.retrieveConnection(tid); 00686 00687 // Sanity check 00688 if (c == null) 00689 throw new SQLException(Translate.get( 00690 "loadbalancer.unable.retrieve.connection", new String[]{ 00691 String.valueOf(tid), backend.getName()})); 00692 } 00693 00694 // Execute Query 00695 ControllerResultSet rs; 00696 try 00697 { 00698 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00699 backend, c, metadataCache); 00700 } 00701 catch (Exception e) 00702 { 00703 throw new SQLException(Translate.get( 00704 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00705 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00706 backend.getName(), e.getMessage()})); 00707 } 00708 if (logger.isDebugEnabled()) 00709 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00710 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00711 backend.getName()})); 00712 return rs; 00713 } 00714 } 00715 00720 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 00721 MetadataCache metadataCache) throws SQLException 00722 { 00723 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00724 proc, true, metadataCache); 00725 return task.getResult(); 00726 } 00727 00731 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 00732 { 00733 WriteStoredProcedureTask task = (WriteStoredProcedureTask) 
callStoredProcedure( 00734 proc, false, null); 00735 return task.getResult(); 00736 } 00737 00748 private AbstractTask callStoredProcedure(StoredProcedure proc, 00749 boolean isRead, MetadataCache metadataCache) throws SQLException 00750 { 00751 ArrayList backendThreads = backendBlockingThreads; 00752 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 00753 00754 try 00755 { 00756 lock.acquireRead(); 00757 } 00758 catch (InterruptedException e) 00759 { 00760 String msg = Translate.get( 00761 "loadbalancer.backendlist.acquire.readlock.failed", e); 00762 logger.error(msg); 00763 throw new SQLException(msg); 00764 } 00765 00766 int nbOfThreads = backendThreads.size(); 00767 00768 // Create the task 00769 AbstractTask task; 00770 if (isRead) 00771 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads, 00772 proc, metadataCache); 00773 else 00774 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads), 00775 nbOfThreads, proc); 00776 00777 synchronized (task) 00778 { 00779 // Post the task in each backendThread tasklist and wakeup the threads 00780 for (int i = 0; i < nbOfThreads; i++) 00781 { 00782 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00783 .get(i); 00784 synchronized (thread) 00785 { 00786 if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)) 00787 thread.addTask(task); 00788 else 00789 thread.addTask(task, proc.getTransactionId()); 00790 thread.notify(); 00791 } 00792 } 00793 00794 lock.releaseRead(); 00795 00796 // Wait for completion (notified by the task) 00797 try 00798 { 00799 // Wait on task 00800 long timeout = proc.getTimeout() * 1000; 00801 if (timeout > 0) 00802 { 00803 long start = System.currentTimeMillis(); 00804 task.wait(timeout); 00805 long end = System.currentTimeMillis(); 00806 long remaining = timeout - (end - start); 00807 if (remaining <= 0) 00808 { 00809 String msg = Translate.get("loadbalancer.storedprocedure.timeout", 00810 new 
String[]{String.valueOf(proc.getId()), 00811 String.valueOf(task.getSuccess()), 00812 String.valueOf(task.getFailed())}); 00813 // Try to remove the request from the task list 00814 lock.acquireRead(); 00815 nbOfThreads = backendThreads.size(); 00816 for (int i = 0; i < nbOfThreads; i++) 00817 { 00818 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00819 .get(i); 00820 synchronized (thread) 00821 { 00822 thread.removeTask(task); 00823 } 00824 } 00825 lock.releaseRead(); 00826 00827 logger.warn(msg); 00828 throw new SQLException(msg); 00829 } 00830 // No need to update request timeout since the execution is finished 00831 } 00832 else 00833 task.wait(); 00834 } 00835 catch (InterruptedException e) 00836 { 00837 // Try to remove the request from the task list 00838 try 00839 { 00840 lock.acquireRead(); 00841 nbOfThreads = backendThreads.size(); 00842 } 00843 catch (InterruptedException ignore) 00844 { 00845 nbOfThreads = 0; // Give up 00846 } 00847 for (int i = 0; i < nbOfThreads; i++) 00848 { 00849 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00850 .get(i); 00851 synchronized (thread) 00852 { 00853 thread.removeTask(task); 00854 } 00855 } 00856 lock.releaseRead(); 00857 00858 throw new SQLException(Translate.get( 00859 "loadbalancer.storedprocedure.timeout", new String[]{ 00860 String.valueOf(proc.getId()), 00861 String.valueOf(task.getSuccess()), 00862 String.valueOf(task.getFailed())})); 00863 } 00864 00865 if (task.getSuccess() > 0) 00866 return task; 00867 else 00868 { // All tasks failed 00869 ArrayList exceptions = task.getExceptions(); 00870 if (exceptions == null) 00871 throw new SQLException(Translate.get( 00872 "loadbalancer.storedprocedure.all.failed", proc.getId())); 00873 else 00874 { 00875 String errorMsg = Translate.get( 00876 "loadbalancer.storedprocedure.failed.stack", proc.getId()) 00877 + "\n"; 00878 for (int i = 0; i < exceptions.size(); i++) 00879 errorMsg += ((SQLException) exceptions.get(i)).getMessage() 
+ "\n"; 00880 logger.error(errorMsg); 00881 throw new SQLException(errorMsg); 00882 } 00883 } 00884 } 00885 } 00886 00887 /* 00888 * Transaction management 00889 */ 00890 00897 public final void begin(TransactionMarkerMetaData tm) throws SQLException 00898 { 00899 } 00900 00907 public void commit(TransactionMarkerMetaData tm) throws SQLException 00908 { 00909 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00910 waitForAllWritesToComplete(tm.getTransactionId()); 00911 00912 try 00913 { 00914 backendNonBlockingThreadsRWLock.acquireRead(); 00915 } 00916 catch (InterruptedException e) 00917 { 00918 String msg = Translate.get( 00919 "loadbalancer.backendlist.acquire.readlock.failed", e); 00920 logger.error(msg); 00921 throw new SQLException(msg); 00922 } 00923 00924 int nbOfThreads = backendNonBlockingThreads.size(); 00925 ArrayList commitList = new ArrayList(); 00926 Long lTid = new Long(tm.getTransactionId()); 00927 00928 // Build the list of backend that need to commit this transaction 00929 for (int i = 0; i < nbOfThreads; i++) 00930 { 00931 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 00932 .get(i); 00933 if (thread.getBackend().isStartedTransaction(lTid)) 00934 commitList.add(thread); 00935 } 00936 00937 nbOfThreads = commitList.size(); 00938 if (nbOfThreads == 0) 00939 { 00940 backendNonBlockingThreadsRWLock.releaseRead(); 00941 return; 00942 } 00943 00944 // Create the task 00945 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 00946 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 00947 00948 synchronized (task) 00949 { 00950 // Post the task in each backendThread tasklist and wakeup the threads 00951 for (int i = 0; i < nbOfThreads; i++) 00952 { 00953 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 00954 synchronized (thread) 00955 { 00956 thread.addTask(task); 00957 thread.notify(); 00958 } 00959 } 00960 00961 
backendNonBlockingThreadsRWLock.releaseRead(); 00962 00963 // Wait for completion (notified by the task) 00964 try 00965 { 00966 // Wait on task 00967 long timeout = tm.getTimeout(); 00968 if (timeout > 0) 00969 { 00970 long start = System.currentTimeMillis(); 00971 task.wait(timeout); 00972 long end = System.currentTimeMillis(); 00973 long remaining = timeout - (end - start); 00974 if (remaining <= 0) 00975 { 00976 String msg = Translate.get("loadbalancer.commit.timeout", 00977 new String[]{String.valueOf(tm.getTransactionId()), 00978 String.valueOf(task.getSuccess()), 00979 String.valueOf(task.getFailed())}); 00980 logger.warn(msg); 00981 throw new SQLException(msg); 00982 } 00983 } 00984 else 00985 task.wait(); 00986 } 00987 catch (InterruptedException e) 00988 { 00989 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 00990 new String[]{String.valueOf(tm.getTransactionId()), 00991 String.valueOf(task.getSuccess()), 00992 String.valueOf(task.getFailed())})); 00993 } 00994 00995 if (task.getSuccess() > 0) 00996 return; 00997 else 00998 { // All tasks failed 00999 ArrayList exceptions = task.getExceptions(); 01000 if (exceptions == null) 01001 throw new SQLException(Translate.get( 01002 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01003 else 01004 { 01005 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01006 tm.getTransactionId()) 01007 + "\n"; 01008 for (int i = 0; i < exceptions.size(); i++) 01009 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01010 logger.error(errorMsg); 01011 throw new SQLException(errorMsg); 01012 } 01013 } 01014 } 01015 } 01016 01023 public void rollback(TransactionMarkerMetaData tm) throws SQLException 01024 { 01025 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01026 waitForAllWritesToComplete(tm.getTransactionId()); 01027 01028 try 01029 { 01030 backendNonBlockingThreadsRWLock.acquireRead(); 01031 } 01032 catch (InterruptedException e) 01033 
{ 01034 String msg = Translate.get( 01035 "loadbalancer.backendlist.acquire.readlock.failed", e); 01036 logger.error(msg); 01037 throw new SQLException(msg); 01038 } 01039 int nbOfThreads = backendNonBlockingThreads.size(); 01040 ArrayList rollbackList = new ArrayList(); 01041 Long lTid = new Long(tm.getTransactionId()); 01042 01043 // Build the list of backend that need to rollback this transaction 01044 for (int i = 0; i < nbOfThreads; i++) 01045 { 01046 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01047 .get(i); 01048 if (thread.getBackend().isStartedTransaction(lTid)) 01049 rollbackList.add(thread); 01050 } 01051 01052 nbOfThreads = rollbackList.size(); 01053 if (nbOfThreads == 0) 01054 { 01055 backendNonBlockingThreadsRWLock.releaseRead(); 01056 return; 01057 } 01058 01059 // Create the task 01060 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01061 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01062 01063 synchronized (task) 01064 { 01065 // Post the task in each backendThread tasklist and wakeup the threads 01066 for (int i = 0; i < nbOfThreads; i++) 01067 { 01068 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01069 synchronized (thread) 01070 { 01071 thread.addTask(task); 01072 thread.notify(); 01073 } 01074 } 01075 01076 backendNonBlockingThreadsRWLock.releaseRead(); 01077 01078 // Wait for completion (notified by the task) 01079 try 01080 { 01081 // Wait on task 01082 long timeout = tm.getTimeout(); 01083 if (timeout > 0) 01084 { 01085 long start = System.currentTimeMillis(); 01086 task.wait(timeout); 01087 long end = System.currentTimeMillis(); 01088 long remaining = timeout - (end - start); 01089 if (remaining <= 0) 01090 { 01091 String msg = Translate.get("loadbalancer.rollback.timeout", 01092 new String[]{String.valueOf(tm.getTransactionId()), 01093 String.valueOf(task.getSuccess()), 01094 String.valueOf(task.getFailed())}); 01095 logger.warn(msg); 01096 
throw new SQLException(msg); 01097 } 01098 } 01099 else 01100 task.wait(); 01101 } 01102 catch (InterruptedException e) 01103 { 01104 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01105 new String[]{String.valueOf(tm.getTransactionId()), 01106 String.valueOf(task.getSuccess()), 01107 String.valueOf(task.getFailed())})); 01108 } 01109 01110 if (task.getSuccess() > 0) 01111 return; 01112 else 01113 { // All tasks failed 01114 ArrayList exceptions = task.getExceptions(); 01115 if (exceptions == null) 01116 throw new SQLException(Translate.get( 01117 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01118 else 01119 { 01120 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01121 tm.getTransactionId()) 01122 + "\n"; 01123 for (int i = 0; i < exceptions.size(); i++) 01124 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01125 logger.error(errorMsg); 01126 throw new SQLException(errorMsg); 01127 } 01128 } 01129 } 01130 } 01131 01136 protected void waitForAllWritesToComplete(long transactionId) 01137 throws SQLException 01138 { 01139 try 01140 { 01141 backendBlockingThreadsRWLock.acquireRead(); 01142 } 01143 catch (InterruptedException e) 01144 { 01145 String msg = Translate.get( 01146 "loadbalancer.backendlist.acquire.readlock.failed", e); 01147 logger.error(msg); 01148 throw new SQLException(msg); 01149 } 01150 01151 int nbOfThreads = backendBlockingThreads.size(); 01152 01153 for (int i = 0; i < nbOfThreads; i++) 01154 { 01155 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01156 .get(i); 01157 thread.waitForAllTasksToComplete(transactionId); 01158 } 01159 01160 backendBlockingThreadsRWLock.releaseRead(); 01161 } 01162 01170 protected void waitForAllWritesToComplete(DatabaseBackend backend, 01171 long transactionId) throws SQLException 01172 { 01173 try 01174 { 01175 backendBlockingThreadsRWLock.acquireRead(); 01176 } 01177 catch (InterruptedException e) 01178 { 01179 
String msg = Translate.get( 01180 "loadbalancer.backendlist.acquire.readlock.failed", e); 01181 logger.error(msg); 01182 throw new SQLException(msg); 01183 } 01184 01185 int nbOfThreads = backendBlockingThreads.size(); 01186 01187 for (int i = 0; i < nbOfThreads; i++) 01188 { 01189 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01190 .get(i); 01191 if (thread.getBackend() == backend) 01192 thread.waitForAllTasksToComplete(transactionId); 01193 } 01194 01195 backendBlockingThreadsRWLock.releaseRead(); 01196 } 01197 01204 protected void waitForAllWritesToComplete(DatabaseBackend backend) 01205 throws SQLException 01206 { 01207 try 01208 { 01209 backendBlockingThreadsRWLock.acquireRead(); 01210 } 01211 catch (InterruptedException e) 01212 { 01213 String msg = Translate.get( 01214 "loadbalancer.backendlist.acquire.readlock.failed", e); 01215 logger.error(msg); 01216 throw new SQLException(msg); 01217 } 01218 01219 int nbOfThreads = backendBlockingThreads.size(); 01220 01221 for (int i = 0; i < nbOfThreads; i++) 01222 { 01223 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01224 .get(i); 01225 if (thread.getBackend() == backend) 01226 thread.waitForAllTasksToComplete(); 01227 } 01228 01229 backendBlockingThreadsRWLock.releaseRead(); 01230 } 01231 01232 /* 01233 * Backends management 01234 */ 01235 01248 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 01249 throws SQLException 01250 { 01251 if (writeEnabled && db.isWriteCanBeEnabled()) 01252 { 01253 // Create 2 worker threads 01254 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 01255 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 01256 01257 // Add first to the blocking thread list 01258 try 01259 { 01260 backendBlockingThreadsRWLock.acquireWrite(); 01261 } 01262 catch (InterruptedException e) 01263 { 01264 String msg = Translate.get( 01265 "loadbalancer.backendlist.acquire.writelock.failed", e); 
01266 logger.error(msg); 01267 throw new SQLException(msg); 01268 } 01269 backendBlockingThreads.add(blockingThread); 01270 backendBlockingThreadsRWLock.releaseWrite(); 01271 blockingThread.start(); 01272 logger.info(Translate.get( 01273 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01274 01275 // Then add to the non-blocking thread list 01276 try 01277 { 01278 backendNonBlockingThreadsRWLock.acquireWrite(); 01279 } 01280 catch (InterruptedException e) 01281 { 01282 String msg = Translate.get( 01283 "loadbalancer.backendlist.acquire.writelock.failed", e); 01284 logger.error(msg); 01285 throw new SQLException(msg); 01286 } 01287 backendNonBlockingThreads.add(nonBlockingThread); 01288 backendNonBlockingThreadsRWLock.releaseWrite(); 01289 nonBlockingThread.start(); 01290 logger.info(Translate.get( 01291 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01292 db.enableWrite(); 01293 } 01294 01295 if (!db.isInitialized()) 01296 db.initializeConnections(); 01297 db.enableRead(); 01298 } 01299 01311 public synchronized void disableBackend(DatabaseBackend db) 01312 throws SQLException 01313 { 01314 if (db.isWriteEnabled()) 01315 { 01316 // Starts with backendBlockingThreads 01317 try 01318 { 01319 backendBlockingThreadsRWLock.acquireWrite(); 01320 } 01321 catch (InterruptedException e) 01322 { 01323 String msg = Translate.get( 01324 "loadbalancer.backendlist.acquire.writelock.failed", e); 01325 logger.error(msg); 01326 throw new SQLException(msg); 01327 } 01328 01329 int nbOfThreads = backendBlockingThreads.size(); 01330 01331 // Find the right blocking thread 01332 for (int i = 0; i < nbOfThreads; i++) 01333 { 01334 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01335 .get(i); 01336 if (thread.getBackend().equals(db)) 01337 { 01338 logger.info(Translate 01339 .get("loadbalancer.backend.workerthread.blocking.remove", db 01340 .getName())); 01341 01342 // Remove it from the backendBlockingThread list 01343 
backendBlockingThreads.remove(thread); 01344 01345 synchronized (thread) 01346 { 01347 // Kill the thread 01348 thread.addPriorityTask(new KillThreadTask(1, 1)); 01349 thread.notify(); 01350 } 01351 break; 01352 } 01353 } 01354 01355 backendBlockingThreadsRWLock.releaseWrite(); 01356 01357 // Continue with backendNonBlockingThreads 01358 01359 try 01360 { 01361 backendNonBlockingThreadsRWLock.acquireWrite(); 01362 } 01363 catch (InterruptedException e) 01364 { 01365 String msg = Translate.get( 01366 "loadbalancer.backendlist.acquire.writelock.failed", e); 01367 logger.error(msg); 01368 throw new SQLException(msg); 01369 } 01370 01371 // Find the right non-blocking thread 01372 nbOfThreads = backendNonBlockingThreads.size(); 01373 for (int i = 0; i < nbOfThreads; i++) 01374 { 01375 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01376 .get(i); 01377 if (thread.getBackend().equals(db)) 01378 { 01379 logger.info(Translate.get( 01380 "loadbalancer.backend.workerthread.non.blocking.remove", db 01381 .getName())); 01382 01383 // Remove it from the backendNonBlockingThreads list 01384 backendNonBlockingThreads.remove(thread); 01385 01386 synchronized (thread) 01387 { 01388 // Kill the thread 01389 thread.addPriorityTask(new KillThreadTask(1, 1)); 01390 thread.notify(); 01391 } 01392 break; 01393 } 01394 } 01395 01396 backendNonBlockingThreadsRWLock.releaseWrite(); 01397 } 01398 01399 db.disable(); 01400 if (db.isInitialized()) 01401 db.finalizeConnections(); 01402 } 01403 01407 public String getXmlImpl() 01408 { 01409 StringBuffer info = new StringBuffer(); 01410 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 01411 if (waitForCompletionPolicy != null) 01412 info.append(waitForCompletionPolicy.getXml()); 01413 if (macroHandler != null) 01414 info.append(macroHandler.getXml()); 01415 info.append(getRaidb1Xml()); 01416 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 01417 return info.toString(); 01418 } 01419 01427 public abstract 
String getRaidb1Xml(); 01428 }

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:02 2004 に生成されました。 doxygen 1.3.8