src/org/objectweb/cjdbc/controller/loadbalancer/paralleldb/ParallelDB.java

説明を見る。
00001 00025 package org.objectweb.cjdbc.controller.loadbalancer.paralleldb; 00026 00027 import java.sql.Connection; 00028 import java.sql.SQLException; 00029 import java.util.Hashtable; 00030 00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException; 00032 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException; 00033 import org.objectweb.cjdbc.common.i18n.Translate; 00034 import org.objectweb.cjdbc.common.sql.AbstractRequest; 00035 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00036 import org.objectweb.cjdbc.common.sql.ParsingGranularities; 00037 import org.objectweb.cjdbc.common.sql.SelectRequest; 00038 import org.objectweb.cjdbc.common.sql.StoredProcedure; 00039 import org.objectweb.cjdbc.common.sql.UnknownRequest; 00040 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00041 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 00042 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache; 00043 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager; 00044 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 00045 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException; 00046 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels; 00047 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00048 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 00049 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 00050 00068 public abstract class ParallelDB extends AbstractLoadBalancer 00069 { 00070 //transaction id -> DatabaseBackend 00071 private Hashtable backendPerTransactionId; 00072 00080 public ParallelDB(VirtualDatabase vdb) throws SQLException 00081 { 00082 super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING); 00083 } 00084 00089 public ControllerResultSet execReadRequest(SelectRequest request, 00090 MetadataCache metadataCache) throws SQLException 00091 { 00092 DatabaseBackend backend; 00093 if (request.isAutoCommit()) 00094 backend = chooseBackendForReadRequest(request); 00095 else 00096 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request 00097 .getTransactionId())); 00098 00099 if (backend == null) 00100 throw new SQLException(Translate.get( 00101 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb 00102 .getSQLShortFormLength()))); 00103 00104 ControllerResultSet rs = null; 00105 // Execute the request on the chosen backend 00106 try 00107 { 00108 rs = executeReadRequestOnBackend(request, backend, metadataCache); 00109 } 00110 catch (UnreachableBackendException urbe) 00111 { 00112 // Try to execute query on different backend 00113 return execReadRequest(request, metadataCache); 00114 } 00115 catch (SQLException se) 00116 { 00117 String msg = Translate.get("loadbalancer.request.failed", new String[]{ 00118 String.valueOf(request.getId()), se.getMessage()}); 00119 if (logger.isInfoEnabled()) 00120 logger.info(msg); 00121 throw new SQLException(msg); 00122 } 00123 catch (RuntimeException e) 00124 { 00125 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00126 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00127 backend.getName(), e.getMessage()}); 00128 logger.error(msg, e); 00129 throw new SQLException(msg); 00130 } 00131 00132 return rs; 00133 } 00134 00139 public ControllerResultSet execReadOnlyReadStoredProcedure( 00140 StoredProcedure proc, MetadataCache metadataCache) throws SQLException 00141 { 00142 return execReadStoredProcedure(proc, metadataCache); 00143 } 00144 00149 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc, 00150 MetadataCache metadataCache) throws SQLException 00151 { 00152 DatabaseBackend backend; 00153 if (proc.isAutoCommit()) 00154 backend = chooseBackendForReadRequest(proc); 00155 else 00156 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc 00157 .getTransactionId())); 00158 00159 if (backend == null) 00160 throw new SQLException(Translate.get( 00161 "loadbalancer.storedprocedure.no.backend.found", proc 00162 .getSQLShortForm(vdb.getSQLShortFormLength()))); 00163 00164 ControllerResultSet rs = null; 00165 // Execute the request on the chosen backend 00166 try 00167 { 00168 rs = executeReadStoredProcedureOnBackend(proc, backend, metadataCache); 00169 } 00170 catch (UnreachableBackendException urbe) 00171 { 00172 // Try to execute query on different backend 00173 return execReadStoredProcedure(proc, metadataCache); 00174 } 00175 catch (SQLException se) 00176 { 00177 String msg = Translate.get("loadbalancer.storedprocedure.failed", 00178 new String[]{String.valueOf(proc.getId()), se.getMessage()}); 00179 if (logger.isInfoEnabled()) 00180 logger.info(msg); 00181 throw new SQLException(msg); 00182 } 00183 catch (RuntimeException e) 00184 { 00185 String msg = Translate.get( 00186 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00187 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00188 backend.getName(), e.getMessage()}); 00189 logger.error(msg, e); 00190 throw new SQLException(msg); 00191 } 00192 00193 return rs; 00194 } 00195 00199 public int execWriteRequest(AbstractWriteRequest request) 00200 throws AllBackendsFailedException, SQLException 00201 { 00202 DatabaseBackend backend; 00203 if (request.isAutoCommit()) 00204 backend = chooseBackendForWriteRequest(request); 00205 else 00206 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request 00207 .getTransactionId())); 00208 00209 if (backend == null) 00210 throw new SQLException(Translate.get( 00211 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb 00212 .getSQLShortFormLength()))); 00213 00214 int result; 00215 // Execute the request on the chosen backend 00216 try 00217 { 00218 result = executeWriteRequestOnBackend(request, backend); 00219 } 00220 catch (UnreachableBackendException urbe) 00221 { 00222 // Try to execute query on different backend 00223 return execWriteRequest(request); 00224 } 00225 catch (SQLException se) 00226 { 00227 String msg = Translate.get("loadbalancer.request.failed", new String[]{ 00228 String.valueOf(request.getId()), se.getMessage()}); 00229 if (logger.isInfoEnabled()) 00230 logger.info(msg); 00231 throw new SQLException(msg); 00232 } 00233 catch (RuntimeException e) 00234 { 00235 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00236 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00237 backend.getName(), e.getMessage()}); 00238 logger.error(msg, e); 00239 throw new SQLException(msg); 00240 } 00241 00242 return result; 00243 } 00244 00249 public ControllerResultSet execWriteRequestWithKeys( 00250 AbstractWriteRequest request, MetadataCache metadataCache) 00251 throws AllBackendsFailedException, SQLException 00252 { 00253 DatabaseBackend backend; 00254 if (request.isAutoCommit()) 00255 backend = chooseBackendForWriteRequest(request); 00256 else 00257 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request 00258 .getTransactionId())); 00259 00260 if (backend == null) 00261 throw new SQLException(Translate.get( 00262 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb 00263 .getSQLShortFormLength()))); 00264 00265 ControllerResultSet rs; 00266 // Execute the request on the chosen backend 00267 try 00268 { 00269 rs = executeWriteRequestWithKeysOnBackend(request, backend, metadataCache); 00270 } 00271 catch (UnreachableBackendException urbe) 00272 { 00273 // Try to execute query on different backend 00274 return execWriteRequestWithKeys(request, metadataCache); 00275 } 00276 catch (SQLException se) 00277 { 00278 String msg = Translate.get("loadbalancer.request.failed", new String[]{ 00279 String.valueOf(request.getId()), se.getMessage()}); 00280 if (logger.isInfoEnabled()) 00281 logger.info(msg); 00282 throw new SQLException(msg); 00283 } 00284 catch (RuntimeException e) 00285 { 00286 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00287 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00288 backend.getName(), e.getMessage()}); 00289 logger.error(msg, e); 00290 throw new SQLException(msg); 00291 } 00292 00293 return rs; 00294 } 00295 00299 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 00300 { 00301 DatabaseBackend backend; 00302 if (proc.isAutoCommit()) 00303 backend = chooseBackendForReadRequest(proc); 00304 else 00305 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc 00306 .getTransactionId())); 00307 00308 if (backend == null) 00309 throw new SQLException(Translate.get( 00310 "loadbalancer.storedprocedure.no.backend.found", proc 00311 .getSQLShortForm(vdb.getSQLShortFormLength()))); 00312 00313 int result; 00314 // Execute the request on the chosen backend 00315 try 00316 { 00317 result = executeWriteStoredProcedureOnBackend(proc, backend); 00318 } 00319 catch (UnreachableBackendException urbe) 00320 { 00321 // Try to execute query on different backend 00322 return execWriteStoredProcedure(proc); 00323 } 00324 catch (SQLException se) 00325 { 00326 String msg = Translate.get("loadbalancer.storedprocedure.failed", 00327 new String[]{String.valueOf(proc.getId()), se.getMessage()}); 00328 if (logger.isInfoEnabled()) 00329 logger.info(msg); 00330 throw new SQLException(msg); 00331 } 00332 catch (RuntimeException e) 00333 { 00334 String msg = Translate.get( 00335 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00336 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00337 backend.getName(), e.getMessage()}); 00338 logger.error(msg, e); 00339 throw new SQLException(msg); 00340 } 00341 00342 return result; 00343 } 00344 00354 private ControllerResultSet executeReadRequestOnBackend( 00355 SelectRequest request, DatabaseBackend backend, 00356 MetadataCache metadataCache) throws SQLException, 00357 UnreachableBackendException 00358 { 00359 // Ok, we have a backend, let's execute the request 00360 AbstractConnectionManager cm = backend.getConnectionManager(request 00361 .getLogin()); 00362 00363 // Sanity check 00364 if (cm == null) 00365 { 00366 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00367 new String[]{request.getLogin(), backend.getName()}); 00368 logger.error(msg); 00369 throw new SQLException(msg); 00370 } 00371 00372 // Execute the query 00373 if (request.isAutoCommit()) 00374 { 00375 ControllerResultSet rs = null; 00376 boolean badConnection; 00377 do 00378 { 00379 badConnection = false; 00380 // Use a connection just for this request 00381 Connection c = null; 00382 try 00383 { 00384 c = cm.getConnection(); 00385 } 00386 catch (UnreachableBackendException e1) 00387 { 00388 logger.error(Translate.get( 00389 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00390 backend.disable(); 00391 throw new UnreachableBackendException(Translate.get( 00392 "loadbalancer.backend.unreacheable", backend.getName())); 00393 } 00394 00395 // Sanity check 00396 if (c == null) 00397 throw new SQLException(Translate.get( 00398 "loadbalancer.backend.no.connection", backend.getName())); 00399 00400 // Execute Query 00401 try 00402 { 00403 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00404 cm.releaseConnection(c); 00405 } 00406 catch (SQLException e) 00407 { 00408 cm.releaseConnection(c); 00409 throw new SQLException(Translate.get( 00410 "loadbalancer.request.failed.on.backend", new String[]{ 00411 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00412 backend.getName(), e.getMessage()})); 00413 } 00414 catch (BadConnectionException e) 00415 { // Get rid of the bad connection 00416 cm.deleteConnection(c); 00417 badConnection = true; 00418 } 00419 } 00420 while (badConnection); 00421 if (logger.isDebugEnabled()) 00422 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00423 String.valueOf(request.getId()), backend.getName()})); 00424 return rs; 00425 } 00426 else 00427 { // Inside a transaction 00428 Connection c; 00429 long tid = request.getTransactionId(); 00430 Long lTid = new Long(tid); 00431 00432 if (!backend.isStartedTransaction(lTid)) 00433 { // transaction has not been started yet on this backend 00434 try 00435 { 00436 c = cm.getConnection(tid); 00437 } 00438 catch (UnreachableBackendException e1) 00439 { 00440 logger.error(Translate.get( 00441 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00442 backend.disable(); 00443 throw new UnreachableBackendException(Translate.get( 00444 "loadbalancer.backend.unreacheable", backend.getName())); 00445 } 00446 00447 // Sanity check 00448 if (c == null) 00449 throw new SQLException(Translate.get( 00450 "loadbalancer.unable.get.connection", new String[]{ 00451 String.valueOf(tid), backend.getName()})); 00452 00453 // begin transaction 00454 backend.startTransaction(lTid); 00455 c.setAutoCommit(false); 00456 } 00457 else 00458 { // Re-use the connection used by this transaction 00459 c = cm.retrieveConnection(tid); 00460 00461 // Sanity check 00462 if (c == null) 00463 throw new SQLException(Translate.get( 00464 "loadbalancer.unable.retrieve.connection", new String[]{ 00465 String.valueOf(tid), backend.getName()})); 00466 } 00467 00468 // Execute Query 00469 ControllerResultSet rs = null; 00470 try 00471 { 00472 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00473 } 00474 catch (SQLException e) 00475 { 00476 throw new SQLException(Translate.get( 00477 "loadbalancer.request.failed.on.backend", new String[]{ 00478 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00479 backend.getName(), e.getMessage()})); 00480 } 00481 catch (BadConnectionException e) 00482 { // Connection failed, so did the transaction 00483 // Disable the backend. 00484 cm.deleteConnection(tid); 00485 String msg = Translate.get( 00486 "loadbalancer.backend.disabling.connection.failure", backend 00487 .getName()); 00488 logger.error(msg); 00489 backend.disable(); 00490 throw new SQLException(msg); 00491 } 00492 if (logger.isDebugEnabled()) 00493 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00494 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00495 backend.getName()})); 00496 return rs; 00497 } 00498 } 00499 00509 private ControllerResultSet executeReadStoredProcedureOnBackend( 00510 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) 00511 throws SQLException, UnreachableBackendException 00512 { 00513 // Ok, we have a backend, let's execute the request 00514 AbstractConnectionManager cm = backend 00515 .getConnectionManager(proc.getLogin()); 00516 00517 // Sanity check 00518 if (cm == null) 00519 { 00520 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00521 new String[]{proc.getLogin(), backend.getName()}); 00522 logger.error(msg); 00523 throw new SQLException(msg); 00524 } 00525 00526 // Execute the query 00527 if (proc.isAutoCommit()) 00528 { 00529 // Use a connection just for this request 00530 Connection c = null; 00531 try 00532 { 00533 c = cm.getConnection(); 00534 } 00535 catch (UnreachableBackendException e1) 00536 { 00537 logger.error(Translate.get( 00538 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00539 backend.disable(); 00540 throw new UnreachableBackendException(Translate.get( 00541 "loadbalancer.backend.unreacheable", backend.getName())); 00542 } 00543 00544 // Sanity check 00545 if (c == null) 00546 throw new UnreachableBackendException(Translate.get( 00547 "loadbalancer.backend.no.connection", backend.getName())); 00548 00549 // Execute Query 00550 ControllerResultSet rs = null; 00551 try 00552 { 00553 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00554 backend, c, metadataCache); 00555 } 00556 catch (Exception e) 00557 { 00558 throw new SQLException(Translate.get( 00559 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00560 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00561 backend.getName(), e.getMessage()})); 00562 } 00563 finally 00564 { 00565 cm.releaseConnection(c); 00566 } 00567 if (logger.isDebugEnabled()) 00568 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00569 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00570 return rs; 00571 } 00572 else 00573 { // Inside a transaction 00574 Connection c; 00575 long tid = proc.getTransactionId(); 00576 Long lTid = new Long(tid); 00577 00578 if (!backend.isStartedTransaction(lTid)) 00579 { // transaction has not been started yet on this backend 00580 try 00581 { 00582 c = cm.getConnection(tid); 00583 } 00584 catch (UnreachableBackendException e1) 00585 { 00586 logger.error(Translate.get( 00587 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00588 backend.disable(); 00589 throw new UnreachableBackendException(Translate.get( 00590 "loadbalancer.backend.unreacheable", backend.getName())); 00591 } 00592 00593 // Sanity check 00594 if (c == null) 00595 throw new SQLException(Translate.get( 00596 "loadbalancer.unable.get.connection", new String[]{ 00597 String.valueOf(tid), backend.getName()})); 00598 00599 // begin transaction 00600 backend.startTransaction(lTid); 00601 c.setAutoCommit(false); 00602 } 00603 else 00604 { // Re-use the connection used by this transaction 00605 c = cm.retrieveConnection(tid); 00606 00607 // Sanity check 00608 if (c == null) 00609 throw new SQLException(Translate.get( 00610 "loadbalancer.unable.retrieve.connection", new String[]{ 00611 String.valueOf(tid), backend.getName()})); 00612 } 00613 00614 // Execute Query 00615 ControllerResultSet rs; 00616 try 00617 { 00618 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00619 backend, c, metadataCache); 00620 } 00621 catch (Exception e) 00622 { 00623 throw new SQLException(Translate.get( 00624 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00625 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00626 backend.getName(), e.getMessage()})); 00627 } 00628 if (logger.isDebugEnabled()) 00629 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00630 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00631 backend.getName()})); 00632 return rs; 00633 } 00634 } 00635 00644 private int executeWriteRequestOnBackend(AbstractWriteRequest request, 00645 DatabaseBackend backend) throws SQLException, UnreachableBackendException 00646 { 00647 if (backend == null) 00648 throw new SQLException(Translate.get( 00649 "loadbalancer.execute.no.backend.available", request.getId())); 00650 00651 try 00652 { 00653 AbstractConnectionManager cm = backend.getConnectionManager(request 00654 .getLogin()); 00655 if (request.isAutoCommit()) 00656 { // Use a connection just for this request 00657 Connection c = null; 00658 try 00659 { 00660 c = cm.getConnection(); 00661 } 00662 catch (UnreachableBackendException e1) 00663 { 00664 String backendName = backend.getName(); 00665 logger.error(Translate.get( 00666 "loadbalancer.backend.disabling.unreachable", backendName)); 00667 backend.disable(); 00668 throw new UnreachableBackendException(Translate.get( 00669 "loadbalancer.backend.unreacheable", backendName)); 00670 } 00671 00672 // Sanity check 00673 if (c == null) 00674 throw new UnreachableBackendException(Translate.get( 00675 "loadbalancer.backend.no.connection", backend.getName())); 00676 00677 // Execute Query 00678 int result; 00679 try 00680 { 00681 result = executeUpdateRequestOnBackend(request, backend, c); 00682 } 00683 catch (Exception e) 00684 { 00685 throw new SQLException(Translate.get( 00686 "loadbalancer.request.failed.on.backend", new String[]{ 00687 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00688 backend.getName(), e.getMessage()})); 00689 } 00690 finally 00691 { 00692 cm.releaseConnection(c); 00693 } 00694 return result; 00695 } 00696 else 00697 { // Re-use the connection used by this transaction 00698 Connection c = cm.retrieveConnection(request.getTransactionId()); 00699 00700 // Sanity check 00701 if (c == null) 00702 throw new SQLException(Translate.get( 00703 "loadbalancer.unable.retrieve.connection", 00704 new String[]{String.valueOf(request.getTransactionId()), 00705 backend.getName()})); 00706 00707 // Execute Query 00708 int result; 00709 try 00710 { 00711 result = executeUpdateRequestOnBackend(request, backend, c); 00712 return result; 00713 } 00714 catch (Exception e) 00715 { 00716 throw new SQLException(Translate.get( 00717 "loadbalancer.request.failed.on.backend", new String[]{ 00718 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00719 backend.getName(), e.getMessage()})); 00720 } 00721 } 00722 } 00723 catch (RuntimeException e) 00724 { 00725 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00726 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00727 backend.getName(), e.getMessage()}); 00728 logger.fatal(msg, e); 00729 throw new SQLException(msg); 00730 } 00731 } 00732 00743 private ControllerResultSet executeWriteRequestWithKeysOnBackend( 00744 AbstractWriteRequest request, DatabaseBackend backend, 00745 MetadataCache metadataCache) throws SQLException, 00746 UnreachableBackendException 00747 { 00748 if (backend == null) 00749 throw new SQLException(Translate.get( 00750 "loadbalancer.execute.no.backend.available", request.getId())); 00751 00752 if (!backend.getDriverCompliance().supportGetGeneratedKeys()) 00753 throw new SQLException(Translate.get( 00754 "loadbalancer.backend.autogeneratedkeys.unsupported", backend 00755 .getName())); 00756 00757 try 00758 { 00759 AbstractConnectionManager cm = backend.getConnectionManager(request 00760 .getLogin()); 00761 if (request.isAutoCommit()) 00762 { // Use a connection just for this request 00763 Connection c = null; 00764 try 00765 { 00766 c = cm.getConnection(); 00767 } 00768 catch (UnreachableBackendException e1) 00769 { 00770 String backendName = backend.getName(); 00771 logger.error(Translate.get( 00772 "loadbalancer.backend.disabling.unreachable", backendName)); 00773 backend.disable(); 00774 throw new UnreachableBackendException(Translate.get( 00775 "loadbalancer.backend.unreacheable", backendName)); 00776 } 00777 00778 // Sanity check 00779 if (c == null) 00780 throw new UnreachableBackendException(Translate.get( 00781 "loadbalancer.backend.no.connection", backend.getName())); 00782 00783 // Execute Query 00784 ControllerResultSet result; 00785 try 00786 { 00787 result = new ControllerResultSet(request, 00788 executeUpdateRequestOnBackendWithKeys(request, backend, c), 00789 metadataCache); 00790 } 00791 catch (Exception e) 00792 { 00793 throw new SQLException(Translate.get( 00794 "loadbalancer.request.failed.on.backend", new String[]{ 00795 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00796 backend.getName(), e.getMessage()})); 00797 } 00798 finally 00799 { 00800 cm.releaseConnection(c); 00801 } 00802 return result; 00803 } 00804 else 00805 { // Re-use the connection used by this transaction 00806 Connection c = cm.retrieveConnection(request.getTransactionId()); 00807 00808 // Sanity check 00809 if (c == null) 00810 throw new SQLException(Translate.get( 00811 "loadbalancer.unable.retrieve.connection", 00812 new String[]{String.valueOf(request.getTransactionId()), 00813 backend.getName()})); 00814 00815 // Execute Query 00816 try 00817 { 00818 return new ControllerResultSet(request, 00819 executeUpdateRequestOnBackendWithKeys(request, backend, c), 00820 metadataCache); 00821 } 00822 catch (Exception e) 00823 { 00824 throw new SQLException(Translate.get( 00825 "loadbalancer.request.failed.on.backend", new String[]{ 00826 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00827 backend.getName(), e.getMessage()})); 00828 } 00829 } 00830 } 00831 catch (RuntimeException e) 00832 { 00833 String msg = Translate 00834 .get("loadbalancer.request.failed", new String[]{ 00835 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00836 e.getMessage()}); 00837 logger.fatal(msg, e); 00838 throw new SQLException(msg); 00839 } 00840 } 00841 00850 private int executeWriteStoredProcedureOnBackend(StoredProcedure proc, 00851 DatabaseBackend backend) throws SQLException, UnreachableBackendException 00852 { 00853 // Ok, we have a backend, let's execute the request 00854 AbstractConnectionManager cm = backend 00855 .getConnectionManager(proc.getLogin()); 00856 00857 // Sanity check 00858 if (cm == null) 00859 { 00860 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00861 new String[]{proc.getLogin(), backend.getName()}); 00862 logger.error(msg); 00863 throw new SQLException(msg); 00864 } 00865 00866 // Execute the query 00867 if (proc.isAutoCommit()) 00868 { 00869 // Use a connection just for this request 00870 Connection c = null; 00871 try 00872 { 00873 c = cm.getConnection(); 00874 } 00875 catch (UnreachableBackendException e1) 00876 { 00877 logger.error(Translate.get( 00878 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00879 backend.disable(); 00880 throw new UnreachableBackendException(Translate.get( 00881 "loadbalancer.backend.unreacheable", backend.getName())); 00882 } 00883 00884 // Sanity check 00885 if (c == null) 00886 throw new UnreachableBackendException(Translate.get( 00887 "loadbalancer.backend.no.connection", backend.getName())); 00888 00889 // Execute Query 00890 int result; 00891 try 00892 { 00893 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend( 00894 proc, backend, c); 00895 00896 // Warning! No way to detect if schema has been modified unless 00897 // we ask the backend again using DatabaseMetaData.getTables(). 00898 } 00899 catch (Exception e) 00900 { 00901 throw new SQLException(Translate.get( 00902 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00903 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00904 backend.getName(), e.getMessage()})); 00905 } 00906 finally 00907 { 00908 cm.releaseConnection(c); 00909 } 00910 if (logger.isDebugEnabled()) 00911 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00912 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00913 return result; 00914 } 00915 else 00916 { // Inside a transaction 00917 Connection c; 00918 long tid = proc.getTransactionId(); 00919 Long lTid = new Long(tid); 00920 00921 if (!backend.isStartedTransaction(lTid)) 00922 { // transaction has not been started yet on this backend 00923 try 00924 { 00925 c = cm.getConnection(tid); 00926 } 00927 catch (UnreachableBackendException e1) 00928 { 00929 logger.error(Translate.get( 00930 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00931 backend.disable(); 00932 throw new SQLException(Translate.get( 00933 "loadbalancer.backend.unreacheable", backend.getName())); 00934 } 00935 00936 // Sanity check 00937 if (c == null) 00938 throw new SQLException(Translate.get( 00939 "loadbalancer.unable.get.connection", new String[]{ 00940 String.valueOf(tid), backend.getName()})); 00941 00942 // begin transaction 00943 backend.startTransaction(lTid); 00944 c.setAutoCommit(false); 00945 } 00946 else 00947 { // Re-use the connection used by this transaction 00948 c = cm.retrieveConnection(tid); 00949 00950 // Sanity check 00951 if (c == null) 00952 throw new SQLException(Translate.get( 00953 "loadbalancer.unable.retrieve.connection", new String[]{ 00954 String.valueOf(tid), backend.getName()})); 00955 } 00956 00957 // Execute Query 00958 int result; 00959 try 00960 { 00961 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend( 00962 proc, backend, c); 00963 00964 // Warning! No way to detect if schema has been modified unless 00965 // we ask the backend again using DatabaseMetaData.getTables(). 00966 } 00967 catch (Exception e) 00968 { 00969 throw new SQLException(Translate.get( 00970 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00971 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00972 backend.getName(), e.getMessage()})); 00973 } 00974 if (logger.isDebugEnabled()) 00975 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00976 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00977 backend.getName()})); 00978 return result; 00979 } 00980 } 00981 00982 // 00983 // Transaction Management 00984 // 00985 00992 public void begin(TransactionMarkerMetaData tm) throws SQLException 00993 { 00994 Long lTid = new Long(tm.getTransactionId()); 00995 if (backendPerTransactionId.containsKey(lTid)) 00996 throw new SQLException(Translate.get( 00997 "loadbalancer.transaction.already.started", lTid.toString())); 00998 00999 DatabaseBackend backend = chooseBackendForReadRequest(new UnknownRequest( 01000 "begin", false, 0, "\n")); 01001 backendPerTransactionId.put(lTid, backend); 01002 backend.startTransaction(lTid); 01003 } 01004 01008 public void commit(TransactionMarkerMetaData tm) throws SQLException 01009 { 01010 long tid = tm.getTransactionId(); 01011 Long lTid = new Long(tid); 01012 DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid); 01013 01014 AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin()); 01015 Connection c = cm.retrieveConnection(tid); 01016 01017 // Sanity check 01018 if (c == null) 01019 { // Bad connection 01020 db.stopTransaction(lTid); 01021 01022 throw new SQLException(Translate.get( 01023 "loadbalancer.unable.retrieve.connection", new String[]{ 01024 String.valueOf(tid), db.getName()})); 01025 } 01026 01027 // Execute Query 01028 try 01029 { 01030 c.commit(); 01031 c.setAutoCommit(true); 01032 } 01033 catch (Exception e) 01034 { 01035 String msg = Translate.get("loadbalancer.commit.failed", new String[]{ 01036 String.valueOf(tid), db.getName(), e.getMessage()}); 01037 logger.error(msg); 01038 throw new SQLException(msg); 01039 } 01040 finally 01041 { 01042 cm.releaseConnection(tid); 01043 db.stopTransaction(lTid); 01044 } 01045 } 01046 01050 public void rollback(TransactionMarkerMetaData tm) throws SQLException 01051 { 01052 long tid = tm.getTransactionId(); 01053 Long lTid = new Long(tid); 01054 DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid); 01055 01056 AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin()); 01057 Connection c = cm.retrieveConnection(tid); 01058 01059 // Sanity check 01060 if (c == null) 01061 { // Bad connection 01062 db.stopTransaction(lTid); 01063 01064 throw new SQLException(Translate.get( 01065 "loadbalancer.unable.retrieve.connection", new String[]{ 01066 String.valueOf(tid), db.getName()})); 01067 } 01068 01069 // Execute Query 01070 try 01071 { 01072 c.rollback(); 01073 c.setAutoCommit(true); 01074 } 01075 catch (Exception e) 01076 { 01077 String msg = Translate.get("loadbalancer.rollback.failed", new String[]{ 01078 String.valueOf(tid), db.getName(), e.getMessage()}); 01079 logger.error(msg); 01080 throw new SQLException(msg); 01081 } 01082 finally 01083 { 01084 cm.releaseConnection(tid); 01085 db.stopTransaction(lTid); 01086 } 01087 } 01088 01099 public void enableBackend(DatabaseBackend db, boolean writeEnabled) 01100 throws SQLException 01101 { 01102 logger.info(Translate.get("loadbalancer.backend.enabling", db.getName())); 01103 if (!db.isInitialized()) 01104 db.initializeConnections(); 01105 db.enableRead(); 01106 if (writeEnabled) 01107 db.enableWrite(); 01108 } 01109 01119 public void disableBackend(DatabaseBackend db) throws SQLException 01120 { 01121 logger.info(Translate.get("loadbalancer.backend.disabling", db.getName())); 01122 db.disable(); 01123 if (db.isInitialized()) 01124 db.finalizeConnections(); 01125 } 01126 01130 public String getXmlImpl() 01131 { 01132 StringBuffer info = new StringBuffer(); 01133 info.append("<" + DatabasesXmlTags.ELT_ParallelDB + ">"); 01134 info.append(getParallelDBXml()); 01135 info.append("</" + DatabasesXmlTags.ELT_ParallelDB + ">"); 01136 return info.toString(); 01137 } 01138 01144 public abstract String getParallelDBXml(); 01145 01154 public abstract DatabaseBackend chooseBackendForReadRequest( 01155 AbstractRequest request) throws SQLException; 01156 01165 public abstract DatabaseBackend chooseBackendForWriteRequest( 01166 AbstractWriteRequest request) throws SQLException; 01167 01168 }

CJDBCversion1.0.4に対してTue Oct 12 15:16:01 2004に生成されました。 doxygen 1.3.8