src/org/objectweb/cjdbc/controller/loadbalancer/raidb2/RAIDb2.java

説明を見る。
00001 00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2; 00026 00027 import java.sql.Connection; 00028 import java.sql.SQLException; 00029 import java.util.ArrayList; 00030 00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 00032 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 00033 import org.objectweb.cjdbc.common.i18n.Translate; 00034 import org.objectweb.cjdbc.common.log.Trace; 00035 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00036 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 00037 import org.objectweb.cjdbc.common.sql.SelectRequest; 00038 import org.objectweb.cjdbc.common.sql.StoredProcedure; 00039 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock; 00040 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00041 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 00042 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 00043 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 00044 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 00045 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 00046 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread; 00047 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy; 00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException; 00049 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy; 00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule; 00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask; 00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask; 00054 import 
org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 00055 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 00057 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask; 00058 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 00059 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 00060 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00061 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 00062 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 00063 00075 public abstract class RAIDb2 extends AbstractLoadBalancer 00076 { 00077 // 00078 // How the code is organized ? 00079 // 1. Member variables 00080 // 2. Constructor(s) 00081 // 3. Request handling 00082 // 4. Transaction handling 00083 // 5. Backend management 00084 // 00085 00086 protected ArrayList backendBlockingThreads; 00087 protected ArrayList backendNonBlockingThreads; 00088 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 00089 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock(); 00090 // Should we wait for all backends to commit before returning ? 
00091 protected WaitForCompletionPolicy waitForCompletionPolicy; 00092 protected CreateTablePolicy createTablePolicy; 00093 00094 protected static Trace logger = Trace 00095 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2"); 00096 00097 /* 00098 * Constructors 00099 */ 00100 00112 public RAIDb2(VirtualDatabase vdb, 00113 WaitForCompletionPolicy waitForCompletionPolicy, 00114 CreateTablePolicy createTablePolicy) throws SQLException 00115 { 00116 super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE); 00117 00118 this.waitForCompletionPolicy = waitForCompletionPolicy; 00119 backendBlockingThreads = new ArrayList(); 00120 backendNonBlockingThreads = new ArrayList(); 00121 this.createTablePolicy = createTablePolicy; 00122 } 00123 00124 /* 00125 * Request Handling 00126 */ 00127 00135 private int getNbToWait(int nbOfThreads) 00136 { 00137 int nbToWait; 00138 switch (waitForCompletionPolicy.getPolicy()) 00139 { 00140 case WaitForCompletionPolicy.FIRST : 00141 nbToWait = 1; 00142 break; 00143 case WaitForCompletionPolicy.MAJORITY : 00144 nbToWait = nbOfThreads / 2 + 1; 00145 break; 00146 default : 00147 logger 00148 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 00149 case WaitForCompletionPolicy.ALL : 00150 nbToWait = nbOfThreads; 00151 break; 00152 } 00153 return nbToWait; 00154 } 00155 00166 public int execWriteRequest(AbstractWriteRequest request) 00167 throws AllBackendsFailedException, SQLException 00168 { 00169 return ((WriteRequestTask) execWriteRequest(request, false, null)) 00170 .getResult(); 00171 } 00172 00183 public ControllerResultSet execWriteRequestWithKeys( 00184 AbstractWriteRequest request, MetadataCache metadataCache) 00185 throws AllBackendsFailedException, SQLException 00186 { 00187 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 00188 metadataCache)).getResult(); 00189 } 00190 00204 private AbstractTask execWriteRequest(AbstractWriteRequest request, 00205 boolean useKeys, MetadataCache 
metadataCache) 00206 throws AllBackendsFailedException, SQLException 00207 { 00208 ArrayList backendThreads; 00209 ReadPrioritaryFIFOWriteLock lock; 00210 00211 // Handle macros 00212 handleMacros(request); 00213 00214 // Determine which list (blocking or not) to use 00215 if (request.mightBlock()) 00216 { // Blocking 00217 backendThreads = backendBlockingThreads; 00218 lock = backendBlockingThreadsRWLock; 00219 } 00220 else 00221 { // Non-blocking 00222 backendThreads = backendNonBlockingThreads; 00223 lock = backendNonBlockingThreadsRWLock; 00224 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00225 && (request.getTransactionId() != 0)) 00226 waitForAllWritesToComplete(request.getTransactionId()); 00227 } 00228 00229 try 00230 { 00231 lock.acquireRead(); 00232 } 00233 catch (InterruptedException e) 00234 { 00235 String msg = Translate.get( 00236 "loadbalancer.backendlist.acquire.readlock.failed", e); 00237 logger.error(msg); 00238 throw new SQLException(msg); 00239 } 00240 00241 int nbOfThreads = backendThreads.size(); 00242 ArrayList writeList = new ArrayList(); 00243 String tableName = request.getTableName(); 00244 00245 if (request.isCreate()) 00246 { // Choose the backend according to the defined policy 00247 CreateTableRule rule = createTablePolicy.getTableRule(request 00248 .getTableName()); 00249 if (rule == null) 00250 rule = createTablePolicy.getDefaultRule(); 00251 00252 // Ask the rule to pickup the backends 00253 ArrayList chosen; 00254 try 00255 { 00256 chosen = rule.getBackends(vdb.getBackends()); 00257 } 00258 catch (CreateTableException e) 00259 { 00260 throw new SQLException(Translate.get( 00261 "loadbalancer.create.table.rule.failed", e.getMessage())); 00262 } 00263 00264 // Build the thread list from the backend list 00265 if (chosen != null) 00266 { 00267 for (int i = 0; i < nbOfThreads; i++) 00268 { 00269 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00270 .get(i); 00271 if 
(chosen.contains(thread.getBackend())) 00272 writeList.add(thread); 00273 } 00274 } 00275 } 00276 else 00277 { // Build the list of backends that need to execute this request 00278 for (int i = 0; i < nbOfThreads; i++) 00279 { 00280 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00281 .get(i); 00282 if (thread.getBackend().hasTable(tableName)) 00283 writeList.add(thread); 00284 } 00285 } 00286 00287 nbOfThreads = writeList.size(); 00288 if (nbOfThreads == 0) 00289 { 00290 String msg = Translate.get("loadbalancer.execute.no.backend.found", 00291 request.getSQLShortForm(vdb.getSQLShortFormLength())); 00292 logger.warn(msg); 00293 throw new SQLException(msg); 00294 } 00295 else 00296 logger.debug(Translate.get("loadbalancer.execute.on.several", 00297 new String[]{String.valueOf(request.getId()), 00298 String.valueOf(nbOfThreads)})); 00299 00300 // Create the task 00301 AbstractTask task; 00302 if (useKeys) 00303 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 00304 nbOfThreads, request, metadataCache); 00305 else 00306 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 00307 request); 00308 00309 synchronized (task) 00310 { 00311 if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL) 00312 { // Post the task in each backendThread tasklist and wakeup the threads 00313 for (int i = 0; i < nbOfThreads; i++) 00314 { 00315 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00316 .get(i); 00317 synchronized (thread) 00318 { 00319 thread.addTask(task); 00320 thread.notify(); 00321 } 00322 } 00323 } 00324 else 00325 { 00326 // We have to first post the request on each backend before letting the 00327 // first backend to execute the request. Therefore we have 2 phases: 00328 // 1. post the task in each thread queue 00329 // 2. notify each thread to execute the query 00330 00331 // 1. 
Post the task 00332 if (request.mightBlock()) 00333 { 00334 for (int i = 0; i < nbOfThreads; i++) 00335 { 00336 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00337 .get(i); 00338 synchronized (thread) 00339 { 00340 thread.addTask(task, request.getTransactionId()); 00341 } 00342 } 00343 } 00344 else 00345 { 00346 for (int i = 0; i < nbOfThreads; i++) 00347 { 00348 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00349 .get(i); 00350 synchronized (thread) 00351 { 00352 thread.addTask(task); 00353 } 00354 } 00355 } 00356 00357 // 2. Start the task execution on each backend 00358 for (int i = 0; i < nbOfThreads; i++) 00359 { 00360 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00361 .get(i); 00362 synchronized (thread) 00363 { 00364 thread.notify(); 00365 } 00366 } 00367 } 00368 00369 lock.releaseRead(); 00370 00371 // Wait for completion (notified by the task) 00372 try 00373 { 00374 // Wait on task 00375 long timeout = request.getTimeout() * 1000; 00376 if (timeout > 0) 00377 { 00378 long start = System.currentTimeMillis(); 00379 task.wait(timeout); 00380 long end = System.currentTimeMillis(); 00381 long remaining = timeout - (end - start); 00382 if (remaining <= 0) 00383 { 00384 String msg = Translate.get("loadbalancer.request.timeout", 00385 new String[]{String.valueOf(request.getId()), 00386 String.valueOf(task.getSuccess()), 00387 String.valueOf(task.getFailed())}); 00388 00389 // Try to remove the request from the task list 00390 lock.acquireRead(); 00391 nbOfThreads = backendThreads.size(); 00392 for (int i = 0; i < nbOfThreads; i++) 00393 { 00394 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00395 .get(i); 00396 synchronized (thread) 00397 { 00398 thread.removeTask(task); 00399 } 00400 } 00401 lock.releaseRead(); 00402 00403 logger.warn(msg); 00404 throw new SQLException(msg); 00405 } 00406 // No need to update request timeout since the execution is finished 00407 } 00408 else 00409 
task.wait(); 00410 } 00411 catch (InterruptedException e) 00412 { 00413 // Try to remove the request from the task list 00414 try 00415 { 00416 lock.acquireRead(); 00417 nbOfThreads = backendThreads.size(); 00418 } 00419 catch (InterruptedException ignore) 00420 { 00421 nbOfThreads = 0; // Give up 00422 } 00423 for (int i = 0; i < nbOfThreads; i++) 00424 { 00425 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00426 .get(i); 00427 synchronized (thread) 00428 { 00429 thread.removeTask(task); 00430 } 00431 } 00432 lock.releaseRead(); 00433 00434 throw new SQLException(Translate.get("loadbalancer.request.timeout", 00435 new String[]{String.valueOf(request.getId()), 00436 String.valueOf(task.getSuccess()), 00437 String.valueOf(task.getFailed())})); 00438 } 00439 00440 if (task.getSuccess() > 0) 00441 return task; 00442 else 00443 { // All tasks failed 00444 ArrayList exceptions = task.getExceptions(); 00445 if (exceptions == null) 00446 throw new AllBackendsFailedException(Translate.get( 00447 "loadbalancer.request.failed.all", request.getId())); 00448 else 00449 { 00450 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 00451 request.getId()) 00452 + "\n"; 00453 for (int i = 0; i < exceptions.size(); i++) 00454 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00455 logger.error(errorMsg); 00456 throw new SQLException(errorMsg); 00457 } 00458 } 00459 } 00460 } 00461 00470 public abstract ControllerResultSet execReadRequest(SelectRequest request, 00471 MetadataCache metadataCache) throws SQLException; 00472 00482 protected ControllerResultSet executeRequestOnBackend(SelectRequest request, 00483 DatabaseBackend backend, MetadataCache metadataCache) 00484 throws SQLException, UnreachableBackendException 00485 { 00486 // Handle macros 00487 handleMacros(request); 00488 00489 // Ok, we have a backend, let's execute the request 00490 AbstractConnectionManager cm = backend.getConnectionManager(request 00491 .getLogin()); 
00492 00493 // Sanity check 00494 if (cm == null) 00495 { 00496 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00497 new String[]{request.getLogin(), backend.getName()}); 00498 logger.error(msg); 00499 throw new SQLException(msg); 00500 } 00501 00502 // Execute the query 00503 if (request.isAutoCommit()) 00504 { 00505 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00506 // We could do something finer grain here by waiting 00507 // only for writes that depend on the tables we need 00508 // but is that really worth the overhead ? 00509 waitForAllWritesToComplete(backend); 00510 00511 ControllerResultSet rs = null; 00512 boolean badConnection; 00513 do 00514 { 00515 badConnection = false; 00516 // Use a connection just for this request 00517 Connection c = null; 00518 try 00519 { 00520 c = cm.getConnection(); 00521 } 00522 catch (UnreachableBackendException e1) 00523 { 00524 logger.error(Translate.get( 00525 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00526 backend.disable(); 00527 throw new UnreachableBackendException(Translate.get( 00528 "loadbalancer.backend.unreacheable", backend.getName())); 00529 } 00530 00531 // Sanity check 00532 if (c == null) 00533 throw new UnreachableBackendException( 00534 "No more connections on backend " + backend.getName()); 00535 00536 // Execute Query 00537 try 00538 { 00539 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00540 cm.releaseConnection(c); 00541 } 00542 catch (SQLException e) 00543 { 00544 cm.releaseConnection(c); 00545 throw new SQLException(Translate.get( 00546 "loadbalancer.request.failed.on.backend", new String[]{ 00547 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00548 backend.getName(), e.getMessage()})); 00549 } 00550 catch (BadConnectionException e) 00551 { // Get rid of the bad connection 00552 cm.deleteConnection(c); 00553 badConnection = true; 00554 } 00555 } 00556 while (badConnection); 00557 if 
(logger.isDebugEnabled()) 00558 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00559 String.valueOf(request.getId()), backend.getName()})); 00560 return rs; 00561 } 00562 else 00563 { // Inside a transaction 00564 Connection c; 00565 long tid = request.getTransactionId(); 00566 Long lTid = new Long(tid); 00567 00568 // Wait for previous writes to complete 00569 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00570 waitForAllWritesToComplete(backend, request.getTransactionId()); 00571 00572 if (!backend.isStartedTransaction(lTid)) 00573 { // transaction has not been started yet on this backend 00574 try 00575 { 00576 c = cm.getConnection(tid); 00577 } 00578 catch (UnreachableBackendException e1) 00579 { 00580 logger.error(Translate.get( 00581 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00582 backend.disable(); 00583 throw new UnreachableBackendException(Translate.get( 00584 "loadbalancer.backend.unreacheable", backend.getName())); 00585 } 00586 00587 // Sanity check 00588 if (c == null) 00589 throw new SQLException(Translate.get( 00590 "loadbalancer.unable.get.connection", new String[]{ 00591 String.valueOf(tid), backend.getName()})); 00592 00593 // begin transaction 00594 backend.startTransaction(lTid); 00595 c.setAutoCommit(false); 00596 } 00597 else 00598 { // Re-use the connection used by this transaction 00599 c = cm.retrieveConnection(tid); 00600 00601 // Sanity check 00602 if (c == null) 00603 throw new SQLException(Translate.get( 00604 "loadbalancer.unable.retrieve.connection", new String[]{ 00605 String.valueOf(tid), backend.getName()})); 00606 } 00607 00608 // Execute Query 00609 ControllerResultSet rs = null; 00610 try 00611 { 00612 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00613 } 00614 catch (SQLException e) 00615 { 00616 throw new SQLException(Translate.get( 00617 "loadbalancer.request.failed.on.backend", new String[]{ 00618 
request.getSQLShortForm(vdb.getSQLShortFormLength()), 00619 backend.getName(), e.getMessage()})); 00620 } 00621 catch (BadConnectionException e) 00622 { // Connection failed, so did the transaction 00623 // Disable the backend. 00624 cm.deleteConnection(tid); 00625 String msg = Translate.get( 00626 "loadbalancer.backend.disabling.connection.failure", backend 00627 .getName()); 00628 logger.error(msg); 00629 backend.disable(); 00630 throw new SQLException(msg); 00631 } 00632 if (logger.isDebugEnabled()) 00633 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00634 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00635 backend.getName()})); 00636 return rs; 00637 } 00638 } 00639 00649 protected ControllerResultSet executeStoredProcedureOnBackend( 00650 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 00651 throws SQLException, UnreachableBackendException 00652 { 00653 // Handle macros 00654 handleMacros(proc); 00655 00656 // Ok, we have a backend, let's execute the request 00657 AbstractConnectionManager cm = backend 00658 .getConnectionManager(proc.getLogin()); 00659 00660 // Sanity check 00661 if (cm == null) 00662 { 00663 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00664 new String[]{proc.getLogin(), backend.getName()}); 00665 logger.error(msg); 00666 throw new SQLException(msg); 00667 } 00668 00669 // Execute the query 00670 if (proc.isAutoCommit()) 00671 { 00672 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00673 // We could do something finer grain here by waiting 00674 // only for writes that depend on the tables we need 00675 // but is that really worth the overhead ? 
00676 waitForAllWritesToComplete(backend); 00677 00678 // Use a connection just for this request 00679 Connection c = null; 00680 try 00681 { 00682 c = cm.getConnection(); 00683 } 00684 catch (UnreachableBackendException e1) 00685 { 00686 logger.error(Translate.get( 00687 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00688 backend.disable(); 00689 throw new UnreachableBackendException(Translate.get( 00690 "loadbalancer.backend.unreacheable", backend.getName())); 00691 } 00692 00693 // Sanity check 00694 if (c == null) 00695 throw new SQLException(Translate.get( 00696 "loadbalancer.backend.no.connection", backend.getName())); 00697 00698 // Execute Query 00699 ControllerResultSet rs = null; 00700 try 00701 { 00702 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00703 backend, c, metadataCache); 00704 } 00705 catch (Exception e) 00706 { 00707 throw new SQLException(Translate.get( 00708 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00709 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00710 backend.getName(), e.getMessage()})); 00711 } 00712 finally 00713 { 00714 cm.releaseConnection(c); 00715 } 00716 if (logger.isDebugEnabled()) 00717 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00718 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00719 return rs; 00720 } 00721 else 00722 { // Inside a transaction 00723 Connection c; 00724 long tid = proc.getTransactionId(); 00725 Long lTid = new Long(tid); 00726 00727 // Wait for previous writes to complete 00728 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00729 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00730 00731 if (!backend.isStartedTransaction(lTid)) 00732 { // transaction has not been started yet on this backend 00733 try 00734 { 00735 c = cm.getConnection(tid); 00736 } 00737 catch (UnreachableBackendException e1) 00738 { 00739 logger.error(Translate.get( 00740 
"loadbalancer.backend.disabling.unreachable", backend.getName())); 00741 backend.disable(); 00742 throw new SQLException(Translate.get( 00743 "loadbalancer.backend.unreacheable", backend.getName())); 00744 } 00745 00746 // Sanity check 00747 if (c == null) 00748 throw new SQLException(Translate.get( 00749 "loadbalancer.unable.get.connection", new String[]{ 00750 String.valueOf(tid), backend.getName()})); 00751 00752 // begin transaction 00753 backend.startTransaction(lTid); 00754 c.setAutoCommit(false); 00755 } 00756 else 00757 { // Re-use the connection used by this transaction 00758 c = cm.retrieveConnection(tid); 00759 00760 // Sanity check 00761 if (c == null) 00762 throw new SQLException(Translate.get( 00763 "loadbalancer.unable.retrieve.connection", new String[]{ 00764 String.valueOf(tid), backend.getName()})); 00765 } 00766 00767 // Execute Query 00768 ControllerResultSet rs; 00769 try 00770 { 00771 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00772 backend, c, metadataCache); 00773 } 00774 catch (Exception e) 00775 { 00776 throw new SQLException(Translate.get( 00777 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00778 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00779 backend.getName(), e.getMessage()})); 00780 } 00781 if (logger.isDebugEnabled()) 00782 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00783 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00784 backend.getName()})); 00785 return rs; 00786 } 00787 } 00788 00793 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 00794 MetadataCache metadataCache) throws SQLException 00795 { 00796 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00797 proc, true, metadataCache); 00798 return task.getResult(); 00799 } 00800 00804 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 00805 { 00806 WriteStoredProcedureTask task = (WriteStoredProcedureTask) 
callStoredProcedure( 00807 proc, false, null); 00808 return task.getResult(); 00809 } 00810 00821 private AbstractTask callStoredProcedure(StoredProcedure proc, 00822 boolean isRead, MetadataCache metadataCache) throws SQLException 00823 { 00824 ArrayList backendThreads = backendBlockingThreads; 00825 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 00826 00827 try 00828 { 00829 lock.acquireRead(); 00830 } 00831 catch (InterruptedException e) 00832 { 00833 String msg = Translate.get( 00834 "loadbalancer.backendlist.acquire.readlock.failed", e); 00835 logger.error(msg); 00836 throw new SQLException(msg); 00837 } 00838 00839 int nbOfThreads = backendThreads.size(); 00840 00841 // Create the task 00842 AbstractTask task; 00843 if (isRead) 00844 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads, 00845 proc, metadataCache); 00846 else 00847 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads), 00848 nbOfThreads, proc); 00849 00850 synchronized (task) 00851 { 00852 int nbOfBackends = 0; 00853 00854 // Post the task in each backendThread tasklist and wakeup the threads 00855 for (int i = 0; i < nbOfThreads; i++) 00856 { 00857 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00858 .get(i); 00859 if (thread.getBackend().hasStoredProcedure(proc.getProcedureName())) 00860 { 00861 nbOfBackends++; 00862 synchronized (thread) 00863 { 00864 if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)) 00865 thread.addTask(task); 00866 else 00867 thread.addTask(task, proc.getTransactionId()); 00868 thread.notify(); 00869 } 00870 } 00871 if (nbOfBackends == 0) 00872 throw new SQLException(Translate.get( 00873 "loadbalancer.backend.no.required.storedprocedure", proc 00874 .getProcedureName())); 00875 else 00876 task.setTotalNb(nbOfBackends); 00877 } 00878 00879 lock.releaseRead(); 00880 00881 // Wait for completion (notified by the task) 00882 try 00883 { 00884 // Wait on task 00885 long timeout = 
proc.getTimeout() * 1000; 00886 if (timeout > 0) 00887 { 00888 long start = System.currentTimeMillis(); 00889 task.wait(timeout); 00890 long end = System.currentTimeMillis(); 00891 long remaining = timeout - (end - start); 00892 if (remaining <= 0) 00893 { 00894 String msg = Translate.get("loadbalancer.storedprocedure.timeout", 00895 new String[]{String.valueOf(proc.getId()), 00896 String.valueOf(task.getSuccess()), 00897 String.valueOf(task.getFailed())}); 00898 00899 // Try to remove the request from the task list 00900 lock.acquireRead(); 00901 nbOfThreads = backendThreads.size(); 00902 for (int i = 0; i < nbOfThreads; i++) 00903 { 00904 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00905 .get(i); 00906 synchronized (thread) 00907 { 00908 thread.removeTask(task); 00909 } 00910 } 00911 lock.releaseRead(); 00912 00913 logger.warn(msg); 00914 throw new SQLException(msg); 00915 } 00916 // No need to update request timeout since the execution is finished 00917 } 00918 else 00919 task.wait(); 00920 } 00921 catch (InterruptedException e) 00922 { 00923 // Try to remove the request from the task list 00924 try 00925 { 00926 lock.acquireRead(); 00927 nbOfThreads = backendThreads.size(); 00928 } 00929 catch (InterruptedException ignore) 00930 { 00931 nbOfThreads = 0; // Give up 00932 } 00933 for (int i = 0; i < nbOfThreads; i++) 00934 { 00935 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00936 .get(i); 00937 synchronized (thread) 00938 { 00939 thread.removeTask(task); 00940 } 00941 } 00942 lock.releaseRead(); 00943 00944 throw new SQLException(Translate.get( 00945 "loadbalancer.storedprocedure.timeout", new String[]{ 00946 String.valueOf(proc.getId()), 00947 String.valueOf(task.getSuccess()), 00948 String.valueOf(task.getFailed())})); 00949 } 00950 00951 if (task.getSuccess() > 0) 00952 return task; 00953 else 00954 { // All tasks failed 00955 ArrayList exceptions = task.getExceptions(); 00956 if (exceptions == null) 00957 throw new 
SQLException(Translate.get( 00958 "loadbalancer.storedprocedure.all.failed", proc.getId())); 00959 else 00960 { 00961 String errorMsg = Translate.get( 00962 "loadbalancer.storedprocedure.failed.stack", proc.getId()) 00963 + "\n"; 00964 for (int i = 0; i < exceptions.size(); i++) 00965 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00966 logger.error(errorMsg); 00967 throw new SQLException(errorMsg); 00968 } 00969 } 00970 } 00971 } 00972 00973 /* 00974 * Transaction management 00975 */ 00976 00983 public final void begin(TransactionMarkerMetaData tm) throws SQLException 00984 { 00985 } 00986 00993 public void commit(TransactionMarkerMetaData tm) throws SQLException 00994 { 00995 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00996 waitForAllWritesToComplete(tm.getTransactionId()); 00997 00998 try 00999 { 01000 backendNonBlockingThreadsRWLock.acquireRead(); 01001 } 01002 catch (InterruptedException e) 01003 { 01004 String msg = Translate.get( 01005 "loadbalancer.backendlist.acquire.readlock.failed", e); 01006 logger.error(msg); 01007 throw new SQLException(msg); 01008 } 01009 01010 int nbOfThreads = backendNonBlockingThreads.size(); 01011 ArrayList commitList = new ArrayList(); 01012 Long iTid = new Long(tm.getTransactionId()); 01013 01014 // Build the list of backend that need to commit this transaction 01015 for (int i = 0; i < nbOfThreads; i++) 01016 { 01017 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01018 .get(i); 01019 if (thread.getBackend().isStartedTransaction(iTid)) 01020 commitList.add(thread); 01021 } 01022 01023 nbOfThreads = commitList.size(); 01024 if (nbOfThreads == 0) 01025 { 01026 backendNonBlockingThreadsRWLock.releaseRead(); 01027 return; 01028 } 01029 01030 // Create the task 01031 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 01032 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 01033 01034 synchronized (task) 01035 { 01036 // Post 
the task in each backendThread tasklist and wakeup the threads 01037 for (int i = 0; i < nbOfThreads; i++) 01038 { 01039 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 01040 synchronized (thread) 01041 { 01042 thread.addTask(task); 01043 thread.notify(); 01044 } 01045 } 01046 01047 backendNonBlockingThreadsRWLock.releaseRead(); 01048 01049 // Wait for completion (notified by the task) 01050 try 01051 { 01052 // Wait on task 01053 long timeout = tm.getTimeout(); 01054 if (timeout > 0) 01055 { 01056 long start = System.currentTimeMillis(); 01057 task.wait(timeout); 01058 long end = System.currentTimeMillis(); 01059 long remaining = timeout - (end - start); 01060 if (remaining <= 0) 01061 { 01062 String msg = Translate.get("loadbalancer.commit.timeout", 01063 new String[]{String.valueOf(tm.getTransactionId()), 01064 String.valueOf(task.getSuccess()), 01065 String.valueOf(task.getFailed())}); 01066 logger.warn(msg); 01067 throw new SQLException(msg); 01068 } 01069 } 01070 else 01071 task.wait(); 01072 } 01073 catch (InterruptedException e) 01074 { 01075 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 01076 new String[]{String.valueOf(tm.getTransactionId()), 01077 String.valueOf(task.getSuccess()), 01078 String.valueOf(task.getFailed())})); 01079 } 01080 01081 if (task.getSuccess() > 0) 01082 return; 01083 else 01084 { // All tasks failed 01085 ArrayList exceptions = task.getExceptions(); 01086 if (exceptions == null) 01087 throw new SQLException(Translate.get( 01088 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01089 else 01090 { 01091 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01092 tm.getTransactionId()) 01093 + "\n"; 01094 for (int i = 0; i < exceptions.size(); i++) 01095 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01096 logger.error(errorMsg); 01097 throw new SQLException(errorMsg); 01098 } 01099 } 01100 } 01101 } 01102 01109 public void 
rollback(TransactionMarkerMetaData tm) throws SQLException 01110 { 01111 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01112 waitForAllWritesToComplete(tm.getTransactionId()); 01113 01114 try 01115 { 01116 backendNonBlockingThreadsRWLock.acquireRead(); 01117 } 01118 catch (InterruptedException e) 01119 { 01120 String msg = Translate.get( 01121 "loadbalancer.backendlist.acquire.readlock.failed", e); 01122 logger.error(msg); 01123 throw new SQLException(msg); 01124 } 01125 int nbOfThreads = backendNonBlockingThreads.size(); 01126 ArrayList rollbackList = new ArrayList(); 01127 Long iTid = new Long(tm.getTransactionId()); 01128 01129 // Build the list of backend that need to rollback this transaction 01130 for (int i = 0; i < nbOfThreads; i++) 01131 { 01132 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01133 .get(i); 01134 if (thread.getBackend().isStartedTransaction(iTid)) 01135 rollbackList.add(thread); 01136 } 01137 01138 nbOfThreads = rollbackList.size(); 01139 if (nbOfThreads == 0) 01140 { 01141 backendNonBlockingThreadsRWLock.releaseRead(); 01142 return; 01143 } 01144 01145 // Create the task 01146 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01147 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01148 01149 synchronized (task) 01150 { 01151 // Post the task in each backendThread tasklist and wakeup the threads 01152 for (int i = 0; i < nbOfThreads; i++) 01153 { 01154 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01155 synchronized (thread) 01156 { 01157 thread.addTask(task); 01158 thread.notify(); 01159 } 01160 } 01161 01162 backendNonBlockingThreadsRWLock.releaseRead(); 01163 01164 // Wait for completion (notified by the task) 01165 try 01166 { 01167 // Wait on task 01168 long timeout = tm.getTimeout(); 01169 if (timeout > 0) 01170 { 01171 long start = System.currentTimeMillis(); 01172 task.wait(timeout); 01173 long end = 
System.currentTimeMillis(); 01174 long remaining = timeout - (end - start); 01175 if (remaining <= 0) 01176 { 01177 String msg = Translate.get("loadbalancer.rollback.timeout", 01178 new String[]{String.valueOf(tm.getTransactionId()), 01179 String.valueOf(task.getSuccess()), 01180 String.valueOf(task.getFailed())}); 01181 logger.warn(msg); 01182 throw new SQLException(msg); 01183 } 01184 } 01185 else 01186 task.wait(); 01187 } 01188 catch (InterruptedException e) 01189 { 01190 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01191 new String[]{String.valueOf(tm.getTransactionId()), 01192 String.valueOf(task.getSuccess()), 01193 String.valueOf(task.getFailed())})); 01194 } 01195 01196 if (task.getSuccess() > 0) 01197 return; 01198 else 01199 { // All tasks failed 01200 ArrayList exceptions = task.getExceptions(); 01201 if (exceptions == null) 01202 throw new SQLException(Translate.get( 01203 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01204 else 01205 { 01206 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01207 tm.getTransactionId()) 01208 + "\n"; 01209 for (int i = 0; i < exceptions.size(); i++) 01210 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01211 logger.error(errorMsg); 01212 throw new SQLException(errorMsg); 01213 } 01214 } 01215 } 01216 } 01217 01222 protected void waitForAllWritesToComplete(long transactionId) 01223 throws SQLException 01224 { 01225 try 01226 { 01227 backendBlockingThreadsRWLock.acquireRead(); 01228 } 01229 catch (InterruptedException e) 01230 { 01231 String msg = Translate.get( 01232 "loadbalancer.backendlist.acquire.readlock.failed", e); 01233 logger.error(msg); 01234 throw new SQLException(msg); 01235 } 01236 01237 int nbOfThreads = backendBlockingThreads.size(); 01238 01239 for (int i = 0; i < nbOfThreads; i++) 01240 { 01241 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01242 .get(i); 01243 
thread.waitForAllTasksToComplete(transactionId); 01244 } 01245 01246 backendBlockingThreadsRWLock.releaseRead(); 01247 } 01248 01256 protected void waitForAllWritesToComplete(DatabaseBackend backend, 01257 long transactionId) throws SQLException 01258 { 01259 try 01260 { 01261 backendBlockingThreadsRWLock.acquireRead(); 01262 } 01263 catch (InterruptedException e) 01264 { 01265 String msg = Translate.get( 01266 "loadbalancer.backendlist.acquire.readlock.failed", e); 01267 logger.error(msg); 01268 throw new SQLException(msg); 01269 } 01270 01271 int nbOfThreads = backendBlockingThreads.size(); 01272 01273 for (int i = 0; i < nbOfThreads; i++) 01274 { 01275 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01276 .get(i); 01277 if (thread.getBackend() == backend) 01278 thread.waitForAllTasksToComplete(transactionId); 01279 } 01280 01281 backendBlockingThreadsRWLock.releaseRead(); 01282 } 01283 01290 protected void waitForAllWritesToComplete(DatabaseBackend backend) 01291 throws SQLException 01292 { 01293 try 01294 { 01295 backendBlockingThreadsRWLock.acquireRead(); 01296 } 01297 catch (InterruptedException e) 01298 { 01299 String msg = Translate.get( 01300 "loadbalancer.backendlist.acquire.readlock.failed", e); 01301 logger.error(msg); 01302 throw new SQLException(msg); 01303 } 01304 01305 int nbOfThreads = backendBlockingThreads.size(); 01306 01307 for (int i = 0; i < nbOfThreads; i++) 01308 { 01309 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01310 .get(i); 01311 if (thread.getBackend() == backend) 01312 thread.waitForAllTasksToComplete(); 01313 } 01314 01315 backendBlockingThreadsRWLock.releaseRead(); 01316 } 01317 01318 /* 01319 * Backends management 01320 */ 01321 01334 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 01335 throws SQLException 01336 { 01337 if (writeEnabled) 01338 { 01339 // Create 2 worker threads 01340 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 
01341 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 01342 01343 // Add first to the blocking thread list 01344 try 01345 { 01346 backendBlockingThreadsRWLock.acquireWrite(); 01347 } 01348 catch (InterruptedException e) 01349 { 01350 String msg = Translate.get( 01351 "loadbalancer.backendlist.acquire.writelock.failed", e); 01352 logger.error(msg); 01353 throw new SQLException(msg); 01354 } 01355 backendBlockingThreads.add(blockingThread); 01356 backendBlockingThreadsRWLock.releaseWrite(); 01357 blockingThread.start(); 01358 logger.info(Translate.get( 01359 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01360 01361 // Then add to the non-blocking thread list 01362 try 01363 { 01364 backendNonBlockingThreadsRWLock.acquireWrite(); 01365 } 01366 catch (InterruptedException e) 01367 { 01368 String msg = Translate.get( 01369 "loadbalancer.backendlist.acquire.writelock.failed", e); 01370 logger.error(msg); 01371 throw new SQLException(msg); 01372 } 01373 backendNonBlockingThreads.add(nonBlockingThread); 01374 backendNonBlockingThreadsRWLock.releaseWrite(); 01375 nonBlockingThread.start(); 01376 logger.info(Translate.get( 01377 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01378 db.enableWrite(); 01379 } 01380 01381 if (!db.isInitialized()) 01382 db.initializeConnections(); 01383 db.enableRead(); 01384 } 01385 01397 public synchronized void disableBackend(DatabaseBackend db) 01398 throws SQLException 01399 { 01400 if (db.isWriteEnabled()) 01401 { 01402 // Start with the backendBlockingThread list 01403 try 01404 { 01405 backendBlockingThreadsRWLock.acquireWrite(); 01406 } 01407 catch (InterruptedException e) 01408 { 01409 String msg = Translate.get( 01410 "loadbalancer.backendlist.acquire.writelock.failed", e); 01411 logger.error(msg); 01412 throw new SQLException(msg); 01413 } 01414 01415 int nbOfThreads = backendBlockingThreads.size(); 01416 01417 // Find the right blocking thread 01418 for (int i = 
0; i < nbOfThreads; i++) 01419 { 01420 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01421 .get(i); 01422 if (thread.getBackend().equals(db)) 01423 { 01424 logger.info(Translate 01425 .get("loadbalancer.backend.workerthread.blocking.remove", db 01426 .getName())); 01427 01428 backendBlockingThreads.remove(thread); 01429 01430 synchronized (thread) 01431 { 01432 // Kill the thread 01433 thread.addPriorityTask(new KillThreadTask(1, 1)); 01434 thread.notify(); 01435 } 01436 break; 01437 } 01438 } 01439 01440 backendBlockingThreadsRWLock.releaseWrite(); 01441 01442 // Continue with the backendNonBlockingThread list 01443 01444 try 01445 { 01446 backendNonBlockingThreadsRWLock.acquireWrite(); 01447 } 01448 catch (InterruptedException e) 01449 { 01450 String msg = Translate.get( 01451 "loadbalancer.backendlist.acquire.writelock.failed", e); 01452 logger.error(msg); 01453 throw new SQLException(msg); 01454 } 01455 01456 // Find the right non-blocking thread 01457 nbOfThreads = backendNonBlockingThreads.size(); 01458 for (int i = 0; i < nbOfThreads; i++) 01459 { 01460 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01461 .get(i); 01462 if (thread.getBackend().equals(db)) 01463 { 01464 logger.info(Translate.get( 01465 "loadbalancer.backend.workerthread.non.blocking.remove", db 01466 .getName())); 01467 01468 backendNonBlockingThreads.remove(thread); 01469 01470 synchronized (thread) 01471 { 01472 // Kill the thread 01473 thread.addPriorityTask(new KillThreadTask(1, 1)); 01474 thread.notify(); 01475 } 01476 break; 01477 } 01478 } 01479 01480 backendNonBlockingThreadsRWLock.releaseWrite(); 01481 } 01482 01483 db.disable(); 01484 if (db.isInitialized()) 01485 db.finalizeConnections(); 01486 } 01487 01491 public String getXmlImpl() 01492 { 01493 StringBuffer info = new StringBuffer(); 01494 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01495 if (createTablePolicy != null) 01496 
info.append(createTablePolicy.getXml()); 01497 if (waitForCompletionPolicy != null) 01498 info.append(waitForCompletionPolicy.getXml()); 01499 if (macroHandler != null) 01500 info.append(macroHandler.getXml()); 01501 this.getRaidb2Xml(); 01502 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01503 return info.toString(); 01504 } 01505 01511 public abstract String getRaidb2Xml(); 01512 01513 }

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:02 2004 に生成されました。 doxygen 1.3.8