00001
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb0;
00026
00027
import java.sql.Connection;
00028
import java.sql.SQLException;
00029
import java.util.ArrayList;
00030
00031
import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00033
import org.objectweb.cjdbc.common.i18n.Translate;
00034
import org.objectweb.cjdbc.common.log.Trace;
00035
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00036
import org.objectweb.cjdbc.common.sql.CreateRequest;
00037
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00038
import org.objectweb.cjdbc.common.sql.SelectRequest;
00039
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00040
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00041
import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00042
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00043
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00044
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00045
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00046
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00047
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00049
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00050
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00051
import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00052
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00053
import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00054
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00055
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00056
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00057
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00058
00070 public class RAIDb0 extends AbstractLoadBalancer
00071 {
00072
00073
00074
00075
00076
00077
00078 private ArrayList
backendThreads;
00079 private ReadPrioritaryFIFOWriteLock
backendThreadsRWLock =
new ReadPrioritaryFIFOWriteLock();
00080 private CreateTablePolicy
createTablePolicy;
00081
00082 protected static Trace
logger = Trace
00083 .getLogger(
"org.objectweb.cjdbc.controller.loadbalancer.RAIDb0");
00084
00085
00086
00087
00088
00097 public RAIDb0(
VirtualDatabase vdb, CreateTablePolicy createTablePolicy)
00098
throws SQLException
00099 {
00100 super(vdb,
RAIDbLevels.RAIDb0,
ParsingGranularities.TABLE);
00101
backendThreads =
new ArrayList();
00102
this.createTablePolicy =
createTablePolicy;
00103 }
00104
00105
00106
00107
00108
00119 public ControllerResultSet execReadRequest(
SelectRequest request,
00120
MetadataCache metadataCache)
throws SQLException
00121 {
00122
try
00123 {
00124 vdb.
acquireReadLockBackendLists();
00125 }
00126
catch (InterruptedException e)
00127 {
00128 String msg =
Translate.get(
00129
"loadbalancer.backendlist.acquire.readlock.failed", e);
00130
logger.error(msg);
00131
throw new SQLException(msg);
00132 }
00133
00134
try
00135 {
00136
ControllerResultSet rs = null;
00137 ArrayList fromTables = request.getFrom();
00138
00139
if (fromTables == null)
00140
throw new SQLException(
Translate.get(
"loadbalancer.from.not.found",
00141 request.getSQLShortForm(vdb.
getSQLShortFormLength())));
00142
00143
00144 ArrayList backends = vdb.
getBackends();
00145
int size = backends.size();
00146
00147
DatabaseBackend backend = null;
00148
00149
for (
int i = 0; i < size; i++)
00150 {
00151 backend = (
DatabaseBackend) backends.get(i);
00152
if (backend.
isReadEnabled() && backend.
hasTables(fromTables))
00153
break;
00154 }
00155
if (
logger.isDebugEnabled())
00156 {
00157
logger.debug(
"Backend " + backend.
getName()
00158 +
" has all tables which are:");
00159
for (
int i = 0; i < fromTables.size(); i++)
00160 {
00161
logger.debug(fromTables.get(i));
00162 }
00163 }
00164
00165
00166
try
00167 {
00168 rs =
executeRequestOnBackend(request, backend, metadataCache);
00169 }
00170
catch (SQLException se)
00171 {
00172 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00173 String.valueOf(request.getId()), se.getMessage()});
00174
if (
logger.isInfoEnabled())
00175
logger.info(msg);
00176
throw new SQLException(msg);
00177 }
00178
00179
return rs;
00180 }
00181
catch (RuntimeException e)
00182 {
00183 String msg =
Translate
00184 .get(
"loadbalancer.request.failed",
new String[]{
00185 request.getSQLShortForm(vdb.
getSQLShortFormLength()),
00186 e.getMessage()});
00187 logger.fatal(msg, e);
00188
throw new SQLException(msg);
00189 }
00190 finally
00191 {
00192 vdb.releaseReadLockBackendLists();
00193 }
00194 }
00195
00204 public int execWriteRequest(
AbstractWriteRequest request)
throws SQLException
00205 {
00206
00207 handleMacros(request);
00208
00209
try
00210 {
00211 vdb.acquireReadLockBackendLists();
00212 }
00213
catch (InterruptedException e)
00214 {
00215 String msg =
Translate.get(
00216
"loadbalancer.backendlist.acquire.readlock.failed", e);
00217 logger.error(msg);
00218
throw new SQLException(msg);
00219 }
00220
00221
try
00222 {
00223 String table = request.getTableName();
00224
AbstractConnectionManager cm = null;
00225
00226
if (table == null)
00227
throw new SQLException(
Translate.get(
00228
"loadbalancer.request.target.table.not.found", request
00229 .getSQLShortForm(vdb.getSQLShortFormLength())));
00230
00231
00232 ArrayList backends = vdb.getBackends();
00233
int size = backends.size();
00234
00235
DatabaseBackend backend = null;
00236
00237
if (request.isCreate())
00238 {
00239
CreateTableRule rule = createTablePolicy.getTableRule(request
00240 .getTableName());
00241
if (rule == null)
00242 rule = createTablePolicy.getDefaultRule();
00243
00244
00245 ArrayList choosen;
00246
try
00247 {
00248 choosen = rule.
getBackends(backends);
00249 }
00250
catch (
CreateTableException e)
00251 {
00252
throw new SQLException(
Translate.get(
00253
"loadbalancer.create.table.rule.failed", e.getMessage()));
00254 }
00255
00256
00257
if (choosen != null)
00258 backend = (
DatabaseBackend) choosen.get(0);
00259
if (backend != null)
00260 cm = backend.
getConnectionManager(request.getLogin());
00261 }
00262
else
00263 {
00264
for (
int i = 0; i < size; i++)
00265 {
00266 backend = (
DatabaseBackend) backends.get(i);
00267
if (backend.
isWriteEnabled() && backend.
hasTable(table))
00268 {
00269 cm = backend.
getConnectionManager(request.getLogin());
00270
break;
00271 }
00272 }
00273 }
00274
00275
00276
if (cm == null)
00277
throw new SQLException(
Translate.get(
00278
"loadbalancer.backend.no.required.table", table));
00279
00280
00281
00282
if (request.isAutoCommit())
00283 {
00284
00285
00286
if (backend.
isDisabling())
00287
throw new SQLException(
Translate.get(
00288
"loadbalancer.backend.is.disabling",
new String[]{
00289 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00290 backend.
getName()}));
00291
00292
00293 Connection c = null;
00294
try
00295 {
00296 c = cm.
getConnection();
00297 }
00298
catch (
UnreachableBackendException e1)
00299 {
00300 logger.error(
Translate.get(
00301
"loadbalancer.backend.disabling.unreachable", backend.
getName()));
00302 backend.
disable();
00303
throw new SQLException(
Translate.get(
00304
"loadbalancer.backend.unreacheable", backend.
getName()));
00305 }
00306
00307
00308
if (c == null)
00309
throw new SQLException(
Translate.get(
00310
"loadbalancer.backend.no.connection", backend.
getName()));
00311
00312
int result;
00313
try
00314 {
00315 result = executeUpdateRequestOnBackend(request, backend, c);
00316 updateSchema(backend, request);
00317 }
00318
catch (Exception e)
00319 {
00320
throw new SQLException(
Translate.get(
"loadbalancer.request.failed",
00321
new String[]{
00322 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00323 e.getMessage()}));
00324 }
00325 finally
00326 {
00327 cm.releaseConnection(c);
00328 }
00329
if (logger.isDebugEnabled())
00330 logger.debug(
Translate.get(
"loadbalancer.execute.on",
new String[]{
00331 String.valueOf(request.getId()), backend.getName()}));
00332
return result;
00333 }
00334
else
00335 {
00336 Connection c;
00337
long tid = request.getTransactionId();
00338 Long lTid =
new Long(tid);
00339
00340
if (!backend.isStartedTransaction(lTid))
00341 {
00342
00343
if (backend.isDisabling())
00344
throw new SQLException(
Translate.get(
00345
"loadbalancer.backend.is.disabling",
new String[]{
00346 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00347 backend.getName()}));
00348
00349
00350
try
00351 {
00352 c = cm.getConnection(tid);
00353 }
00354
catch (UnreachableBackendException e1)
00355 {
00356 logger.error(Translate
00357 .get(
"loadbalancer.backend.disabling.unreachable", backend
00358 .getName()));
00359 backend.disable();
00360
throw new SQLException(Translate.get(
00361
"loadbalancer.backend.unreacheable", backend.getName()));
00362 }
00363
00364
00365
if (c == null)
00366
throw new SQLException(Translate.get(
00367
"loadbalancer.unable.get.connection",
new String[]{
00368 String.valueOf(tid), backend.getName()}));
00369
00370
00371 backend.startTransaction(lTid);
00372 c.setAutoCommit(
false);
00373 }
00374
else
00375 {
00376 c = cm.retrieveConnection(tid);
00377
00378
00379
if (c == null)
00380
throw new SQLException(Translate.get(
00381
"loadbalancer.unable.retrieve.connection",
new String[]{
00382 String.valueOf(tid), backend.getName()}));
00383 }
00384
00385
00386
int result;
00387
try
00388 {
00389 result = executeUpdateRequestOnBackend(request, backend, c);
00390 updateSchema(backend, request);
00391 }
00392
catch (Exception e)
00393 {
00394
throw new SQLException(Translate.get(
"loadbalancer.request.failed",
00395
new String[]{
00396 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00397 e.getMessage()}));
00398 }
00399
if (logger.isDebugEnabled())
00400 logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00401 String.valueOf(request.getId()), backend.getName()}));
00402
return result;
00403 }
00404 }
00405
catch (RuntimeException e)
00406 {
00407 String msg = Translate
00408 .get(
"loadbalancer.request.failed",
new String[]{
00409 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00410 e.getMessage()});
00411 logger.fatal(msg, e);
00412
throw new SQLException(msg);
00413 }
00414 finally
00415 {
00416 vdb.releaseReadLockBackendLists();
00417 }
00418 }
00419
00421 private void updateSchema(
DatabaseBackend b,
AbstractWriteRequest request)
00422 {
00423
DatabaseSchema dbs = b.
getDatabaseSchema();
00424
if (dbs == null)
00425
return;
00426
00427
if (request.
isCreate())
00428 {
00429
00430 dbs.
addTable(((
CreateRequest) request).getDatabaseTable());
00431
if (logger.isDebugEnabled())
00432 logger.debug(
Translate.get(
"loadbalancer.schema.add.table", request
00433 .
getTableName()));
00434 }
00435
else if (request.
isDrop())
00436 {
00437
00438 dbs.
removeTable(dbs.
getTable(request.
getTableName()));
00439
if (logger.isDebugEnabled())
00440 logger.debug(
Translate.get(
"loadbalancer.schema.remove.table", request
00441 .
getTableName()));
00442 }
00443 }
00444
00449 public ControllerResultSet execWriteRequestWithKeys(
00450
AbstractWriteRequest request,
MetadataCache metadataCache)
00451
throws SQLException
00452 {
00453
00454 handleMacros(request);
00455
00456
try
00457 {
00458 vdb.acquireReadLockBackendLists();
00459 }
00460
catch (InterruptedException e)
00461 {
00462 String msg =
Translate.get(
00463
"loadbalancer.backendlist.acquire.readlock.failed", e);
00464 logger.error(msg);
00465
throw new SQLException(msg);
00466 }
00467
00468
try
00469 {
00470 String table = request.getTableName();
00471
AbstractConnectionManager cm = null;
00472
00473
if (table == null)
00474
throw new SQLException(
Translate.get(
00475
"loadbalancer.request.target.table.not.found", request
00476 .getSQLShortForm(vdb.getSQLShortFormLength())));
00477
00478
00479 ArrayList backends = vdb.getBackends();
00480
int size = backends.size();
00481
00482
DatabaseBackend backend = null;
00483
00484
if (request.isCreate())
00485 {
00486
CreateTableRule rule = createTablePolicy.getTableRule(request
00487 .getTableName());
00488
if (rule == null)
00489 rule = createTablePolicy.getDefaultRule();
00490
00491
00492 ArrayList choosen;
00493
try
00494 {
00495 choosen = rule.
getBackends(backends);
00496 }
00497
catch (
CreateTableException e)
00498 {
00499
throw new SQLException(
Translate.get(
00500
"loadbalancer.create.table.rule.failed", e.getMessage()));
00501 }
00502
00503
00504
if (choosen != null)
00505 backend = (
DatabaseBackend) choosen.get(0);
00506
if (backend != null)
00507 cm = backend.
getConnectionManager(request.getLogin());
00508 }
00509
else
00510 {
00511
for (
int i = 0; i < size; i++)
00512 {
00513 backend = (
DatabaseBackend) backends.get(i);
00514
if (backend.
isWriteEnabled() && backend.
hasTable(table))
00515 {
00516 cm = backend.
getConnectionManager(request.getLogin());
00517
break;
00518 }
00519 }
00520 }
00521
00522
00523
if (cm == null)
00524
throw new SQLException(
Translate.get(
00525
"loadbalancer.backend.no.required.table", table));
00526
00527
if (!backend.
getDriverCompliance().
supportGetGeneratedKeys())
00528
throw new SQLException(
Translate.get(
00529
"loadbalancer.backend.autogeneratedkeys.unsupported", backend
00530 .
getName()));
00531
00532
00533
00534
if (request.isAutoCommit())
00535 {
00536
00537
00538
if (backend.
isDisabling())
00539
throw new SQLException(
Translate.get(
00540
"loadbalancer.backend.is.disabling",
new String[]{
00541 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00542 backend.
getName()}));
00543
00544
00545 Connection c = null;
00546
try
00547 {
00548 c = cm.
getConnection();
00549 }
00550
catch (
UnreachableBackendException e1)
00551 {
00552 logger.error(
Translate.get(
00553
"loadbalancer.backend.disabling.unreachable", backend.
getName()));
00554 backend.
disable();
00555
throw new SQLException(
Translate.get(
00556
"loadbalancer.backend.unreacheable", backend.
getName()));
00557 }
00558
00559
00560
if (c == null)
00561
throw new SQLException(
Translate.get(
00562
"loadbalancer.backend.no.connection", backend.
getName()));
00563
00564
00565
ControllerResultSet result;
00566
try
00567 {
00568 result =
new ControllerResultSet(request,
00569 executeUpdateRequestOnBackendWithKeys(request, backend, c),
00570 metadataCache);
00571 updateSchema(backend, request);
00572 }
00573
catch (Exception e)
00574 {
00575
throw new SQLException(
Translate.get(
"loadbalancer.request.failed",
00576
new String[]{
00577 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00578 e.getMessage()}));
00579 }
00580 finally
00581 {
00582 backend.removePendingRequest(request);
00583 cm.releaseConnection(c);
00584 }
00585
if (logger.isDebugEnabled())
00586 logger.debug(
Translate.get(
"loadbalancer.execute.on",
new String[]{
00587 String.valueOf(request.getId()), backend.getName()}));
00588
return result;
00589 }
00590
else
00591 {
00592 Connection c;
00593
long tid = request.getTransactionId();
00594 Long lTid =
new Long(tid);
00595
00596
if (!backend.isStartedTransaction(lTid))
00597 {
00598
00599
if (backend.isDisabling())
00600
throw new SQLException(
Translate.get(
00601
"loadbalancer.backend.is.disabling",
new String[]{
00602 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00603 backend.getName()}));
00604
00605
00606
try
00607 {
00608 c = cm.getConnection(tid);
00609 }
00610
catch (UnreachableBackendException e1)
00611 {
00612 logger.error(Translate
00613 .get(
"loadbalancer.backend.disabling.unreachable", backend
00614 .getName()));
00615 backend.disable();
00616
throw new SQLException(Translate.get(
00617
"loadbalancer.backend.unreacheable", backend.getName()));
00618 }
00619
00620
00621
if (c == null)
00622
throw new SQLException(Translate.get(
00623
"loadbalancer.unable.get.connection",
new String[]{
00624 String.valueOf(tid), backend.getName()}));
00625
00626
00627 backend.startTransaction(lTid);
00628 c.setAutoCommit(
false);
00629 }
00630
else
00631 {
00632 c = cm.retrieveConnection(tid);
00633
00634
00635
if (c == null)
00636
throw new SQLException(Translate.get(
00637
"loadbalancer.unable.retrieve.connection",
new String[]{
00638 String.valueOf(tid), backend.getName()}));
00639 }
00640
00641
00642 ControllerResultSet result;
00643
try
00644 {
00645 result =
new ControllerResultSet(request,
00646 executeUpdateRequestOnBackendWithKeys(request, backend, c),
00647 metadataCache);
00648 updateSchema(backend, request);
00649 }
00650
catch (Exception e)
00651 {
00652
throw new SQLException(Translate.get(
"loadbalancer.request.failed",
00653
new String[]{
00654 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00655 e.getMessage()}));
00656 }
00657
if (logger.isDebugEnabled())
00658 logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00659 String.valueOf(request.getId()), backend.getName()}));
00660
return result;
00661 }
00662 }
00663
catch (RuntimeException e)
00664 {
00665 String msg = Translate
00666 .get(
"loadbalancer.request.failed",
new String[]{
00667 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00668 e.getMessage()});
00669 logger.fatal(msg, e);
00670
throw new SQLException(msg);
00671 }
00672 finally
00673 {
00674 vdb.releaseReadLockBackendLists();
00675 }
00676 }
00677
00687 protected ControllerResultSet executeRequestOnBackend(
SelectRequest request,
00688
DatabaseBackend backend,
MetadataCache metadataCache)
throws SQLException
00689 {
00690
00691 handleMacros(request);
00692
00693
00694
AbstractConnectionManager cm = backend.getConnectionManager(request
00695 .getLogin());
00696
00697
00698
if (cm == null)
00699 {
00700 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00701
new String[]{request.getLogin(), backend.getName()});
00702 logger.error(msg);
00703
throw new SQLException(msg);
00704 }
00705
00706
00707
if (request.isAutoCommit())
00708 {
00709
ControllerResultSet rs = null;
00710
boolean badConnection;
00711
do
00712 {
00713 badConnection =
false;
00714
00715 Connection c = null;
00716
try
00717 {
00718 c = cm.getConnection();
00719 }
00720
catch (UnreachableBackendException e1)
00721 {
00722 logger.error(
Translate.get(
00723
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00724 backend.disable();
00725
throw new SQLException(
Translate.get(
00726
"loadbalancer.backend.unreacheable", backend.getName()));
00727 }
00728
00729
00730
if (c == null)
00731
throw new SQLException(Translate.get(
00732
"loadbalancer.backend.no.connection", backend.getName()));
00733
00734
00735
try
00736 {
00737 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00738 cm.releaseConnection(c);
00739 }
00740
catch (SQLException e)
00741 {
00742 cm.releaseConnection(c);
00743
throw new SQLException(Translate.get(
00744
"loadbalancer.request.failed.on.backend",
new String[]{
00745 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00746 backend.getName(), e.getMessage()}));
00747 }
00748
catch (BadConnectionException e)
00749 {
00750 cm.deleteConnection(c);
00751 badConnection =
true;
00752 }
00753 }
00754
while (badConnection);
00755
if (logger.isDebugEnabled())
00756 logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00757 String.valueOf(request.getId()), backend.getName()}));
00758
return rs;
00759 }
00760
else
00761 {
00762 Connection c;
00763
long tid = request.getTransactionId();
00764 Long lTid =
new Long(tid);
00765
00766
if (!backend.isStartedTransaction(lTid))
00767 {
00768
try
00769 {
00770 c = cm.getConnection(tid);
00771 }
00772
catch (UnreachableBackendException e1)
00773 {
00774 logger.error(Translate.get(
00775
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00776 backend.disable();
00777
throw new SQLException(Translate.get(
00778
"loadbalancer.backend.unreacheable", backend.getName()));
00779 }
00780
00781
00782
if (c == null)
00783
throw new SQLException(Translate.get(
00784
"loadbalancer.unable.get.connection",
new String[]{
00785 String.valueOf(tid), backend.getName()}));
00786
00787
00788 backend.startTransaction(lTid);
00789 c.setAutoCommit(
false);
00790 }
00791
else
00792 {
00793 c = cm.retrieveConnection(tid);
00794
00795
00796
if (c == null)
00797
throw new SQLException(Translate.get(
00798
"loadbalancer.unable.retrieve.connection",
new String[]{
00799 String.valueOf(tid), backend.getName()}));
00800 }
00801
00802
00803 ControllerResultSet rs = null;
00804
try
00805 {
00806 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00807 }
00808
catch (SQLException e)
00809 {
00810
throw new SQLException(Translate.get(
00811
"loadbalancer.request.failed.on.backend",
new String[]{
00812 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00813 backend.getName(), e.getMessage()}));
00814 }
00815
catch (BadConnectionException e)
00816 {
00817 cm.deleteConnection(tid);
00818
throw new SQLException(Translate
00819 .get(
"loadbalancer.connection.failed",
new String[]{
00820 String.valueOf(tid), backend.getName(), e.getMessage()}));
00821 }
00822
if (logger.isDebugEnabled())
00823 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00824
new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00825 backend.getName()}));
00826
return rs;
00827 }
00828 }
00829
00834 public ControllerResultSet execReadOnlyReadStoredProcedure(
00835
StoredProcedure proc,
MetadataCache metadataCache)
throws SQLException
00836 {
00837
throw new SQLException(
00838
"Stored procedure calls are not supported with RAIDb-0 load balancers.");
00839 }
00840
00845 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc,
00846
MetadataCache metadataCache)
throws SQLException
00847 {
00848
throw new SQLException(
00849
"Stored procedure calls are not supported with RAIDb-0 load balancers.");
00850 }
00851
00855 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00856 {
00857
throw new SQLException(
00858
"Stored procedure calls are not supported with RAIDb-0 load balancers.");
00859 }
00860
00861
00862
00863
00864
00871 public final void begin(
TransactionMarkerMetaData tm)
throws SQLException
00872 {
00873 }
00874
00881 public void commit(
TransactionMarkerMetaData tm)
throws SQLException
00882 {
00883
try
00884 {
00885 backendThreadsRWLock.acquireRead();
00886 }
00887
catch (InterruptedException e)
00888 {
00889 String msg =
Translate.get(
00890
"loadbalancer.backendlist.acquire.readlock.failed", e);
00891 logger.error(msg);
00892
throw new SQLException(msg);
00893 }
00894
00895
int nbOfThreads = backendThreads.size();
00896 ArrayList commitList =
new ArrayList();
00897 Long lTid =
new Long(tm.getTransactionId());
00898
00899
00900
for (
int i = 0; i < nbOfThreads; i++)
00901 {
00902
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads.get(i);
00903
if (thread.
getBackend().
isStartedTransaction(lTid))
00904 commitList.add(thread);
00905 }
00906
00907 nbOfThreads = commitList.size();
00908
00909
CommitTask task =
new CommitTask(nbOfThreads,
00910 nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId());
00911
00912
synchronized (task)
00913 {
00914
00915
for (
int i = 0; i < nbOfThreads; i++)
00916 {
00917
BackendWorkerThread thread = (
BackendWorkerThread) commitList.get(i);
00918
synchronized (thread)
00919 {
00920 thread.
addTask(task);
00921 thread.notify();
00922 }
00923 }
00924
00925 backendThreadsRWLock.releaseRead();
00926
00927
00928
try
00929 {
00930
00931
long timeout = tm.getTimeout();
00932
if (timeout > 0)
00933 {
00934
long start = System.currentTimeMillis();
00935 task.wait(timeout);
00936
long end = System.currentTimeMillis();
00937
long remaining = timeout - (end - start);
00938
if (remaining <= 0)
00939 {
00940 String msg =
Translate.get(
"loadbalancer.commit.timeout",
00941
new String[]{String.valueOf(tm.getTransactionId()),
00942 String.valueOf(task.
getSuccess()),
00943 String.valueOf(task.
getFailed())});
00944 logger.warn(msg);
00945
throw new SQLException(msg);
00946 }
00947 }
00948
else
00949 task.wait();
00950 }
00951
catch (InterruptedException e)
00952 {
00953
throw new SQLException(
Translate.get(
"loadbalancer.commit.timeout",
00954
new String[]{String.valueOf(tm.getTransactionId()),
00955 String.valueOf(task.
getSuccess()),
00956 String.valueOf(task.
getFailed())}));
00957 }
00958
00959
if (task.getSuccess() > 0)
00960
return;
00961
else
00962 {
00963 ArrayList exceptions = task.getExceptions();
00964
if (exceptions == null)
00965
throw new SQLException(
Translate.get(
00966
"loadbalancer.commit.all.failed", tm.getTransactionId()));
00967
else
00968 {
00969 String errorMsg =
Translate.get(
"loadbalancer.commit.failed.stack",
00970 tm.getTransactionId())
00971 +
"\n";
00972
for (
int i = 0; i < exceptions.size(); i++)
00973 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
00974 logger.error(errorMsg);
00975
throw new SQLException(errorMsg);
00976 }
00977 }
00978 }
00979 }
00980
00987 public void rollback(
TransactionMarkerMetaData tm)
throws SQLException
00988 {
00989
try
00990 {
00991 backendThreadsRWLock.acquireRead();
00992 }
00993
catch (InterruptedException e)
00994 {
00995 String msg =
Translate.get(
00996
"loadbalancer.backendlist.acquire.readlock.failed", e);
00997 logger.error(msg);
00998
throw new SQLException(msg);
00999 }
01000
int nbOfThreads = backendThreads.size();
01001 ArrayList rollbackList =
new ArrayList();
01002 Long lTid =
new Long(tm.getTransactionId());
01003
01004
01005
for (
int i = 0; i < nbOfThreads; i++)
01006 {
01007
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads.get(i);
01008
if (thread.
getBackend().
isStartedTransaction(lTid))
01009 rollbackList.add(thread);
01010 }
01011
01012 nbOfThreads = rollbackList.size();
01013
01014
01015
RollbackTask task =
new RollbackTask(nbOfThreads,
01016
01017 nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId());
01018
01019
synchronized (task)
01020 {
01021
01022
for (
int i = 0; i < nbOfThreads; i++)
01023 {
01024
BackendWorkerThread thread = (
BackendWorkerThread) rollbackList.get(i);
01025
synchronized (thread)
01026 {
01027 thread.
addTask(task);
01028 thread.notify();
01029 }
01030 }
01031
01032 backendThreadsRWLock.releaseRead();
01033
01034
01035
try
01036 {
01037
01038
long timeout = tm.getTimeout();
01039
if (timeout > 0)
01040 {
01041
long start = System.currentTimeMillis();
01042 task.wait(timeout);
01043
long end = System.currentTimeMillis();
01044
long remaining = timeout - (end - start);
01045
if (remaining <= 0)
01046 {
01047 String msg =
Translate.get(
"loadbalancer.rollback.timeout",
01048
new String[]{String.valueOf(tm.getTransactionId()),
01049 String.valueOf(task.
getSuccess()),
01050 String.valueOf(task.
getFailed())});
01051 logger.warn(msg);
01052
throw new SQLException(msg);
01053 }
01054 }
01055
else
01056 task.wait();
01057 }
01058
catch (InterruptedException e)
01059 {
01060
throw new SQLException(
Translate.get(
"loadbalancer.rollback.timeout",
01061
new String[]{String.valueOf(tm.getTransactionId()),
01062 String.valueOf(task.
getSuccess()),
01063 String.valueOf(task.
getFailed())}));
01064 }
01065
01066
if (task.getSuccess() > 0)
01067
return;
01068
else
01069 {
01070 ArrayList exceptions = task.getExceptions();
01071
if (exceptions == null)
01072
throw new SQLException(
Translate.get(
01073
"loadbalancer.rollback.all.failed", tm.getTransactionId()));
01074
else
01075 {
01076 String errorMsg =
Translate.get(
"loadbalancer.rollback.failed.stack",
01077 tm.getTransactionId())
01078 +
"\n";
01079
for (
int i = 0; i < exceptions.size(); i++)
01080 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
01081 logger.error(errorMsg);
01082
throw new SQLException(errorMsg);
01083 }
01084 }
01085 }
01086 }
01087
01088
01089
01090
01091
01104 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
01105
throws SQLException
01106 {
01107
01108
BackendWorkerThread thread =
new BackendWorkerThread(db,
this);
01109
try
01110 {
01111 backendThreadsRWLock.acquireWrite();
01112 }
01113
catch (InterruptedException e)
01114 {
01115 String msg =
Translate.get(
01116
"loadbalancer.backendlist.acquire.writelock.failed", e);
01117 logger.error(msg);
01118
throw new SQLException(msg);
01119 }
01120 backendThreads.add(thread);
01121 backendThreadsRWLock.releaseWrite();
01122 thread.start();
01123 logger.info(
Translate.get(
"loadbalancer.backend.workerthread.add", db
01124 .getName()));
01125
01126
if (!db.isInitialized())
01127 db.initializeConnections();
01128 db.enableRead();
01129
if (writeEnabled)
01130 db.enableWrite();
01131 }
01132
01144 public synchronized void disableBackend(
DatabaseBackend db)
01145
throws SQLException
01146 {
01147
int nbOfThreads = backendThreads.size();
01148
01149
01150
for (
int i = 0; i < nbOfThreads; i++)
01151 {
01152
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads.get(i);
01153
if (thread.
getBackend().
equals(db))
01154 {
01155 logger.info(
Translate.get(
"loadbalancer.backend.workerthread.remove",
01156 db.getName()));
01157
01158
01159
try
01160 {
01161 backendThreadsRWLock.acquireWrite();
01162 }
01163
catch (InterruptedException e)
01164 {
01165 String msg =
Translate.get(
01166
"loadbalancer.backendlist.acquire.writelock.failed", e);
01167 logger.error(msg);
01168
throw new SQLException(msg);
01169 }
01170 backendThreads.remove(thread);
01171 backendThreadsRWLock.releaseWrite();
01172
01173
synchronized (thread)
01174 {
01175
01176 thread.
addPriorityTask(
new KillThreadTask(1, 1));
01177 thread.notify();
01178 }
01179
break;
01180 }
01181 }
01182
01183 db.disable();
01184
if (db.isInitialized())
01185 db.finalizeConnections();
01186 }
01187
01192 public void setWeight(String name,
int w)
throws SQLException
01193 {
01194
throw new SQLException(
"Weight is not supported with this load balancer");
01195 }
01196
01197
01198
01199
01200
01206 public String getInformation()
01207 {
01208
return "RAIDb-0 Request load balancer\n";
01209 }
01210
01214 public String getXmlImpl()
01215 {
01216 StringBuffer info =
new StringBuffer();
01217 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb_0 +
">");
01218 createTablePolicy.getXml();
01219 info.append(
"</" +
DatabasesXmlTags.ELT_RAIDb_0 +
">");
01220
return info.toString();
01221 }
01222
01223 }