00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.paralleldb;
00026
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.Hashtable;
00030
00031 import javax.management.NotCompliantMBeanException;
00032
00033 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00034 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00035 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00036 import org.objectweb.cjdbc.common.i18n.Translate;
00037 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00038 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00039 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.common.sql.UnknownRequest;
00043 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00044 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00045 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00046 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00047 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00048 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00049 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00050 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 public abstract class ParallelDB extends AbstractLoadBalancer
00072 {
00073
00074 private Hashtable backendPerTransactionId;
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084 public ParallelDB(VirtualDatabase vdb) throws SQLException,
00085 NotCompliantMBeanException
00086 {
00087 super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING);
00088 }
00089
00090
00091
00092
00093
00094 public ControllerResultSet execReadRequest(SelectRequest request,
00095 MetadataCache metadataCache) throws SQLException
00096 {
00097 DatabaseBackend backend;
00098 if (request.isAutoCommit())
00099 backend = chooseBackendForReadRequest(request);
00100 else
00101 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00102 .getTransactionId()));
00103
00104 if (backend == null)
00105 throw new SQLException(Translate.get(
00106 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00107 .getSQLShortFormLength())));
00108
00109 ControllerResultSet rs = null;
00110
00111 try
00112 {
00113 rs = executeReadRequestOnBackend(request, backend, metadataCache);
00114 }
00115 catch (UnreachableBackendException urbe)
00116 {
00117
00118 return execReadRequest(request, metadataCache);
00119 }
00120 catch (SQLException se)
00121 {
00122 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00123 String.valueOf(request.getId()), se.getMessage()});
00124 if (logger.isInfoEnabled())
00125 logger.info(msg);
00126 throw new SQLException(msg);
00127 }
00128 catch (RuntimeException e)
00129 {
00130 String msg = Translate.get("loadbalancer.request.failed.on.backend",
00131 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00132 backend.getName(), e.getMessage()});
00133 logger.error(msg, e);
00134 throw new SQLException(msg);
00135 }
00136
00137 return rs;
00138 }
00139
00140
00141
00142
00143
00144 public ControllerResultSet execReadOnlyReadStoredProcedure(
00145 StoredProcedure proc, MetadataCache metadataCache) throws SQLException
00146 {
00147 return execReadStoredProcedure(proc, metadataCache);
00148 }
00149
00150
00151
00152
00153
00154 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00155 MetadataCache metadataCache) throws SQLException
00156 {
00157 DatabaseBackend backend;
00158 if (proc.isAutoCommit())
00159 backend = chooseBackendForReadRequest(proc);
00160 else
00161 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc
00162 .getTransactionId()));
00163
00164 if (backend == null)
00165 throw new SQLException(Translate.get(
00166 "loadbalancer.storedprocedure.no.backend.found", proc
00167 .getSQLShortForm(vdb.getSQLShortFormLength())));
00168
00169 ControllerResultSet rs = null;
00170
00171 try
00172 {
00173 rs = executeReadStoredProcedureOnBackend(proc, backend, metadataCache);
00174 }
00175 catch (UnreachableBackendException urbe)
00176 {
00177
00178 return execReadStoredProcedure(proc, metadataCache);
00179 }
00180 catch (SQLException se)
00181 {
00182 String msg = Translate.get("loadbalancer.storedprocedure.failed",
00183 new String[]{String.valueOf(proc.getId()), se.getMessage()});
00184 if (logger.isInfoEnabled())
00185 logger.info(msg);
00186 throw new SQLException(msg);
00187 }
00188 catch (RuntimeException e)
00189 {
00190 String msg = Translate.get(
00191 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00192 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00193 backend.getName(), e.getMessage()});
00194 logger.error(msg, e);
00195 throw new SQLException(msg);
00196 }
00197
00198 return rs;
00199 }
00200
00201
00202
00203
00204 public int execWriteRequest(AbstractWriteRequest request)
00205 throws AllBackendsFailedException, SQLException
00206 {
00207 DatabaseBackend backend;
00208 if (request.isAutoCommit())
00209 backend = chooseBackendForWriteRequest(request);
00210 else
00211 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00212 .getTransactionId()));
00213
00214 if (backend == null)
00215 throw new SQLException(Translate.get(
00216 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00217 .getSQLShortFormLength())));
00218
00219 int result;
00220
00221 try
00222 {
00223 result = executeWriteRequestOnBackend(request, backend);
00224 }
00225 catch (UnreachableBackendException urbe)
00226 {
00227
00228 return execWriteRequest(request);
00229 }
00230 catch (SQLException se)
00231 {
00232 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00233 String.valueOf(request.getId()), se.getMessage()});
00234 if (logger.isInfoEnabled())
00235 logger.info(msg);
00236 throw new SQLException(msg);
00237 }
00238 catch (RuntimeException e)
00239 {
00240 String msg = Translate.get("loadbalancer.request.failed.on.backend",
00241 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00242 backend.getName(), e.getMessage()});
00243 logger.error(msg, e);
00244 throw new SQLException(msg);
00245 }
00246
00247 return result;
00248 }
00249
00250
00251
00252
00253
00254 public ControllerResultSet execWriteRequestWithKeys(
00255 AbstractWriteRequest request, MetadataCache metadataCache)
00256 throws AllBackendsFailedException, SQLException
00257 {
00258 DatabaseBackend backend;
00259 if (request.isAutoCommit())
00260 backend = chooseBackendForWriteRequest(request);
00261 else
00262 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(request
00263 .getTransactionId()));
00264
00265 if (backend == null)
00266 throw new SQLException(Translate.get(
00267 "loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00268 .getSQLShortFormLength())));
00269
00270 ControllerResultSet rs;
00271
00272 try
00273 {
00274 rs = executeWriteRequestWithKeysOnBackend(request, backend, metadataCache);
00275 }
00276 catch (UnreachableBackendException urbe)
00277 {
00278
00279 return execWriteRequestWithKeys(request, metadataCache);
00280 }
00281 catch (SQLException se)
00282 {
00283 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00284 String.valueOf(request.getId()), se.getMessage()});
00285 if (logger.isInfoEnabled())
00286 logger.info(msg);
00287 throw new SQLException(msg);
00288 }
00289 catch (RuntimeException e)
00290 {
00291 String msg = Translate.get("loadbalancer.request.failed.on.backend",
00292 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00293 backend.getName(), e.getMessage()});
00294 logger.error(msg, e);
00295 throw new SQLException(msg);
00296 }
00297
00298 return rs;
00299 }
00300
00301
00302
00303
00304 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00305 {
00306 DatabaseBackend backend;
00307 if (proc.isAutoCommit())
00308 backend = chooseBackendForReadRequest(proc);
00309 else
00310 backend = (DatabaseBackend) backendPerTransactionId.get(new Long(proc
00311 .getTransactionId()));
00312
00313 if (backend == null)
00314 throw new SQLException(Translate.get(
00315 "loadbalancer.storedprocedure.no.backend.found", proc
00316 .getSQLShortForm(vdb.getSQLShortFormLength())));
00317
00318 int result;
00319
00320 try
00321 {
00322 result = executeWriteStoredProcedureOnBackend(proc, backend);
00323 }
00324 catch (UnreachableBackendException urbe)
00325 {
00326
00327 return execWriteStoredProcedure(proc);
00328 }
00329 catch (SQLException se)
00330 {
00331 String msg = Translate.get("loadbalancer.storedprocedure.failed",
00332 new String[]{String.valueOf(proc.getId()), se.getMessage()});
00333 if (logger.isInfoEnabled())
00334 logger.info(msg);
00335 throw new SQLException(msg);
00336 }
00337 catch (RuntimeException e)
00338 {
00339 String msg = Translate.get(
00340 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00341 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00342 backend.getName(), e.getMessage()});
00343 logger.error(msg, e);
00344 throw new SQLException(msg);
00345 }
00346
00347 return result;
00348 }
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359 private ControllerResultSet executeReadRequestOnBackend(
00360 SelectRequest request, DatabaseBackend backend,
00361 MetadataCache metadataCache) throws SQLException,
00362 UnreachableBackendException
00363 {
00364
00365 AbstractConnectionManager cm = backend.getConnectionManager(request
00366 .getLogin());
00367
00368
00369 if (cm == null)
00370 {
00371 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00372 new String[]{request.getLogin(), backend.getName()});
00373 logger.error(msg);
00374 throw new SQLException(msg);
00375 }
00376
00377
00378 if (request.isAutoCommit())
00379 {
00380 ControllerResultSet rs = null;
00381 boolean badConnection;
00382 do
00383 {
00384 badConnection = false;
00385
00386 Connection c = null;
00387 try
00388 {
00389 c = cm.getConnection();
00390 }
00391 catch (UnreachableBackendException e1)
00392 {
00393 logger.error(Translate.get(
00394 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00395 disableBackend(backend);
00396 throw new UnreachableBackendException(Translate.get(
00397 "loadbalancer.backend.unreacheable", backend.getName()));
00398 }
00399
00400
00401 if (c == null)
00402 throw new SQLException(Translate.get(
00403 "loadbalancer.backend.no.connection", backend.getName()));
00404
00405
00406 try
00407 {
00408 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00409 cm.releaseConnection(c);
00410 }
00411 catch (SQLException e)
00412 {
00413 cm.releaseConnection(c);
00414 throw new SQLException(Translate.get(
00415 "loadbalancer.request.failed.on.backend", new String[]{
00416 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00417 backend.getName(), e.getMessage()}));
00418 }
00419 catch (BadConnectionException e)
00420 {
00421 cm.deleteConnection(c);
00422 badConnection = true;
00423 }
00424 }
00425 while (badConnection);
00426 if (logger.isDebugEnabled())
00427 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00428 String.valueOf(request.getId()), backend.getName()}));
00429 return rs;
00430 }
00431 else
00432 {
00433 Connection c;
00434 long tid = request.getTransactionId();
00435 Long lTid = new Long(tid);
00436
00437 try
00438 {
00439 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00440 }
00441 catch (UnreachableBackendException e1)
00442 {
00443 logger.error(Translate.get(
00444 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00445 disableBackend(backend);
00446 throw new SQLException(Translate.get(
00447 "loadbalancer.backend.unreacheable", backend.getName()));
00448 }
00449 catch (NoTransactionStartWhenDisablingException e)
00450 {
00451 String msg = Translate.get("loadbalancer.backend.is.disabling",
00452 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00453 backend.getName()});
00454 logger.error(msg);
00455 throw new SQLException(msg);
00456 }
00457
00458
00459 if (c == null)
00460 throw new SQLException(Translate.get(
00461 "loadbalancer.unable.retrieve.connection", new String[]{
00462 String.valueOf(tid), backend.getName()}));
00463
00464
00465 ControllerResultSet rs = null;
00466 try
00467 {
00468 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00469 }
00470 catch (SQLException e)
00471 {
00472 throw new SQLException(Translate.get(
00473 "loadbalancer.request.failed.on.backend", new String[]{
00474 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00475 backend.getName(), e.getMessage()}));
00476 }
00477 catch (BadConnectionException e)
00478 {
00479
00480 cm.deleteConnection(tid);
00481 String msg = Translate.get(
00482 "loadbalancer.backend.disabling.connection.failure", backend
00483 .getName());
00484 logger.error(msg);
00485 disableBackend(backend);
00486 throw new SQLException(msg);
00487 }
00488 if (logger.isDebugEnabled())
00489 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00490 new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00491 backend.getName()}));
00492 return rs;
00493 }
00494 }
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505 private ControllerResultSet executeReadStoredProcedureOnBackend(
00506 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00507 throws SQLException, UnreachableBackendException
00508 {
00509
00510 AbstractConnectionManager cm = backend
00511 .getConnectionManager(proc.getLogin());
00512
00513
00514 if (cm == null)
00515 {
00516 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00517 new String[]{proc.getLogin(), backend.getName()});
00518 logger.error(msg);
00519 throw new SQLException(msg);
00520 }
00521
00522
00523 if (proc.isAutoCommit())
00524 {
00525
00526 Connection c = null;
00527 try
00528 {
00529 c = cm.getConnection();
00530 }
00531 catch (UnreachableBackendException e1)
00532 {
00533 logger.error(Translate.get(
00534 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00535 disableBackend(backend);
00536 throw new UnreachableBackendException(Translate.get(
00537 "loadbalancer.backend.unreacheable", backend.getName()));
00538 }
00539
00540
00541 if (c == null)
00542 throw new UnreachableBackendException(Translate.get(
00543 "loadbalancer.backend.no.connection", backend.getName()));
00544
00545
00546 ControllerResultSet rs = null;
00547 try
00548 {
00549 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00550 backend, c, metadataCache);
00551 }
00552 catch (Exception e)
00553 {
00554 throw new SQLException(Translate.get(
00555 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00556 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00557 backend.getName(), e.getMessage()}));
00558 }
00559 finally
00560 {
00561 cm.releaseConnection(c);
00562 }
00563 if (logger.isDebugEnabled())
00564 logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00565 new String[]{String.valueOf(proc.getId()), backend.getName()}));
00566 return rs;
00567 }
00568 else
00569 {
00570 Connection c;
00571 long tid = proc.getTransactionId();
00572 Long lTid = new Long(tid);
00573
00574 try
00575 {
00576 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00577 }
00578 catch (UnreachableBackendException e1)
00579 {
00580 logger.error(Translate.get(
00581 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00582 disableBackend(backend);
00583 throw new SQLException(Translate.get(
00584 "loadbalancer.backend.unreacheable", backend.getName()));
00585 }
00586 catch (NoTransactionStartWhenDisablingException e)
00587 {
00588 String msg = Translate.get("loadbalancer.backend.is.disabling",
00589 new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00590 backend.getName()});
00591 logger.error(msg);
00592 throw new SQLException(msg);
00593 }
00594
00595
00596 if (c == null)
00597 throw new SQLException(Translate.get(
00598 "loadbalancer.unable.retrieve.connection", new String[]{
00599 String.valueOf(tid), backend.getName()}));
00600
00601
00602 ControllerResultSet rs;
00603 try
00604 {
00605 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00606 backend, c, metadataCache);
00607 }
00608 catch (Exception e)
00609 {
00610 throw new SQLException(Translate.get(
00611 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00612 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00613 backend.getName(), e.getMessage()}));
00614 }
00615 if (logger.isDebugEnabled())
00616 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00617 new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00618 backend.getName()}));
00619 return rs;
00620 }
00621 }
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631 private int executeWriteRequestOnBackend(AbstractWriteRequest request,
00632 DatabaseBackend backend) throws SQLException, UnreachableBackendException
00633 {
00634 if (backend == null)
00635 throw new SQLException(Translate.get(
00636 "loadbalancer.execute.no.backend.available", request.getId()));
00637
00638 try
00639 {
00640 AbstractConnectionManager cm = backend.getConnectionManager(request
00641 .getLogin());
00642 if (request.isAutoCommit())
00643 {
00644 Connection c = null;
00645 try
00646 {
00647 c = cm.getConnection();
00648 }
00649 catch (UnreachableBackendException e1)
00650 {
00651 String backendName = backend.getName();
00652 logger.error(Translate.get(
00653 "loadbalancer.backend.disabling.unreachable", backendName));
00654 disableBackend(backend);
00655 throw new UnreachableBackendException(Translate.get(
00656 "loadbalancer.backend.unreacheable", backendName));
00657 }
00658
00659
00660 if (c == null)
00661 throw new UnreachableBackendException(Translate.get(
00662 "loadbalancer.backend.no.connection", backend.getName()));
00663
00664
00665 int result;
00666 try
00667 {
00668 result = executeUpdateRequestOnBackend(request, backend, c);
00669 }
00670 catch (Exception e)
00671 {
00672 throw new SQLException(Translate.get(
00673 "loadbalancer.request.failed.on.backend", new String[]{
00674 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00675 backend.getName(), e.getMessage()}));
00676 }
00677 finally
00678 {
00679 cm.releaseConnection(c);
00680 }
00681 return result;
00682 }
00683 else
00684 {
00685 Connection c = cm.retrieveConnection(request.getTransactionId());
00686
00687
00688 if (c == null)
00689 throw new SQLException(Translate.get(
00690 "loadbalancer.unable.retrieve.connection",
00691 new String[]{String.valueOf(request.getTransactionId()),
00692 backend.getName()}));
00693
00694
00695 int result;
00696 try
00697 {
00698 result = executeUpdateRequestOnBackend(request, backend, c);
00699 return result;
00700 }
00701 catch (Exception e)
00702 {
00703 throw new SQLException(Translate.get(
00704 "loadbalancer.request.failed.on.backend", new String[]{
00705 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00706 backend.getName(), e.getMessage()}));
00707 }
00708 }
00709 }
00710 catch (RuntimeException e)
00711 {
00712 String msg = Translate.get("loadbalancer.request.failed.on.backend",
00713 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00714 backend.getName(), e.getMessage()});
00715 logger.fatal(msg, e);
00716 throw new SQLException(msg);
00717 }
00718 }
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730 private ControllerResultSet executeWriteRequestWithKeysOnBackend(
00731 AbstractWriteRequest request, DatabaseBackend backend,
00732 MetadataCache metadataCache) throws SQLException,
00733 UnreachableBackendException
00734 {
00735 if (backend == null)
00736 throw new SQLException(Translate.get(
00737 "loadbalancer.execute.no.backend.available", request.getId()));
00738
00739 if (!backend.getDriverCompliance().supportGetGeneratedKeys())
00740 throw new SQLException(Translate.get(
00741 "loadbalancer.backend.autogeneratedkeys.unsupported", backend
00742 .getName()));
00743
00744 try
00745 {
00746 AbstractConnectionManager cm = backend.getConnectionManager(request
00747 .getLogin());
00748 if (request.isAutoCommit())
00749 {
00750 Connection c = null;
00751 try
00752 {
00753 c = cm.getConnection();
00754 }
00755 catch (UnreachableBackendException e1)
00756 {
00757 String backendName = backend.getName();
00758 logger.error(Translate.get(
00759 "loadbalancer.backend.disabling.unreachable", backendName));
00760 disableBackend(backend);
00761 throw new UnreachableBackendException(Translate.get(
00762 "loadbalancer.backend.unreacheable", backendName));
00763 }
00764
00765
00766 if (c == null)
00767 throw new UnreachableBackendException(Translate.get(
00768 "loadbalancer.backend.no.connection", backend.getName()));
00769
00770
00771 ControllerResultSet result;
00772 try
00773 {
00774 result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00775 metadataCache);
00776 }
00777 catch (Exception e)
00778 {
00779 throw new SQLException(Translate.get(
00780 "loadbalancer.request.failed.on.backend", new String[]{
00781 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00782 backend.getName(), e.getMessage()}));
00783 }
00784 finally
00785 {
00786 cm.releaseConnection(c);
00787 }
00788 return result;
00789 }
00790 else
00791 {
00792 Connection c = cm.retrieveConnection(request.getTransactionId());
00793
00794
00795 if (c == null)
00796 throw new SQLException(Translate.get(
00797 "loadbalancer.unable.retrieve.connection",
00798 new String[]{String.valueOf(request.getTransactionId()),
00799 backend.getName()}));
00800
00801
00802 try
00803 {
00804 return executeUpdateRequestOnBackendWithKeys(request, backend, c,
00805 metadataCache);
00806 }
00807 catch (Exception e)
00808 {
00809 throw new SQLException(Translate.get(
00810 "loadbalancer.request.failed.on.backend", new String[]{
00811 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00812 backend.getName(), e.getMessage()}));
00813 }
00814 }
00815 }
00816 catch (RuntimeException e)
00817 {
00818 String msg = Translate
00819 .get("loadbalancer.request.failed", new String[]{
00820 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00821 e.getMessage()});
00822 logger.fatal(msg, e);
00823 throw new SQLException(msg);
00824 }
00825 }
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835 private int executeWriteStoredProcedureOnBackend(StoredProcedure proc,
00836 DatabaseBackend backend) throws SQLException, UnreachableBackendException
00837 {
00838
00839 AbstractConnectionManager cm = backend
00840 .getConnectionManager(proc.getLogin());
00841
00842
00843 if (cm == null)
00844 {
00845 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00846 new String[]{proc.getLogin(), backend.getName()});
00847 logger.error(msg);
00848 throw new SQLException(msg);
00849 }
00850
00851
00852 if (proc.isAutoCommit())
00853 {
00854
00855 Connection c = null;
00856 try
00857 {
00858 c = cm.getConnection();
00859 }
00860 catch (UnreachableBackendException e1)
00861 {
00862 logger.error(Translate.get(
00863 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00864 disableBackend(backend);
00865 throw new UnreachableBackendException(Translate.get(
00866 "loadbalancer.backend.unreacheable", backend.getName()));
00867 }
00868
00869
00870 if (c == null)
00871 throw new UnreachableBackendException(Translate.get(
00872 "loadbalancer.backend.no.connection", backend.getName()));
00873
00874
00875 int result;
00876 try
00877 {
00878 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00879 proc, backend, c);
00880
00881
00882
00883 }
00884 catch (Exception e)
00885 {
00886 throw new SQLException(Translate.get(
00887 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00888 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00889 backend.getName(), e.getMessage()}));
00890 }
00891 finally
00892 {
00893 cm.releaseConnection(c);
00894 }
00895 if (logger.isDebugEnabled())
00896 logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00897 new String[]{String.valueOf(proc.getId()), backend.getName()}));
00898 return result;
00899 }
00900 else
00901 {
00902 Connection c;
00903 long tid = proc.getTransactionId();
00904 Long lTid = new Long(tid);
00905
00906 try
00907 {
00908 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00909 }
00910 catch (UnreachableBackendException e1)
00911 {
00912 logger.error(Translate.get(
00913 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00914 disableBackend(backend);
00915 throw new SQLException(Translate.get(
00916 "loadbalancer.backend.unreacheable", backend.getName()));
00917 }
00918 catch (NoTransactionStartWhenDisablingException e)
00919 {
00920 String msg = Translate.get("loadbalancer.backend.is.disabling",
00921 new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00922 backend.getName()});
00923 logger.error(msg);
00924 throw new SQLException(msg);
00925 }
00926
00927
00928 if (c == null)
00929 throw new SQLException(Translate.get(
00930 "loadbalancer.unable.retrieve.connection", new String[]{
00931 String.valueOf(tid), backend.getName()}));
00932
00933
00934 int result;
00935 try
00936 {
00937 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00938 proc, backend, c);
00939
00940
00941
00942 }
00943 catch (Exception e)
00944 {
00945 throw new SQLException(Translate.get(
00946 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00947 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00948 backend.getName(), e.getMessage()}));
00949 }
00950 if (logger.isDebugEnabled())
00951 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00952 new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00953 backend.getName()}));
00954 return result;
00955 }
00956 }
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968 public void begin(TransactionMarkerMetaData tm) throws SQLException
00969 {
00970 Long lTid = new Long(tm.getTransactionId());
00971 if (backendPerTransactionId.containsKey(lTid))
00972 throw new SQLException(Translate.get(
00973 "loadbalancer.transaction.already.started", lTid.toString()));
00974
00975 DatabaseBackend backend = chooseBackendForReadRequest(new UnknownRequest(
00976 "begin", false, 0, "\n"));
00977 backendPerTransactionId.put(lTid, backend);
00978 backend.startTransaction(lTid);
00979 }
00980
00981
00982
00983
00984 public void commit(TransactionMarkerMetaData tm) throws SQLException
00985 {
00986 long tid = tm.getTransactionId();
00987 Long lTid = new Long(tid);
00988 DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
00989
00990 AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
00991 Connection c = cm.retrieveConnection(tid);
00992
00993
00994 if (c == null)
00995 {
00996 db.stopTransaction(lTid);
00997
00998 throw new SQLException(Translate.get(
00999 "loadbalancer.unable.retrieve.connection", new String[]{
01000 String.valueOf(tid), db.getName()}));
01001 }
01002
01003
01004 try
01005 {
01006 c.commit();
01007 c.setAutoCommit(true);
01008 }
01009 catch (Exception e)
01010 {
01011 String msg = Translate.get("loadbalancer.commit.failed", new String[]{
01012 String.valueOf(tid), db.getName(), e.getMessage()});
01013 logger.error(msg);
01014 throw new SQLException(msg);
01015 }
01016 finally
01017 {
01018 cm.releaseConnection(tid);
01019 db.stopTransaction(lTid);
01020 }
01021 }
01022
01023
01024
01025
01026 public void rollback(TransactionMarkerMetaData tm) throws SQLException
01027 {
01028 long tid = tm.getTransactionId();
01029 Long lTid = new Long(tid);
01030 DatabaseBackend db = (DatabaseBackend) backendPerTransactionId.remove(lTid);
01031
01032 AbstractConnectionManager cm = db.getConnectionManager(tm.getLogin());
01033 Connection c = cm.retrieveConnection(tid);
01034
01035
01036 if (c == null)
01037 {
01038 db.stopTransaction(lTid);
01039
01040 throw new SQLException(Translate.get(
01041 "loadbalancer.unable.retrieve.connection", new String[]{
01042 String.valueOf(tid), db.getName()}));
01043 }
01044
01045
01046 try
01047 {
01048 c.rollback();
01049 c.setAutoCommit(true);
01050 }
01051 catch (Exception e)
01052 {
01053 String msg = Translate.get("loadbalancer.rollback.failed", new String[]{
01054 String.valueOf(tid), db.getName(), e.getMessage()});
01055 logger.error(msg);
01056 throw new SQLException(msg);
01057 }
01058 finally
01059 {
01060 cm.releaseConnection(tid);
01061 db.stopTransaction(lTid);
01062 }
01063 }
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01076 throws SQLException
01077 {
01078 logger.info(Translate.get("loadbalancer.backend.enabling", db.getName()));
01079 if (!db.isInitialized())
01080 db.initializeConnections();
01081 db.enableRead();
01082 if (writeEnabled)
01083 db.enableWrite();
01084 }
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095 public void disableBackend(DatabaseBackend db) throws SQLException
01096 {
01097 logger.info(Translate.get("loadbalancer.backend.disabling", db.getName()));
01098 db.disable();
01099 if (db.isInitialized())
01100 db.finalizeConnections();
01101 }
01102
01103
01104
01105
01106 public String getXmlImpl()
01107 {
01108 StringBuffer info = new StringBuffer();
01109 info.append("<" + DatabasesXmlTags.ELT_ParallelDB + ">");
01110 info.append(getParallelDBXml());
01111 info.append("</" + DatabasesXmlTags.ELT_ParallelDB + ">");
01112 return info.toString();
01113 }
01114
01115
01116
01117
01118
01119
01120 public abstract String getParallelDBXml();
01121
01122
01123
01124
01125
01126
01127
01128
01129
01130 public abstract DatabaseBackend chooseBackendForReadRequest(
01131 AbstractRequest request) throws SQLException;
01132
01133
01134
01135
01136
01137
01138
01139
01140
01141 public abstract DatabaseBackend chooseBackendForWriteRequest(
01142 AbstractWriteRequest request) throws SQLException;
01143
01144 }