src/org/objectweb/cjdbc/controller/loadbalancer/raidb0/RAIDb0.java

説明を見る。
package org.objectweb.cjdbc.controller.loadbalancer.raidb0;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;

import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.CreateRequest;
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.StoredProcedure;
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;

/**
 * RAIDb-0 load balancer.
 * <p>
 * Every request is executed on a single backend: reads go to the first
 * read-enabled backend that has all the tables of the FROM clause, writes go to
 * the first write-enabled backend that has the target table, and table
 * creations go to the backend picked by the {@link CreateTablePolicy}.
 * Commit and rollback are posted as tasks to the worker thread of every
 * backend that has started the transaction.
 */
public class RAIDb0 extends AbstractLoadBalancer
{
  /*
   * How the code is organized ? 1. Member variables 2. Constructor(s) 3.
   * Request handling 4. Transaction handling 5. Backend management 6.
   * Debug/Monitoring 7. Private helpers
   */

  /** Message of the exception thrown by every stored procedure entry point. */
  private static final String    STORED_PROCEDURES_UNSUPPORTED = "Stored procedure calls are not supported with RAIDb-0 load balancers.";

  /** One {@link BackendWorkerThread} per enabled backend. */
  private final ArrayList        backendThreads;
  /** Guards every access to {@link #backendThreads}. */
  private final ReadPrioritaryFIFOWriteLock backendThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
  /** Decides on which backend a new table is created. */
  private final CreateTablePolicy createTablePolicy;

  protected static Trace         logger                        = Trace
                                                                   .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb0");

  /*
   * Constructors
   */

