00001
00025 package org.objectweb.cjdbc.controller.loadbalancer.paralleldb;
00026
00027
import java.sql.Connection;
00028
import java.sql.SQLException;
00029
import java.util.Hashtable;
00030
00031
import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00033
import org.objectweb.cjdbc.common.i18n.Translate;
00034
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00035
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00036
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00037
import org.objectweb.cjdbc.common.sql.SelectRequest;
00038
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00039
import org.objectweb.cjdbc.common.sql.UnknownRequest;
00040
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00041
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00042
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00043
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00044
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00045
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00046
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00047
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00048
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00049
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00050
00068 public abstract class ParallelDB extends AbstractLoadBalancer
00069 {
00070
00071 private Hashtable
backendPerTransactionId;
00072
/**
 * Creates a new <code>ParallelDB</code> load balancer for the given virtual
 * database.
 * <p>
 * The parent constructor is invoked with <code>RAIDbLevels.SingleDB</code>
 * and <code>ParsingGranularities.NO_PARSING</code>.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @throws SQLException declared by the signature; propagated from the
 *           parent constructor
 */
public ParallelDB(VirtualDatabase vdb) throws SQLException
{
  super(vdb, RAIDbLevels.SingleDB, ParsingGranularities.NO_PARSING);
  // The transaction id -> backend table is dereferenced by begin(), commit(),
  // rollback() and every transactional request, so it must be created here;
  // otherwise the first transactional call fails with a NullPointerException.
  backendPerTransactionId = new Hashtable();
}
00084
00089 public ControllerResultSet execReadRequest(
SelectRequest request,
00090
MetadataCache metadataCache)
throws SQLException
00091 {
00092
DatabaseBackend backend;
00093
if (request.isAutoCommit())
00094 backend =
chooseBackendForReadRequest(request);
00095
else
00096 backend = (
DatabaseBackend)
backendPerTransactionId.get(
new Long(request
00097 .getTransactionId()));
00098
00099
if (backend == null)
00100
throw new SQLException(
Translate.get(
00101
"loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00102 .
getSQLShortFormLength())));
00103
00104
ControllerResultSet rs = null;
00105
00106
try
00107 {
00108 rs =
executeReadRequestOnBackend(request, backend, metadataCache);
00109 }
00110
catch (
UnreachableBackendException urbe)
00111 {
00112
00113
return execReadRequest(request, metadataCache);
00114 }
00115
catch (SQLException se)
00116 {
00117 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00118 String.valueOf(request.getId()), se.getMessage()});
00119
if (logger.
isInfoEnabled())
00120 logger.
info(msg);
00121
throw new SQLException(msg);
00122 }
00123
catch (RuntimeException e)
00124 {
00125 String msg =
Translate.get(
"loadbalancer.request.failed.on.backend",
00126
new String[]{request.getSQLShortForm(vdb.
getSQLShortFormLength()),
00127 backend.getName(), e.getMessage()});
00128 logger.error(msg, e);
00129
throw new SQLException(msg);
00130 }
00131
00132
return rs;
00133 }
00134
00139 public ControllerResultSet execReadOnlyReadStoredProcedure(
00140
StoredProcedure proc,
MetadataCache metadataCache)
throws SQLException
00141 {
00142
return execReadStoredProcedure(proc, metadataCache);
00143 }
00144
00149 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc,
00150
MetadataCache metadataCache)
throws SQLException
00151 {
00152
DatabaseBackend backend;
00153
if (proc.isAutoCommit())
00154 backend = chooseBackendForReadRequest(proc);
00155
else
00156 backend = (
DatabaseBackend) backendPerTransactionId.get(
new Long(proc
00157 .getTransactionId()));
00158
00159
if (backend == null)
00160
throw new SQLException(
Translate.get(
00161
"loadbalancer.storedprocedure.no.backend.found", proc
00162 .getSQLShortForm(vdb.getSQLShortFormLength())));
00163
00164
ControllerResultSet rs = null;
00165
00166
try
00167 {
00168 rs = executeReadStoredProcedureOnBackend(proc, backend, metadataCache);
00169 }
00170
catch (
UnreachableBackendException urbe)
00171 {
00172
00173
return execReadStoredProcedure(proc, metadataCache);
00174 }
00175
catch (SQLException se)
00176 {
00177 String msg =
Translate.get(
"loadbalancer.storedprocedure.failed",
00178
new String[]{String.valueOf(proc.getId()), se.getMessage()});
00179
if (logger.isInfoEnabled())
00180 logger.info(msg);
00181
throw new SQLException(msg);
00182 }
00183
catch (RuntimeException e)
00184 {
00185 String msg =
Translate.get(
00186
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00187 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00188 backend.getName(), e.getMessage()});
00189 logger.error(msg, e);
00190
throw new SQLException(msg);
00191 }
00192
00193
return rs;
00194 }
00195
00199 public int execWriteRequest(
AbstractWriteRequest request)
00200
throws AllBackendsFailedException, SQLException
00201 {
00202
DatabaseBackend backend;
00203
if (request.isAutoCommit())
00204 backend = chooseBackendForWriteRequest(request);
00205
else
00206 backend = (
DatabaseBackend) backendPerTransactionId.get(
new Long(request
00207 .getTransactionId()));
00208
00209
if (backend == null)
00210
throw new SQLException(
Translate.get(
00211
"loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00212 .getSQLShortFormLength())));
00213
00214
int result;
00215
00216
try
00217 {
00218 result = executeWriteRequestOnBackend(request, backend);
00219 }
00220
catch (
UnreachableBackendException urbe)
00221 {
00222
00223
return execWriteRequest(request);
00224 }
00225
catch (SQLException se)
00226 {
00227 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00228 String.valueOf(request.getId()), se.getMessage()});
00229
if (logger.isInfoEnabled())
00230 logger.info(msg);
00231
throw new SQLException(msg);
00232 }
00233
catch (RuntimeException e)
00234 {
00235 String msg =
Translate.get(
"loadbalancer.request.failed.on.backend",
00236
new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00237 backend.getName(), e.getMessage()});
00238 logger.error(msg, e);
00239
throw new SQLException(msg);
00240 }
00241
00242
return result;
00243 }
00244
00249 public ControllerResultSet execWriteRequestWithKeys(
00250
AbstractWriteRequest request,
MetadataCache metadataCache)
00251
throws AllBackendsFailedException, SQLException
00252 {
00253
DatabaseBackend backend;
00254
if (request.isAutoCommit())
00255 backend = chooseBackendForWriteRequest(request);
00256
else
00257 backend = (
DatabaseBackend) backendPerTransactionId.get(
new Long(request
00258 .getTransactionId()));
00259
00260
if (backend == null)
00261
throw new SQLException(
Translate.get(
00262
"loadbalancer.execute.no.backend.found", request.getSQLShortForm(vdb
00263 .getSQLShortFormLength())));
00264
00265
ControllerResultSet rs;
00266
00267
try
00268 {
00269 rs = executeWriteRequestWithKeysOnBackend(request, backend, metadataCache);
00270 }
00271
catch (
UnreachableBackendException urbe)
00272 {
00273
00274
return execWriteRequestWithKeys(request, metadataCache);
00275 }
00276
catch (SQLException se)
00277 {
00278 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00279 String.valueOf(request.getId()), se.getMessage()});
00280
if (logger.isInfoEnabled())
00281 logger.info(msg);
00282
throw new SQLException(msg);
00283 }
00284
catch (RuntimeException e)
00285 {
00286 String msg =
Translate.get(
"loadbalancer.request.failed.on.backend",
00287
new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00288 backend.getName(), e.getMessage()});
00289 logger.error(msg, e);
00290
throw new SQLException(msg);
00291 }
00292
00293
return rs;
00294 }
00295
00299 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00300 {
00301
DatabaseBackend backend;
00302
if (proc.isAutoCommit())
00303 backend = chooseBackendForReadRequest(proc);
00304
else
00305 backend = (
DatabaseBackend) backendPerTransactionId.get(
new Long(proc
00306 .getTransactionId()));
00307
00308
if (backend == null)
00309
throw new SQLException(
Translate.get(
00310
"loadbalancer.storedprocedure.no.backend.found", proc
00311 .getSQLShortForm(vdb.getSQLShortFormLength())));
00312
00313
int result;
00314
00315
try
00316 {
00317 result = executeWriteStoredProcedureOnBackend(proc, backend);
00318 }
00319
catch (
UnreachableBackendException urbe)
00320 {
00321
00322
return execWriteStoredProcedure(proc);
00323 }
00324
catch (SQLException se)
00325 {
00326 String msg =
Translate.get(
"loadbalancer.storedprocedure.failed",
00327
new String[]{String.valueOf(proc.getId()), se.getMessage()});
00328
if (logger.isInfoEnabled())
00329 logger.info(msg);
00330
throw new SQLException(msg);
00331 }
00332
catch (RuntimeException e)
00333 {
00334 String msg =
Translate.get(
00335
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00336 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00337 backend.getName(), e.getMessage()});
00338 logger.error(msg, e);
00339
throw new SQLException(msg);
00340 }
00341
00342
return result;
00343 }
00344
00354 private ControllerResultSet executeReadRequestOnBackend(
00355
SelectRequest request,
DatabaseBackend backend,
00356
MetadataCache metadataCache)
throws SQLException,
00357
UnreachableBackendException
00358 {
00359
00360
AbstractConnectionManager cm = backend.getConnectionManager(request
00361 .getLogin());
00362
00363
00364
if (cm == null)
00365 {
00366 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00367
new String[]{request.getLogin(), backend.getName()});
00368 logger.error(msg);
00369
throw new SQLException(msg);
00370 }
00371
00372
00373
if (request.isAutoCommit())
00374 {
00375
ControllerResultSet rs = null;
00376
boolean badConnection;
00377
do
00378 {
00379 badConnection =
false;
00380
00381 Connection c = null;
00382
try
00383 {
00384 c = cm.getConnection();
00385 }
00386
catch (UnreachableBackendException e1)
00387 {
00388 logger.error(
Translate.get(
00389
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00390 backend.disable();
00391
throw new UnreachableBackendException(
Translate.get(
00392
"loadbalancer.backend.unreacheable", backend.getName()));
00393 }
00394
00395
00396
if (c == null)
00397
throw new SQLException(Translate.get(
00398
"loadbalancer.backend.no.connection", backend.getName()));
00399
00400
00401
try
00402 {
00403 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00404 cm.releaseConnection(c);
00405 }
00406
catch (SQLException e)
00407 {
00408 cm.releaseConnection(c);
00409
throw new SQLException(Translate.get(
00410
"loadbalancer.request.failed.on.backend",
new String[]{
00411 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00412 backend.getName(), e.getMessage()}));
00413 }
00414
catch (BadConnectionException e)
00415 {
00416 cm.deleteConnection(c);
00417 badConnection =
true;
00418 }
00419 }
00420
while (badConnection);
00421
if (logger.isDebugEnabled())
00422 logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00423 String.valueOf(request.getId()), backend.getName()}));
00424
return rs;
00425 }
00426
else
00427 {
00428 Connection c;
00429
long tid = request.getTransactionId();
00430 Long lTid =
new Long(tid);
00431
00432
if (!backend.isStartedTransaction(lTid))
00433 {
00434
try
00435 {
00436 c = cm.getConnection(tid);
00437 }
00438
catch (UnreachableBackendException e1)
00439 {
00440 logger.error(Translate.get(
00441
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00442 backend.disable();
00443
throw new UnreachableBackendException(Translate.get(
00444
"loadbalancer.backend.unreacheable", backend.getName()));
00445 }
00446
00447
00448
if (c == null)
00449
throw new SQLException(Translate.get(
00450
"loadbalancer.unable.get.connection",
new String[]{
00451 String.valueOf(tid), backend.getName()}));
00452
00453
00454 backend.startTransaction(lTid);
00455 c.setAutoCommit(
false);
00456 }
00457
else
00458 {
00459 c = cm.retrieveConnection(tid);
00460
00461
00462
if (c == null)
00463
throw new SQLException(Translate.get(
00464
"loadbalancer.unable.retrieve.connection",
new String[]{
00465 String.valueOf(tid), backend.getName()}));
00466 }
00467
00468
00469 ControllerResultSet rs = null;
00470
try
00471 {
00472 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00473 }
00474
catch (SQLException e)
00475 {
00476
throw new SQLException(Translate.get(
00477
"loadbalancer.request.failed.on.backend",
new String[]{
00478 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00479 backend.getName(), e.getMessage()}));
00480 }
00481
catch (BadConnectionException e)
00482 {
00483
00484 cm.deleteConnection(tid);
00485 String msg = Translate.get(
00486
"loadbalancer.backend.disabling.connection.failure", backend
00487 .getName());
00488 logger.error(msg);
00489 backend.disable();
00490
throw new SQLException(msg);
00491 }
00492
if (logger.isDebugEnabled())
00493 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00494
new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00495 backend.getName()}));
00496
return rs;
00497 }
00498 }
00499
00509 private ControllerResultSet executeReadStoredProcedureOnBackend(
00510
StoredProcedure proc,
DatabaseBackend backend,
MetadataCache metadataCache)
00511
throws SQLException,
UnreachableBackendException
00512 {
00513
00514
AbstractConnectionManager cm = backend
00515 .getConnectionManager(proc.getLogin());
00516
00517
00518
if (cm == null)
00519 {
00520 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00521
new String[]{proc.getLogin(), backend.getName()});
00522 logger.error(msg);
00523
throw new SQLException(msg);
00524 }
00525
00526
00527
if (proc.isAutoCommit())
00528 {
00529
00530 Connection c = null;
00531
try
00532 {
00533 c = cm.getConnection();
00534 }
00535
catch (UnreachableBackendException e1)
00536 {
00537 logger.error(
Translate.get(
00538
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00539 backend.disable();
00540
throw new UnreachableBackendException(
Translate.get(
00541
"loadbalancer.backend.unreacheable", backend.getName()));
00542 }
00543
00544
00545
if (c == null)
00546
throw new UnreachableBackendException(Translate.get(
00547
"loadbalancer.backend.no.connection", backend.getName()));
00548
00549
00550 ControllerResultSet rs = null;
00551
try
00552 {
00553 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00554 backend, c, metadataCache);
00555 }
00556
catch (Exception e)
00557 {
00558
throw new SQLException(Translate.get(
00559
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00560 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00561 backend.getName(), e.getMessage()}));
00562 }
00563 finally
00564 {
00565 cm.releaseConnection(c);
00566 }
00567
if (logger.isDebugEnabled())
00568 logger.debug(Translate.get(
"loadbalancer.storedprocedure.on",
00569
new String[]{String.valueOf(proc.getId()), backend.getName()}));
00570
return rs;
00571 }
00572
else
00573 {
00574 Connection c;
00575
long tid = proc.getTransactionId();
00576 Long lTid =
new Long(tid);
00577
00578
if (!backend.isStartedTransaction(lTid))
00579 {
00580
try
00581 {
00582 c = cm.getConnection(tid);
00583 }
00584
catch (UnreachableBackendException e1)
00585 {
00586 logger.error(Translate.get(
00587
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00588 backend.disable();
00589
throw new UnreachableBackendException(Translate.get(
00590
"loadbalancer.backend.unreacheable", backend.getName()));
00591 }
00592
00593
00594
if (c == null)
00595
throw new SQLException(Translate.get(
00596
"loadbalancer.unable.get.connection",
new String[]{
00597 String.valueOf(tid), backend.getName()}));
00598
00599
00600 backend.startTransaction(lTid);
00601 c.setAutoCommit(
false);
00602 }
00603
else
00604 {
00605 c = cm.retrieveConnection(tid);
00606
00607
00608
if (c == null)
00609
throw new SQLException(Translate.get(
00610
"loadbalancer.unable.retrieve.connection",
new String[]{
00611 String.valueOf(tid), backend.getName()}));
00612 }
00613
00614
00615 ControllerResultSet rs;
00616
try
00617 {
00618 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00619 backend, c, metadataCache);
00620 }
00621
catch (Exception e)
00622 {
00623
throw new SQLException(Translate.get(
00624
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00625 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00626 backend.getName(), e.getMessage()}));
00627 }
00628
if (logger.isDebugEnabled())
00629 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00630
new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00631 backend.getName()}));
00632
return rs;
00633 }
00634 }
00635
00644 private int executeWriteRequestOnBackend(
AbstractWriteRequest request,
00645
DatabaseBackend backend)
throws SQLException,
UnreachableBackendException
00646 {
00647
if (backend == null)
00648
throw new SQLException(
Translate.get(
00649
"loadbalancer.execute.no.backend.available", request.getId()));
00650
00651
try
00652 {
00653
AbstractConnectionManager cm = backend.getConnectionManager(request
00654 .getLogin());
00655
if (request.isAutoCommit())
00656 {
00657 Connection c = null;
00658
try
00659 {
00660 c = cm.
getConnection();
00661 }
00662
catch (
UnreachableBackendException e1)
00663 {
00664 String backendName = backend.getName();
00665 logger.error(
Translate.get(
00666
"loadbalancer.backend.disabling.unreachable", backendName));
00667 backend.disable();
00668
throw new UnreachableBackendException(
Translate.get(
00669
"loadbalancer.backend.unreacheable", backendName));
00670 }
00671
00672
00673
if (c == null)
00674
throw new UnreachableBackendException(
Translate.get(
00675
"loadbalancer.backend.no.connection", backend.getName()));
00676
00677
00678
int result;
00679
try
00680 {
00681 result = executeUpdateRequestOnBackend(request, backend, c);
00682 }
00683
catch (Exception e)
00684 {
00685
throw new SQLException(
Translate.get(
00686
"loadbalancer.request.failed.on.backend",
new String[]{
00687 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00688 backend.getName(), e.getMessage()}));
00689 }
00690 finally
00691 {
00692 cm.
releaseConnection(c);
00693 }
00694
return result;
00695 }
00696
else
00697 {
00698 Connection c = cm.retrieveConnection(request.getTransactionId());
00699
00700
00701
if (c == null)
00702
throw new SQLException(
Translate.get(
00703
"loadbalancer.unable.retrieve.connection",
00704
new String[]{String.valueOf(request.getTransactionId()),
00705 backend.getName()}));
00706
00707
00708
int result;
00709
try
00710 {
00711 result = executeUpdateRequestOnBackend(request, backend, c);
00712
return result;
00713 }
00714
catch (Exception e)
00715 {
00716
throw new SQLException(
Translate.get(
00717
"loadbalancer.request.failed.on.backend",
new String[]{
00718 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00719 backend.getName(), e.getMessage()}));
00720 }
00721 }
00722 }
00723
catch (RuntimeException e)
00724 {
00725 String msg = Translate.get(
"loadbalancer.request.failed.on.backend",
00726
new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00727 backend.getName(), e.getMessage()});
00728 logger.fatal(msg, e);
00729
throw new SQLException(msg);
00730 }
00731 }
00732
00743 private ControllerResultSet executeWriteRequestWithKeysOnBackend(
00744
AbstractWriteRequest request,
DatabaseBackend backend,
00745
MetadataCache metadataCache)
throws SQLException,
00746
UnreachableBackendException
00747 {
00748
if (backend == null)
00749
throw new SQLException(
Translate.get(
00750
"loadbalancer.execute.no.backend.available", request.getId()));
00751
00752
if (!backend.getDriverCompliance().supportGetGeneratedKeys())
00753
throw new SQLException(
Translate.get(
00754
"loadbalancer.backend.autogeneratedkeys.unsupported", backend
00755 .getName()));
00756
00757
try
00758 {
00759
AbstractConnectionManager cm = backend.getConnectionManager(request
00760 .getLogin());
00761
if (request.isAutoCommit())
00762 {
00763 Connection c = null;
00764
try
00765 {
00766 c = cm.
getConnection();
00767 }
00768
catch (
UnreachableBackendException e1)
00769 {
00770 String backendName = backend.getName();
00771 logger.error(
Translate.get(
00772
"loadbalancer.backend.disabling.unreachable", backendName));
00773 backend.disable();
00774
throw new UnreachableBackendException(
Translate.get(
00775
"loadbalancer.backend.unreacheable", backendName));
00776 }
00777
00778
00779
if (c == null)
00780
throw new UnreachableBackendException(
Translate.get(
00781
"loadbalancer.backend.no.connection", backend.getName()));
00782
00783
00784
ControllerResultSet result;
00785
try
00786 {
00787 result =
new ControllerResultSet(request,
00788 executeUpdateRequestOnBackendWithKeys(request, backend, c),
00789 metadataCache);
00790 }
00791
catch (Exception e)
00792 {
00793
throw new SQLException(
Translate.get(
00794
"loadbalancer.request.failed.on.backend",
new String[]{
00795 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00796 backend.getName(), e.getMessage()}));
00797 }
00798 finally
00799 {
00800 cm.
releaseConnection(c);
00801 }
00802
return result;
00803 }
00804
else
00805 {
00806 Connection c = cm.retrieveConnection(request.getTransactionId());
00807
00808
00809
if (c == null)
00810
throw new SQLException(
Translate.get(
00811
"loadbalancer.unable.retrieve.connection",
00812
new String[]{String.valueOf(request.getTransactionId()),
00813 backend.getName()}));
00814
00815
00816
try
00817 {
00818
return new ControllerResultSet(request,
00819 executeUpdateRequestOnBackendWithKeys(request, backend, c),
00820 metadataCache);
00821 }
00822
catch (Exception e)
00823 {
00824
throw new SQLException(
Translate.get(
00825
"loadbalancer.request.failed.on.backend",
new String[]{
00826 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00827 backend.getName(), e.getMessage()}));
00828 }
00829 }
00830 }
00831
catch (RuntimeException e)
00832 {
00833 String msg = Translate
00834 .get(
"loadbalancer.request.failed",
new String[]{
00835 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00836 e.getMessage()});
00837 logger.fatal(msg, e);
00838
throw new SQLException(msg);
00839 }
00840 }
00841
00850 private int executeWriteStoredProcedureOnBackend(
StoredProcedure proc,
00851
DatabaseBackend backend)
throws SQLException,
UnreachableBackendException
00852 {
00853
00854
AbstractConnectionManager cm = backend
00855 .getConnectionManager(proc.getLogin());
00856
00857
00858
if (cm == null)
00859 {
00860 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00861
new String[]{proc.getLogin(), backend.getName()});
00862 logger.error(msg);
00863
throw new SQLException(msg);
00864 }
00865
00866
00867
if (proc.isAutoCommit())
00868 {
00869
00870 Connection c = null;
00871
try
00872 {
00873 c = cm.getConnection();
00874 }
00875
catch (UnreachableBackendException e1)
00876 {
00877 logger.error(
Translate.get(
00878
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00879 backend.disable();
00880
throw new UnreachableBackendException(
Translate.get(
00881
"loadbalancer.backend.unreacheable", backend.getName()));
00882 }
00883
00884
00885
if (c == null)
00886
throw new UnreachableBackendException(Translate.get(
00887
"loadbalancer.backend.no.connection", backend.getName()));
00888
00889
00890
int result;
00891
try
00892 {
00893 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00894 proc, backend, c);
00895
00896
00897
00898 }
00899
catch (Exception e)
00900 {
00901
throw new SQLException(Translate.get(
00902
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00903 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00904 backend.getName(), e.getMessage()}));
00905 }
00906 finally
00907 {
00908 cm.releaseConnection(c);
00909 }
00910
if (logger.isDebugEnabled())
00911 logger.debug(Translate.get(
"loadbalancer.storedprocedure.on",
00912
new String[]{String.valueOf(proc.getId()), backend.getName()}));
00913
return result;
00914 }
00915
else
00916 {
00917 Connection c;
00918
long tid = proc.getTransactionId();
00919 Long lTid =
new Long(tid);
00920
00921
if (!backend.isStartedTransaction(lTid))
00922 {
00923
try
00924 {
00925 c = cm.getConnection(tid);
00926 }
00927
catch (UnreachableBackendException e1)
00928 {
00929 logger.error(Translate.get(
00930
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00931 backend.disable();
00932
throw new SQLException(Translate.get(
00933
"loadbalancer.backend.unreacheable", backend.getName()));
00934 }
00935
00936
00937
if (c == null)
00938
throw new SQLException(Translate.get(
00939
"loadbalancer.unable.get.connection",
new String[]{
00940 String.valueOf(tid), backend.getName()}));
00941
00942
00943 backend.startTransaction(lTid);
00944 c.setAutoCommit(
false);
00945 }
00946
else
00947 {
00948 c = cm.retrieveConnection(tid);
00949
00950
00951
if (c == null)
00952
throw new SQLException(Translate.get(
00953
"loadbalancer.unable.retrieve.connection",
new String[]{
00954 String.valueOf(tid), backend.getName()}));
00955 }
00956
00957
00958
int result;
00959
try
00960 {
00961 result = AbstractLoadBalancer.executeWriteStoredProcedureOnBackend(
00962 proc, backend, c);
00963
00964
00965
00966 }
00967
catch (Exception e)
00968 {
00969
throw new SQLException(Translate.get(
00970
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00971 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00972 backend.getName(), e.getMessage()}));
00973 }
00974
if (logger.isDebugEnabled())
00975 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00976
new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00977 backend.getName()}));
00978
return result;
00979 }
00980 }
00981
00982
00983
00984
00985
00992 public void begin(
TransactionMarkerMetaData tm)
throws SQLException
00993 {
00994 Long lTid =
new Long(tm.getTransactionId());
00995
if (backendPerTransactionId.containsKey(lTid))
00996
throw new SQLException(
Translate.get(
00997
"loadbalancer.transaction.already.started", lTid.toString()));
00998
00999
DatabaseBackend backend = chooseBackendForReadRequest(
new UnknownRequest(
01000
"begin",
false, 0,
"\n"));
01001 backendPerTransactionId.put(lTid, backend);
01002 backend.
startTransaction(lTid);
01003 }
01004
01008 public void commit(
TransactionMarkerMetaData tm)
throws SQLException
01009 {
01010
long tid = tm.getTransactionId();
01011 Long lTid =
new Long(tid);
01012
DatabaseBackend db = (
DatabaseBackend) backendPerTransactionId.remove(lTid);
01013
01014
AbstractConnectionManager cm = db.
getConnectionManager(tm.getLogin());
01015 Connection c = cm.
retrieveConnection(tid);
01016
01017
01018
if (c == null)
01019 {
01020 db.
stopTransaction(lTid);
01021
01022
throw new SQLException(
Translate.get(
01023
"loadbalancer.unable.retrieve.connection",
new String[]{
01024 String.valueOf(tid), db.
getName()}));
01025 }
01026
01027
01028
try
01029 {
01030 c.commit();
01031 c.setAutoCommit(
true);
01032 }
01033
catch (Exception e)
01034 {
01035 String msg =
Translate.get(
"loadbalancer.commit.failed",
new String[]{
01036 String.valueOf(tid), db.getName(), e.getMessage()});
01037 logger.error(msg);
01038
throw new SQLException(msg);
01039 }
01040 finally
01041 {
01042 cm.releaseConnection(tid);
01043 db.stopTransaction(lTid);
01044 }
01045 }
01046
01050 public void rollback(
TransactionMarkerMetaData tm)
throws SQLException
01051 {
01052
long tid = tm.getTransactionId();
01053 Long lTid =
new Long(tid);
01054
DatabaseBackend db = (
DatabaseBackend) backendPerTransactionId.remove(lTid);
01055
01056
AbstractConnectionManager cm = db.
getConnectionManager(tm.getLogin());
01057 Connection c = cm.
retrieveConnection(tid);
01058
01059
01060
if (c == null)
01061 {
01062 db.
stopTransaction(lTid);
01063
01064
throw new SQLException(
Translate.get(
01065
"loadbalancer.unable.retrieve.connection",
new String[]{
01066 String.valueOf(tid), db.
getName()}));
01067 }
01068
01069
01070
try
01071 {
01072 c.rollback();
01073 c.setAutoCommit(
true);
01074 }
01075
catch (Exception e)
01076 {
01077 String msg =
Translate.get(
"loadbalancer.rollback.failed",
new String[]{
01078 String.valueOf(tid), db.getName(), e.getMessage()});
01079 logger.error(msg);
01080
throw new SQLException(msg);
01081 }
01082 finally
01083 {
01084 cm.releaseConnection(tid);
01085 db.stopTransaction(lTid);
01086 }
01087 }
01088
01099 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
01100
throws SQLException
01101 {
01102 logger.info(
Translate.get(
"loadbalancer.backend.enabling", db.getName()));
01103
if (!db.isInitialized())
01104 db.initializeConnections();
01105 db.enableRead();
01106
if (writeEnabled)
01107 db.enableWrite();
01108 }
01109
01119 public void disableBackend(
DatabaseBackend db)
throws SQLException
01120 {
01121 logger.info(
Translate.get(
"loadbalancer.backend.disabling", db.getName()));
01122 db.disable();
01123
if (db.isInitialized())
01124 db.finalizeConnections();
01125 }
01126
01130 public String getXmlImpl()
01131 {
01132 StringBuffer info =
new StringBuffer();
01133 info.append(
"<" +
DatabasesXmlTags.ELT_ParallelDB +
">");
01134 info.append(getParallelDBXml());
01135 info.append(
"</" +
DatabasesXmlTags.ELT_ParallelDB +
">");
01136
return info.toString();
01137 }
01138
01144
/**
 * Returns the XML content that <code>getXmlImpl()</code> places between the
 * opening and closing <code>DatabasesXmlTags.ELT_ParallelDB</code> tags.
 *
 * @return the XML content supplied by the concrete load balancer
 */
public abstract String getParallelDBXml();
01145
01154
public abstract DatabaseBackend chooseBackendForReadRequest(
01155
AbstractRequest request)
throws SQLException;
01156
01165
public abstract DatabaseBackend chooseBackendForWriteRequest(
01166
AbstractWriteRequest request)
throws SQLException;
01167
01168 }