  /**
   * Creates a new RAIDb-0 request load balancer using table level parsing
   * granularity.
   * 
   * @param vdb the virtual database this load balancer belongs to
   * @param createTablePolicy the policy defining how 'create table' statements
   *          are dispatched among the backends
   * @throws SQLException declared by the signature; nothing in this
   *           constructor body throws it, so it can only come from the
   *           superclass constructor
   */
  public RAIDb0(VirtualDatabase vdb, CreateTablePolicy createTablePolicy)
      throws SQLException
  {
    super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE);
    backendThreads = new ArrayList();
    this.createTablePolicy = createTablePolicy;
  }

  /*
   * Request Handling
   */

  /**
   * Executes a read request on the first read-enabled backend that has all the
   * tables referenced in the FROM clause.
   * 
   * @param request the SELECT request to execute
   * @param metadataCache metadata cache handed over to the execution routine
   * @return the result set of the request
   * @throws SQLException if the backend list lock cannot be acquired, the
   *           request has no FROM tables, no enabled backend has the needed
   *           tables, or the execution fails
   */
  public ControllerResultSet execReadRequest(SelectRequest request,
      MetadataCache metadataCache) throws SQLException
  {
    acquireBackendListsReadLock();

    try
    {
      ArrayList fromTables = request.getFrom();
      if (fromTables == null)
        throw new SQLException(Translate.get("loadbalancer.from.not.found",
            request.getSQLShortForm(vdb.getSQLShortFormLength())));

      // Find the backend that has the needed tables. Fail explicitly when
      // there is none instead of falling through with whatever backend the
      // search looked at last (or null when the list is empty).
      DatabaseBackend backend = findReadBackend(fromTables);
      if (backend == null)
        throw new SQLException(Translate.get(
            "loadbalancer.backend.no.required.table", fromTables.toString()));

      if (logger.isDebugEnabled())
      {
        logger.debug("Backend " + backend.getName()
            + " has all tables which are:");
        for (int i = 0; i < fromTables.size(); i++)
          logger.debug(fromTables.get(i));
      }

      // Execute the request on the chosen backend
      try
      {
        return executeRequestOnBackend(request, backend, metadataCache);
      }
      catch (SQLException se)
      {
        String msg = Translate.get("loadbalancer.request.failed", new String[]{
            String.valueOf(request.getId()), se.getMessage()});
        if (logger.isInfoEnabled())
          logger.info(msg);
        throw new SQLException(msg);
      }
    }
    catch (RuntimeException e)
    {
      String msg = Translate.get("loadbalancer.request.failed", new String[]{
          request.getSQLShortForm(vdb.getSQLShortFormLength()),
          e.getMessage()});
      logger.fatal(msg, e);
      throw new SQLException(msg);
    }
    finally
    {
      vdb.releaseReadLockBackendLists(); // Release the lock
    }
  }

  /**
   * Executes a write request on the backend that has the target table (or, for
   * a table creation, on the backend chosen by the create table policy).
   * 
   * @param request the write request to execute
   * @return the update count returned by the backend
   * @throws SQLException if no suitable backend or connection is available,
   *           the backend is being disabled, or the execution fails
   */
  public int execWriteRequest(AbstractWriteRequest request) throws SQLException
  {
    // Handle macros
    handleMacros(request);

    acquireBackendListsReadLock();

    try
    {
      String table = requireTableName(request);
      DatabaseBackend backend = chooseWriteBackend(request, table);
      AbstractConnectionManager cm = requireConnectionManager(backend,
          request, table);

      // Ok, let's execute the query

      if (request.isAutoCommit())
      {
        // We do not execute request outside the already open transactions if
        // we are disabling the backend.
        rejectIfDisabling(backend, request);

        // Use a connection just for this request
        Connection c = getAutoCommitConnection(cm, backend);

        int result;
        try
        {
          result = executeUpdateRequestOnBackend(request, backend, c);
          updateSchema(backend, request);
        }
        catch (Exception e)
        {
          throw new SQLException(requestFailedMessage(request, e));
        }
        finally
        {
          cm.releaseConnection(c);
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
              String.valueOf(request.getId()), backend.getName()}));
        return result;
      }
      else
      { // Inside a transaction
        Connection c = getWriteTransactionConnection(cm, backend, request);

        // Execute the query
        int result;
        try
        {
          result = executeUpdateRequestOnBackend(request, backend, c);
          updateSchema(backend, request);
        }
        catch (Exception e)
        {
          throw new SQLException(requestFailedMessage(request, e));
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
              String.valueOf(request.getId()), backend.getName()}));
        return result;
      }
    }
    catch (RuntimeException e)
    {
      String msg = requestFailedMessage(request, e);
      logger.fatal(msg, e);
      throw new SQLException(msg);
    }
    finally
    {
      vdb.releaseReadLockBackendLists(); // Release the lock
    }
  }

  /**
   * Keeps the schema of the given backend in sync after a successful CREATE or
   * DROP request. Does nothing when the backend has no schema or for any other
   * kind of request.
   * 
   * @param b the backend on which the request was executed
   * @param request the write request that was executed
   */
  private void updateSchema(DatabaseBackend b, AbstractWriteRequest request)
  {
    DatabaseSchema dbs = b.getDatabaseSchema();
    if (dbs == null)
      return;

    if (request.isCreate())
    {
      // Add the table to the schema
      dbs.addTable(((CreateRequest) request).getDatabaseTable());
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.schema.add.table", request
            .getTableName()));
    }
    else if (request.isDrop())
    {
      // Delete the table from the schema
      dbs.removeTable(dbs.getTable(request.getTableName()));
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.schema.remove.table", request
            .getTableName()));
    }
  }

  /**
   * Same as {@link #execWriteRequest(AbstractWriteRequest)} but returns the
   * auto generated keys wrapped in a {@link ControllerResultSet}.
   * 
   * @param request the write request to execute
   * @param metadataCache metadata cache used to build the returned result set
   * @return the auto generated keys
   * @throws SQLException if no suitable backend or connection is available,
   *           the backend driver does not support generated keys, the backend
   *           is being disabled, or the execution fails
   */
  public ControllerResultSet execWriteRequestWithKeys(
      AbstractWriteRequest request, MetadataCache metadataCache)
      throws SQLException
  {
    // Handle macros
    handleMacros(request);

    acquireBackendListsReadLock();

    try
    {
      String table = requireTableName(request);
      DatabaseBackend backend = chooseWriteBackend(request, table);
      AbstractConnectionManager cm = requireConnectionManager(backend,
          request, table);

      if (!backend.getDriverCompliance().supportGetGeneratedKeys())
        throw new SQLException(Translate.get(
            "loadbalancer.backend.autogeneratedkeys.unsupported", backend
                .getName()));

      // Ok, let's execute the query

      if (request.isAutoCommit())
      {
        // We do not execute request outside the already open transactions if
        // we are disabling the backend.
        rejectIfDisabling(backend, request);

        // Use a connection just for this request
        Connection c = getAutoCommitConnection(cm, backend);

        // Execute Query
        ControllerResultSet result;
        try
        {
          result = new ControllerResultSet(request,
              executeUpdateRequestOnBackendWithKeys(request, backend, c),
              metadataCache);
          updateSchema(backend, request);
        }
        catch (Exception e)
        {
          throw new SQLException(requestFailedMessage(request, e));
        }
        finally
        {
          backend.removePendingRequest(request);
          cm.releaseConnection(c);
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
              String.valueOf(request.getId()), backend.getName()}));
        return result;
      }
      else
      { // Inside a transaction
        Connection c = getWriteTransactionConnection(cm, backend, request);

        // Execute the query
        ControllerResultSet result;
        try
        {
          result = new ControllerResultSet(request,
              executeUpdateRequestOnBackendWithKeys(request, backend, c),
              metadataCache);
          updateSchema(backend, request);
        }
        catch (Exception e)
        {
          throw new SQLException(requestFailedMessage(request, e));
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
              String.valueOf(request.getId()), backend.getName()}));
        return result;
      }
    }
    catch (RuntimeException e)
    {
      String msg = requestFailedMessage(request, e);
      logger.fatal(msg, e);
      throw new SQLException(msg);
    }
    finally
    {
      vdb.releaseReadLockBackendLists(); // Release the lock
    }
  }

  /**
   * Executes a select request on the given backend, either on a dedicated
   * connection (auto commit) or on the connection bound to the request's
   * transaction. In auto commit mode a connection reported as bad is deleted
   * and the request is retried on a new connection.
   * 
   * @param request the SELECT request to execute
   * @param backend the backend that executes the request
   * @param metadataCache metadata cache handed over to the execution routine
   * @return the result set of the request
   * @throws SQLException if no connection manager or connection is available
   *           for the request login, the backend is unreachable, or the
   *           execution fails
   */
  protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
      DatabaseBackend backend, MetadataCache metadataCache) throws SQLException
  {
    // Handle macros
    handleMacros(request);

    // Ok, we have a backend, let's execute the request
    AbstractConnectionManager cm = backend.getConnectionManager(request
        .getLogin());

    // Sanity check
    if (cm == null)
    {
      String msg = Translate.get("loadbalancer.connectionmanager.not.found",
          new String[]{request.getLogin(), backend.getName()});
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Execute the query
    if (request.isAutoCommit())
    {
      ControllerResultSet rs = null;
      boolean badConnection;
      do
      {
        badConnection = false;
        // Use a connection just for this request
        Connection c = getAutoCommitConnection(cm, backend);

        // Execute Query
        try
        {
          rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
          cm.releaseConnection(c);
        }
        catch (SQLException e)
        {
          cm.releaseConnection(c);
          throw new SQLException(selectFailedMessage(request, backend, e));
        }
        catch (BadConnectionException e)
        { // Get rid of the bad connection and try again with a new one
          cm.deleteConnection(c);
          badConnection = true;
        }
      }
      while (badConnection);
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
            String.valueOf(request.getId()), backend.getName()}));
      return rs;
    }
    else
    { // Inside a transaction
      long tid = request.getTransactionId();
      Long lTid = new Long(tid);

      Connection c;
      if (!backend.isStartedTransaction(lTid))
        c = startTransactionOnBackend(cm, backend, tid, lTid);
      else
        c = retrieveTransactionConnection(cm, backend, tid);

      // Execute Query
      ControllerResultSet rs = null;
      try
      {
        rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
      }
      catch (SQLException e)
      {
        throw new SQLException(selectFailedMessage(request, backend, e));
      }
      catch (BadConnectionException e)
      { // Get rid of the bad connection
        cm.deleteConnection(tid);
        throw new SQLException(Translate
            .get("loadbalancer.connection.failed", new String[]{
                String.valueOf(tid), backend.getName(), e.getMessage()}));
      }
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
            new String[]{String.valueOf(tid), String.valueOf(request.getId()),
                backend.getName()}));
      return rs;
    }
  }

  /**
   * Not supported by this load balancer.
   * 
   * @param proc ignored
   * @param metadataCache ignored
   * @return never returns normally
   * @throws SQLException always
   */
  public ControllerResultSet execReadOnlyReadStoredProcedure(
      StoredProcedure proc, MetadataCache metadataCache) throws SQLException
  {
    throw new SQLException(STORED_PROCEDURES_UNSUPPORTED);
  }

  /**
   * Not supported by this load balancer.
   * 
   * @param proc ignored
   * @param metadataCache ignored
   * @return never returns normally
   * @throws SQLException always
   */
  public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
      MetadataCache metadataCache) throws SQLException
  {
    throw new SQLException(STORED_PROCEDURES_UNSUPPORTED);
  }

  /**
   * Not supported by this load balancer.
   * 
   * @param proc ignored
   * @return never returns normally
   * @throws SQLException always
   */
  public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
  {
    throw new SQLException(STORED_PROCEDURES_UNSUPPORTED);
  }

  /*
   * Transaction management
   */

  /**
   * Does nothing: a transaction is started on a backend when the first request
   * of the transaction is executed on that backend.
   * 
   * @param tm the transaction marker metadata (unused)
   * @throws SQLException never thrown by this implementation
   */
  public final void begin(TransactionMarkerMetaData tm) throws SQLException
  {
  }

  /**
   * Commits a transaction on every backend that has started it, by posting a
   * {@link CommitTask} to the matching worker threads and waiting for its
   * completion. Returns immediately when no backend has started the
   * transaction.
   * 
   * @param tm the transaction marker metadata (id, login and timeout)
   * @throws SQLException if the thread list lock cannot be acquired, the wait
   *           times out or is interrupted, or no backend committed
   *           successfully
   */
  public void commit(TransactionMarkerMetaData tm) throws SQLException
  {
    acquireBackendThreadsReadLock();

    // The read lock must be given back exactly once, on every path.
    boolean lockReleased = false;
    try
    {
      // Build the list of backend that need to commit this transaction
      ArrayList commitList = getThreadsInTransaction(new Long(tm
          .getTransactionId()));
      int nbOfThreads = commitList.size();

      // Nobody would ever complete (and notify) a task posted to zero
      // threads, so waiting on it could only time out or block forever.
      if (nbOfThreads == 0)
        return;

      // Create the task
      CommitTask task = new CommitTask(nbOfThreads, // Wait for all to commit
          nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId());

      // The task monitor is taken before posting; the code below relies on
      // being notified on it once the work is done.
      synchronized (task)
      {
        // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
          synchronized (thread)
          {
            thread.addTask(task);
            thread.notify();
          }
        }

        backendThreadsRWLock.releaseRead();
        lockReleased = true;

        // Wait for completion (notified by the task)
        try
        {
          // Wait on task
          long timeout = tm.getTimeout();
          if (timeout > 0)
          {
            long start = System.currentTimeMillis();
            task.wait(timeout);
            long end = System.currentTimeMillis();
            long remaining = timeout - (end - start);
            if (remaining <= 0)
            {
              String msg = Translate.get("loadbalancer.commit.timeout",
                  new String[]{String.valueOf(tm.getTransactionId()),
                      String.valueOf(task.getSuccess()),
                      String.valueOf(task.getFailed())});
              logger.warn(msg);
              throw new SQLException(msg);
            }
          }
          else
            task.wait();
        }
        catch (InterruptedException e)
        {
          throw new SQLException(Translate.get("loadbalancer.commit.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())}));
        }

        if (task.getSuccess() > 0)
          return;

        // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new SQLException(Translate.get(
              "loadbalancer.commit.all.failed", tm.getTransactionId()));

        StringBuffer errorMsg = new StringBuffer(Translate.get(
            "loadbalancer.commit.failed.stack", tm.getTransactionId()));
        errorMsg.append("\n");
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg.append(((SQLException) exceptions.get(i)).getMessage())
              .append("\n");
        logger.error(errorMsg.toString());
        throw new SQLException(errorMsg.toString());
      }
    }
    finally
    {
      if (!lockReleased)
        backendThreadsRWLock.releaseRead();
    }
  }

  /**
   * Rollbacks a transaction on every backend that has started it, by posting a
   * {@link RollbackTask} to the matching worker threads and waiting for its
   * completion. Returns immediately when no backend has started the
   * transaction.
   * 
   * @param tm the transaction marker metadata (id, login and timeout)
   * @throws SQLException if the thread list lock cannot be acquired, the wait
   *           times out or is interrupted, or no backend rolled back
   *           successfully
   */
  public void rollback(TransactionMarkerMetaData tm) throws SQLException
  {
    acquireBackendThreadsReadLock();

    // The read lock must be given back exactly once, on every path.
    boolean lockReleased = false;
    try
    {
      // Build the list of backend that need to rollback this transaction
      ArrayList rollbackList = getThreadsInTransaction(new Long(tm
          .getTransactionId()));
      int nbOfThreads = rollbackList.size();

      // Nobody would ever complete (and notify) a task posted to zero
      // threads, so waiting on it could only time out or block forever.
      if (nbOfThreads == 0)
        return;

      // Create the task
      RollbackTask task = new RollbackTask(nbOfThreads, // Wait for all to
          // rollback
          nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId());

      // The task monitor is taken before posting; the code below relies on
      // being notified on it once the work is done.
      synchronized (task)
      {
        // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) rollbackList
              .get(i);
          synchronized (thread)
          {
            thread.addTask(task);
            thread.notify();
          }
        }

        backendThreadsRWLock.releaseRead();
        lockReleased = true;

        // Wait for completion (notified by the task)
        try
        {
          // Wait on task
          long timeout = tm.getTimeout();
          if (timeout > 0)
          {
            long start = System.currentTimeMillis();
            task.wait(timeout);
            long end = System.currentTimeMillis();
            long remaining = timeout - (end - start);
            if (remaining <= 0)
            {
              String msg = Translate.get("loadbalancer.rollback.timeout",
                  new String[]{String.valueOf(tm.getTransactionId()),
                      String.valueOf(task.getSuccess()),
                      String.valueOf(task.getFailed())});
              logger.warn(msg);
              throw new SQLException(msg);
            }
          }
          else
            task.wait();
        }
        catch (InterruptedException e)
        {
          throw new SQLException(Translate.get("loadbalancer.rollback.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())}));
        }

        if (task.getSuccess() > 0)
          return;

        // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new SQLException(Translate.get(
              "loadbalancer.rollback.all.failed", tm.getTransactionId()));

        StringBuffer errorMsg = new StringBuffer(Translate.get(
            "loadbalancer.rollback.failed.stack", tm.getTransactionId()));
        errorMsg.append("\n");
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg.append(((SQLException) exceptions.get(i)).getMessage())
              .append("\n");
        logger.error(errorMsg.toString());
        throw new SQLException(errorMsg.toString());
      }
    }
    finally
    {
      if (!lockReleased)
        backendThreadsRWLock.releaseRead();
    }
  }

  /*
   * Backends management
   */

  /**
   * Enables a backend: creates and starts its worker thread, initializes its
   * connections if needed, then enables reads and, if requested, writes.
   * 
   * @param db the backend to enable
   * @param writeEnabled true if the backend must also be enabled for writes
   * @throws SQLException if the thread list lock cannot be acquired or the
   *           backend operations fail
   */
  public void enableBackend(DatabaseBackend db, boolean writeEnabled)
      throws SQLException
  {
    // Create a worker thread and add it to the list
    BackendWorkerThread thread = new BackendWorkerThread(db, this);
    acquireBackendThreadsWriteLock();
    try
    {
      backendThreads.add(thread);
    }
    finally
    {
      backendThreadsRWLock.releaseWrite();
    }
    thread.start();
    logger.info(Translate.get("loadbalancer.backend.workerthread.add", db
        .getName()));

    if (!db.isInitialized())
      db.initializeConnections();
    db.enableRead();
    if (writeEnabled)
      db.enableWrite();
  }

  /**
   * Disables a backend: removes its worker thread from the list and posts it a
   * {@link KillThreadTask}, then disables the backend and finalizes its
   * connections if they were initialized.
   * 
   * @param db the backend to disable
   * @throws SQLException if the thread list lock cannot be acquired or the
   *           backend operations fail
   */
  public synchronized void disableBackend(DatabaseBackend db)
      throws SQLException
  {
    int nbOfThreads = backendThreads.size();

    // Find the right thread
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
      if (!thread.getBackend().equals(db))
        continue;

      logger.info(Translate.get("loadbalancer.backend.workerthread.remove",
          db.getName()));

      // Remove it from the backendThread list
      acquireBackendThreadsWriteLock();
      try
      {
        backendThreads.remove(thread);
      }
      finally
      {
        backendThreadsRWLock.releaseWrite();
      }

      synchronized (thread)
      {
        // Kill the thread
        thread.addPriorityTask(new KillThreadTask(1, 1));
        thread.notify();
      }
      break;
    }

    db.disable();
    if (db.isInitialized())
      db.finalizeConnections();
  }

  /**
   * Not supported by this load balancer.
   * 
   * @param name ignored
   * @param w ignored
   * @throws SQLException always
   */
  public void setWeight(String name, int w) throws SQLException
  {
    throw new SQLException("Weight is not supported with this load balancer");
  }

  /*
   * Debug/Monitoring
   */

  /**
   * Gets information about the request load balancer.
   * 
   * @return a one-line description of this load balancer
   */
  public String getInformation()
  {
    return "RAIDb-0 Request load balancer\n";
  }

  /**
   * Builds the XML element describing this load balancer, including the
   * create table policy.
   * 
   * @return the RAIDb-0 XML element as a string
   */
  public String getXmlImpl()
  {
    StringBuffer info = new StringBuffer();
    info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
    // The policy XML has to be appended; calling getXml() alone drops it.
    info.append(createTablePolicy.getXml());
    info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
    return info.toString();
  }

  /*
   * Private helpers
   */

  /**
   * Acquires the read lock on the backend lists of the virtual database.
   * 
   * @throws SQLException if the thread is interrupted while waiting
   */
  private void acquireBackendListsReadLock() throws SQLException
  {
    try
    {
      vdb.acquireReadLockBackendLists(); // Acquire read lock
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
  }

  /**
   * Acquires the read lock protecting the worker thread list.
   * 
   * @throws SQLException if the thread is interrupted while waiting
   */
  private void acquireBackendThreadsReadLock() throws SQLException
  {
    try
    {
      backendThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
  }

  /**
   * Acquires the write lock protecting the worker thread list.
   * 
   * @throws SQLException if the thread is interrupted while waiting
   */
  private void acquireBackendThreadsWriteLock() throws SQLException
  {
    try
    {
      backendThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
  }

  /**
   * Returns the worker threads whose backend has started the given
   * transaction. The caller must hold the worker thread list lock.
   * 
   * @param lTid the transaction id
   * @return a new list of {@link BackendWorkerThread}, possibly empty
   */
  private ArrayList getThreadsInTransaction(Long lTid)
  {
    int nbOfThreads = backendThreads.size();
    ArrayList threads = new ArrayList();
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
      if (thread.getBackend().isStartedTransaction(lTid))
        threads.add(thread);
    }
    return threads;
  }

  /**
   * Returns the first read-enabled backend that has all the given tables.
   * 
   * @param fromTables the tables of the FROM clause
   * @return the matching backend, or null if there is none
   */
  private DatabaseBackend findReadBackend(ArrayList fromTables)
  {
    ArrayList backends = vdb.getBackends();
    int size = backends.size();
    for (int i = 0; i < size; i++)
    {
      DatabaseBackend candidate = (DatabaseBackend) backends.get(i);
      if (candidate.isReadEnabled() && candidate.hasTables(fromTables))
        return candidate;
    }
    return null;
  }

  /**
   * Returns the target table of a write request.
   * 
   * @param request the write request
   * @return the table name, never null
   * @throws SQLException if the request has no target table
   */
  private String requireTableName(AbstractWriteRequest request)
      throws SQLException
  {
    String table = request.getTableName();
    if (table == null)
      throw new SQLException(Translate.get(
          "loadbalancer.request.target.table.not.found", request
              .getSQLShortForm(vdb.getSQLShortFormLength())));
    return table;
  }

  /**
   * Chooses the backend that must execute a write request: the first backend
   * returned by the create table rule for a CREATE, else the first
   * write-enabled backend that has the table.
   * 
   * @param request the write request
   * @param table the target table of the request
   * @return the chosen backend, or null if there is none
   * @throws SQLException if the create table rule fails
   */
  private DatabaseBackend chooseWriteBackend(AbstractWriteRequest request,
      String table) throws SQLException
  {
    ArrayList backends = vdb.getBackends();

    if (request.isCreate())
    { // Choose the backend according to the defined policy
      CreateTableRule rule = createTablePolicy.getTableRule(request
          .getTableName());
      if (rule == null)
        rule = createTablePolicy.getDefaultRule();

      // Ask the rule to pickup a backend
      ArrayList choosen;
      try
      {
        choosen = rule.getBackends(backends);
      }
      catch (CreateTableException e)
      {
        throw new SQLException(Translate.get(
            "loadbalancer.create.table.rule.failed", e.getMessage()));
      }

      // An empty selection is treated like no selection at all
      if (choosen == null || choosen.isEmpty())
        return null;
      return (DatabaseBackend) choosen.get(0);
    }

    // Find the backend that has the table
    int size = backends.size();
    for (int i = 0; i < size; i++)
    {
      DatabaseBackend candidate = (DatabaseBackend) backends.get(i);
      if (candidate.isWriteEnabled() && candidate.hasTable(table))
        return candidate;
    }
    return null;
  }

  /**
   * Returns the connection manager of the chosen backend for the login of the
   * request.
   * 
   * @param backend the chosen backend, may be null
   * @param request the write request
   * @param table the target table, used in the error message
   * @return the connection manager, never null
   * @throws SQLException if there is no backend or no connection manager
   */
  private AbstractConnectionManager requireConnectionManager(
      DatabaseBackend backend, AbstractWriteRequest request, String table)
      throws SQLException
  {
    AbstractConnectionManager cm = null;
    if (backend != null)
      cm = backend.getConnectionManager(request.getLogin());

    // Sanity check
    if (cm == null)
      throw new SQLException(Translate.get(
          "loadbalancer.backend.no.required.table", table));
    return cm;
  }

  /**
   * Refuses to execute a write request on a backend that is being disabled.
   * 
   * @param backend the backend chosen for the request
   * @param request the write request
   * @throws SQLException if the backend is disabling
   */
  private void rejectIfDisabling(DatabaseBackend backend,
      AbstractWriteRequest request) throws SQLException
  {
    if (backend.isDisabling())
      throw new SQLException(Translate.get(
          "loadbalancer.backend.is.disabling", new String[]{
              request.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName()}));
  }

  /**
   * Gets a connection to be used for a single auto commit request. The backend
   * is disabled if it is reported unreachable.
   * 
   * @param cm the connection manager to get the connection from
   * @param backend the backend the connection manager belongs to
   * @return a connection, never null
   * @throws SQLException if the backend is unreachable or no connection is
   *           available
   */
  private Connection getAutoCommitConnection(AbstractConnectionManager cm,
      DatabaseBackend backend) throws SQLException
  {
    Connection c = null;
    try
    {
      c = cm.getConnection();
    }
    catch (UnreachableBackendException e1)
    {
      logger.error(Translate.get(
          "loadbalancer.backend.disabling.unreachable", backend.getName()));
      backend.disable();
      throw new SQLException(Translate.get(
          "loadbalancer.backend.unreacheable", backend.getName()));
    }

    // Sanity check
    if (c == null)
      throw new SQLException(Translate.get(
          "loadbalancer.backend.no.connection", backend.getName()));
    return c;
  }

  /**
   * Returns the connection on which a transactional write request must be
   * executed, starting the transaction on the backend if needed. A new
   * transaction is refused on a backend that is being disabled.
   * 
   * @param cm the connection manager to get the connection from
   * @param backend the backend chosen for the request
   * @param request the write request
   * @return the connection bound to the transaction, never null
   * @throws SQLException if the backend is disabling or unreachable, or no
   *           connection is available
   */
  private Connection getWriteTransactionConnection(
      AbstractConnectionManager cm, DatabaseBackend backend,
      AbstractWriteRequest request) throws SQLException
  {
    long tid = request.getTransactionId();
    Long lTid = new Long(tid);

    if (backend.isStartedTransaction(lTid))
      return retrieveTransactionConnection(cm, backend, tid);

    // We do not accept new transactions if we are disabling the backend
    rejectIfDisabling(backend, request);
    return startTransactionOnBackend(cm, backend, tid, lTid);
  }

  /**
   * Starts a transaction on a backend: gets a connection for the transaction,
   * registers the transaction on the backend and turns auto commit off. The
   * backend is disabled if it is reported unreachable.
   * 
   * @param cm the connection manager to get the connection from
   * @param backend the backend on which the transaction starts
   * @param tid the transaction id
   * @param lTid the transaction id as an object
   * @return the connection bound to the transaction, never null
   * @throws SQLException if the backend is unreachable or no connection is
   *           available
   */
  private Connection startTransactionOnBackend(AbstractConnectionManager cm,
      DatabaseBackend backend, long tid, Long lTid) throws SQLException
  {
    Connection c;
    try
    {
      c = cm.getConnection(tid);
    }
    catch (UnreachableBackendException e1)
    {
      logger.error(Translate.get(
          "loadbalancer.backend.disabling.unreachable", backend.getName()));
      backend.disable();
      throw new SQLException(Translate.get(
          "loadbalancer.backend.unreacheable", backend.getName()));
    }

    // Sanity check
    if (c == null)
      throw new SQLException(Translate.get(
          "loadbalancer.unable.get.connection", new String[]{
              String.valueOf(tid), backend.getName()}));

    // begin transaction
    backend.startTransaction(lTid);
    c.setAutoCommit(false);
    return c;
  }

  /**
   * Retrieves the connection already bound to a started transaction.
   * 
   * @param cm the connection manager holding the connection
   * @param backend the backend the connection manager belongs to
   * @param tid the transaction id
   * @return the connection bound to the transaction, never null
   * @throws SQLException if the connection cannot be retrieved
   */
  private Connection retrieveTransactionConnection(
      AbstractConnectionManager cm, DatabaseBackend backend, long tid)
      throws SQLException
  {
    // Re-use the connection used by this transaction
    Connection c = cm.retrieveConnection(tid);

    // Sanity check
    if (c == null)
      throw new SQLException(Translate.get(
          "loadbalancer.unable.retrieve.connection", new String[]{
              String.valueOf(tid), backend.getName()}));
    return c;
  }

  /**
   * Builds the message reporting the failure of a write request.
   * 
   * @param request the write request that failed
   * @param e the cause of the failure
   * @return the translated message
   */
  private String requestFailedMessage(AbstractWriteRequest request, Exception e)
  {
    return Translate.get("loadbalancer.request.failed", new String[]{
        request.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
  }

  /**
   * Builds the message reporting the failure of a select request on a backend.
   * 
   * @param request the select request that failed
   * @param backend the backend on which it failed
   * @param e the cause of the failure
   * @return the translated message
   */
  private String selectFailedMessage(SelectRequest request,
      DatabaseBackend backend, SQLException e)
  {
    return Translate.get("loadbalancer.request.failed.on.backend",
        new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
            backend.getName(), e.getMessage()});
  }

}

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:02 2004 に生成されました。 doxygen 1.3.8