00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb0;
00026
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00033 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00034 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00035 import org.objectweb.cjdbc.common.i18n.Translate;
00036 import org.objectweb.cjdbc.common.log.Trace;
00037 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00038 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00039 import org.objectweb.cjdbc.common.sql.SelectRequest;
00040 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00041 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00042 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00043 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00044 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00045 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00046 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00047 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00049 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00054 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00055 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00056 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00057 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070 public class RAIDb0 extends AbstractLoadBalancer
00071 {
00072
00073
00074
00075
00076
00077
00078 private ArrayList backendThreads;
00079 private ReadPrioritaryFIFOWriteLock backendThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00080 private CreateTablePolicy createTablePolicy;
00081
00082 protected static Trace logger = Trace
00083 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb0");
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 public RAIDb0(VirtualDatabase vdb, CreateTablePolicy createTablePolicy)
00098 throws Exception
00099 {
00100 super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE);
00101 backendThreads = new ArrayList();
00102 this.createTablePolicy = createTablePolicy;
00103 }
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119 public ControllerResultSet execReadRequest(SelectRequest request,
00120 MetadataCache metadataCache) throws SQLException
00121 {
00122 try
00123 {
00124 vdb.acquireReadLockBackendLists();
00125
00126
00127 }
00128 catch (InterruptedException e)
00129 {
00130 String msg = Translate.get(
00131 "loadbalancer.backendlist.acquire.readlock.failed", e);
00132 logger.error(msg);
00133 throw new SQLException(msg);
00134 }
00135
00136 try
00137 {
00138 ControllerResultSet rs = null;
00139 ArrayList fromTables = request.getFrom();
00140
00141 if (fromTables == null)
00142 throw new SQLException(Translate.get("loadbalancer.from.not.found",
00143 request.getSQLShortForm(vdb.getSQLShortFormLength())));
00144
00145
00146 ArrayList backends = vdb.getBackends();
00147 int size = backends.size();
00148 int enabledBackends = 0;
00149
00150 DatabaseBackend backend = null;
00151
00152 for (int i = 0; i < size; i++)
00153 {
00154 backend = (DatabaseBackend) backends.get(i);
00155 if (backend.isReadEnabled())
00156 enabledBackends++;
00157 if (backend.isReadEnabled() && backend.hasTables(fromTables))
00158 break;
00159 else
00160 backend = null;
00161 }
00162
00163 if (backend == null)
00164 {
00165 if (enabledBackends == 0)
00166 throw new NoMoreBackendException(Translate.get(
00167 "loadbalancer.execute.no.backend.enabled", request.getId()));
00168 else
00169 throw new SQLException(Translate.get(
00170 "loadbalancer.backend.no.required.tables", fromTables.toString()));
00171 }
00172
00173 if (logger.isDebugEnabled())
00174 {
00175 logger.debug("Backend " + backend.getName()
00176 + " has all tables which are:");
00177 for (int i = 0; i < fromTables.size(); i++)
00178 {
00179 logger.debug(fromTables.get(i));
00180 }
00181 }
00182
00183
00184 try
00185 {
00186 rs = executeRequestOnBackend(request, backend, metadataCache);
00187 }
00188 catch (SQLException se)
00189 {
00190 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00191 String.valueOf(request.getId()), se.getMessage()});
00192 if (logger.isInfoEnabled())
00193 logger.info(msg);
00194 throw new SQLException(msg);
00195 }
00196
00197 return rs;
00198 }
00199 catch (RuntimeException e)
00200 {
00201 String msg = Translate
00202 .get("loadbalancer.request.failed", new String[]{
00203 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00204 e.getMessage()});
00205 logger.fatal(msg, e);
00206 throw new SQLException(msg);
00207 }
00208 finally
00209 {
00210 vdb.releaseReadLockBackendLists();
00211
00212
00213 }
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 public int execWriteRequest(AbstractWriteRequest request) throws SQLException
00225 {
00226
00227 handleMacros(request);
00228
00229 try
00230 {
00231 vdb.acquireReadLockBackendLists();
00232
00233
00234 }
00235 catch (InterruptedException e)
00236 {
00237 String msg = Translate.get(
00238 "loadbalancer.backendlist.acquire.readlock.failed", e);
00239 logger.error(msg);
00240 throw new SQLException(msg);
00241 }
00242
00243 try
00244 {
00245 String table = request.getTableName();
00246 AbstractConnectionManager cm = null;
00247
00248 if (table == null)
00249 throw new SQLException(Translate.get(
00250 "loadbalancer.request.target.table.not.found", request
00251 .getSQLShortForm(vdb.getSQLShortFormLength())));
00252
00253
00254 ArrayList backends = vdb.getBackends();
00255 int size = backends.size();
00256
00257 DatabaseBackend backend = null;
00258
00259 if (request.isCreate())
00260 {
00261 CreateTableRule rule = createTablePolicy.getTableRule(request
00262 .getTableName());
00263 if (rule == null)
00264 rule = createTablePolicy.getDefaultRule();
00265
00266
00267 ArrayList choosen;
00268 try
00269 {
00270 choosen = rule.getBackends(backends);
00271 }
00272 catch (CreateTableException e)
00273 {
00274 throw new SQLException(Translate.get(
00275 "loadbalancer.create.table.rule.failed", e.getMessage()));
00276 }
00277
00278
00279 if (choosen != null)
00280 backend = (DatabaseBackend) choosen.get(0);
00281 if (backend != null)
00282 cm = backend.getConnectionManager(request.getLogin());
00283 }
00284 else
00285 {
00286 for (int i = 0; i < size; i++)
00287 {
00288 backend = (DatabaseBackend) backends.get(i);
00289 if (backend.isWriteEnabled() && backend.hasTable(table))
00290 {
00291 cm = backend.getConnectionManager(request.getLogin());
00292 break;
00293 }
00294 }
00295 }
00296
00297
00298 if (cm == null)
00299 throw new SQLException(Translate.get(
00300 "loadbalancer.backend.no.required.table", table));
00301
00302
00303
00304 if (request.isAutoCommit())
00305 {
00306
00307
00308 if (backend.isDisabling())
00309 throw new SQLException(Translate.get(
00310 "loadbalancer.backend.is.disabling", new String[]{
00311 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00312 backend.getName()}));
00313
00314
00315 Connection c = null;
00316 try
00317 {
00318 c = cm.getConnection();
00319 }
00320 catch (UnreachableBackendException e1)
00321 {
00322 logger.error(Translate.get(
00323 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00324 disableBackend(backend);
00325 throw new SQLException(Translate.get(
00326 "loadbalancer.backend.unreacheable", backend.getName()));
00327 }
00328
00329
00330 if (c == null)
00331 throw new SQLException(Translate.get(
00332 "loadbalancer.backend.no.connection", backend.getName()));
00333
00334 int result;
00335 try
00336 {
00337 result = executeUpdateRequestOnBackend(request, backend, c);
00338 }
00339 catch (Exception e)
00340 {
00341 throw new SQLException(Translate.get("loadbalancer.request.failed",
00342 new String[]{
00343 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00344 e.getMessage()}));
00345 }
00346 finally
00347 {
00348 cm.releaseConnection(c);
00349 }
00350 if (logger.isDebugEnabled())
00351 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00352 String.valueOf(request.getId()), backend.getName()}));
00353 return result;
00354 }
00355 else
00356 {
00357 Connection c;
00358 long tid = request.getTransactionId();
00359 Long lTid = new Long(tid);
00360
00361 try
00362 {
00363 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00364 }
00365 catch (UnreachableBackendException e1)
00366 {
00367 logger.error(Translate.get(
00368 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00369 disableBackend(backend);
00370 throw new SQLException(Translate.get(
00371 "loadbalancer.backend.unreacheable", backend.getName()));
00372 }
00373 catch (NoTransactionStartWhenDisablingException e)
00374 {
00375 String msg = Translate.get("loadbalancer.backend.is.disabling",
00376 new String[]{
00377 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00378 backend.getName()});
00379 logger.error(msg);
00380 throw new SQLException(msg);
00381 }
00382
00383
00384 if (c == null)
00385 throw new SQLException(Translate.get(
00386 "loadbalancer.unable.retrieve.connection", new String[]{
00387 String.valueOf(tid), backend.getName()}));
00388
00389
00390 int result;
00391 try
00392 {
00393 result = executeUpdateRequestOnBackend(request, backend, c);
00394 }
00395 catch (Exception e)
00396 {
00397 throw new SQLException(Translate.get("loadbalancer.request.failed",
00398 new String[]{
00399 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00400 e.getMessage()}));
00401 }
00402 if (logger.isDebugEnabled())
00403 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00404 String.valueOf(request.getId()), backend.getName()}));
00405 return result;
00406 }
00407 }
00408 catch (RuntimeException e)
00409 {
00410 String msg = Translate
00411 .get("loadbalancer.request.failed", new String[]{
00412 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00413 e.getMessage()});
00414 logger.fatal(msg, e);
00415 throw new SQLException(msg);
00416 }
00417 finally
00418 {
00419 vdb.releaseReadLockBackendLists();
00420
00421
00422 }
00423 }
00424
00425
00426
00427
00428
00429 public ControllerResultSet execWriteRequestWithKeys(
00430 AbstractWriteRequest request, MetadataCache metadataCache)
00431 throws SQLException
00432 {
00433
00434 handleMacros(request);
00435
00436 try
00437 {
00438 vdb.acquireReadLockBackendLists();
00439
00440
00441 }
00442 catch (InterruptedException e)
00443 {
00444 String msg = Translate.get(
00445 "loadbalancer.backendlist.acquire.readlock.failed", e);
00446 logger.error(msg);
00447 throw new SQLException(msg);
00448 }
00449
00450 try
00451 {
00452 String table = request.getTableName();
00453 AbstractConnectionManager cm = null;
00454
00455 if (table == null)
00456 throw new SQLException(Translate.get(
00457 "loadbalancer.request.target.table.not.found", request
00458 .getSQLShortForm(vdb.getSQLShortFormLength())));
00459
00460
00461 ArrayList backends = vdb.getBackends();
00462 int size = backends.size();
00463
00464 DatabaseBackend backend = null;
00465
00466 if (request.isCreate())
00467 {
00468 CreateTableRule rule = createTablePolicy.getTableRule(request
00469 .getTableName());
00470 if (rule == null)
00471 rule = createTablePolicy.getDefaultRule();
00472
00473
00474 ArrayList choosen;
00475 try
00476 {
00477 choosen = rule.getBackends(backends);
00478 }
00479 catch (CreateTableException e)
00480 {
00481 throw new SQLException(Translate.get(
00482 "loadbalancer.create.table.rule.failed", e.getMessage()));
00483 }
00484
00485
00486 if (choosen != null)
00487 backend = (DatabaseBackend) choosen.get(0);
00488 if (backend != null)
00489 cm = backend.getConnectionManager(request.getLogin());
00490 }
00491 else
00492 {
00493 for (int i = 0; i < size; i++)
00494 {
00495 backend = (DatabaseBackend) backends.get(i);
00496 if (backend.isWriteEnabled() && backend.hasTable(table))
00497 {
00498 cm = backend.getConnectionManager(request.getLogin());
00499 break;
00500 }
00501 }
00502 }
00503
00504
00505 if (cm == null)
00506 throw new SQLException(Translate.get(
00507 "loadbalancer.backend.no.required.table", table));
00508
00509 if (!backend.getDriverCompliance().supportGetGeneratedKeys())
00510 throw new SQLException(Translate.get(
00511 "loadbalancer.backend.autogeneratedkeys.unsupported", backend
00512 .getName()));
00513
00514
00515
00516 if (request.isAutoCommit())
00517 {
00518
00519
00520 if (backend.isDisabling())
00521 throw new SQLException(Translate.get(
00522 "loadbalancer.backend.is.disabling", new String[]{
00523 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00524 backend.getName()}));
00525
00526
00527 Connection c = null;
00528 try
00529 {
00530 c = cm.getConnection();
00531 }
00532 catch (UnreachableBackendException e1)
00533 {
00534 logger.error(Translate.get(
00535 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00536 disableBackend(backend);
00537 throw new SQLException(Translate.get(
00538 "loadbalancer.backend.unreacheable", backend.getName()));
00539 }
00540
00541
00542 if (c == null)
00543 throw new SQLException(Translate.get(
00544 "loadbalancer.backend.no.connection", backend.getName()));
00545
00546
00547 ControllerResultSet result;
00548 try
00549 {
00550 result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00551 metadataCache);
00552 }
00553 catch (Exception e)
00554 {
00555 throw new SQLException(Translate.get("loadbalancer.request.failed",
00556 new String[]{
00557 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00558 e.getMessage()}));
00559 }
00560 finally
00561 {
00562 backend.removePendingRequest(request);
00563 cm.releaseConnection(c);
00564 }
00565 if (logger.isDebugEnabled())
00566 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00567 String.valueOf(request.getId()), backend.getName()}));
00568 return result;
00569 }
00570 else
00571 {
00572 Connection c;
00573 long tid = request.getTransactionId();
00574 Long lTid = new Long(tid);
00575
00576 try
00577 {
00578 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00579 }
00580 catch (UnreachableBackendException e1)
00581 {
00582 logger.error(Translate.get(
00583 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00584 disableBackend(backend);
00585 throw new SQLException(Translate.get(
00586 "loadbalancer.backend.unreacheable", backend.getName()));
00587 }
00588 catch (NoTransactionStartWhenDisablingException e)
00589 {
00590 String msg = Translate.get("loadbalancer.backend.is.disabling",
00591 new String[]{
00592 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00593 backend.getName()});
00594 logger.error(msg);
00595 throw new SQLException(msg);
00596 }
00597
00598
00599 if (c == null)
00600 throw new SQLException(Translate.get(
00601 "loadbalancer.unable.retrieve.connection", new String[]{
00602 String.valueOf(tid), backend.getName()}));
00603
00604
00605 ControllerResultSet result;
00606 try
00607 {
00608 result = executeUpdateRequestOnBackendWithKeys(request, backend, c,
00609 metadataCache);
00610 }
00611 catch (Exception e)
00612 {
00613 throw new SQLException(Translate.get("loadbalancer.request.failed",
00614 new String[]{
00615 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00616 e.getMessage()}));
00617 }
00618 if (logger.isDebugEnabled())
00619 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00620 String.valueOf(request.getId()), backend.getName()}));
00621 return result;
00622 }
00623 }
00624 catch (RuntimeException e)
00625 {
00626 String msg = Translate
00627 .get("loadbalancer.request.failed", new String[]{
00628 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00629 e.getMessage()});
00630 logger.fatal(msg, e);
00631 throw new SQLException(msg);
00632 }
00633 finally
00634 {
00635 vdb.releaseReadLockBackendLists();
00636
00637
00638 }
00639 }
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650 protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00651 DatabaseBackend backend, MetadataCache metadataCache) throws SQLException
00652 {
00653
00654 handleMacros(request);
00655
00656
00657 AbstractConnectionManager cm = backend.getConnectionManager(request
00658 .getLogin());
00659
00660
00661 if (cm == null)
00662 {
00663 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00664 new String[]{request.getLogin(), backend.getName()});
00665 logger.error(msg);
00666 throw new SQLException(msg);
00667 }
00668
00669
00670 if (request.isAutoCommit())
00671 {
00672 ControllerResultSet rs = null;
00673 boolean badConnection;
00674 do
00675 {
00676 badConnection = false;
00677
00678 Connection c = null;
00679 try
00680 {
00681 c = cm.getConnection();
00682 }
00683 catch (UnreachableBackendException e1)
00684 {
00685 logger.error(Translate.get(
00686 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00687 disableBackend(backend);
00688 throw new SQLException(Translate.get(
00689 "loadbalancer.backend.unreacheable", backend.getName()));
00690 }
00691
00692
00693 if (c == null)
00694 throw new SQLException(Translate.get(
00695 "loadbalancer.backend.no.connection", backend.getName()));
00696
00697
00698 try
00699 {
00700 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00701 cm.releaseConnection(c);
00702 }
00703 catch (SQLException e)
00704 {
00705 cm.releaseConnection(c);
00706 throw new SQLException(Translate.get(
00707 "loadbalancer.request.failed.on.backend", new String[]{
00708 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00709 backend.getName(), e.getMessage()}));
00710 }
00711 catch (BadConnectionException e)
00712 {
00713 cm.deleteConnection(c);
00714 badConnection = true;
00715 }
00716 }
00717 while (badConnection);
00718 if (logger.isDebugEnabled())
00719 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00720 String.valueOf(request.getId()), backend.getName()}));
00721 return rs;
00722 }
00723 else
00724 {
00725 Connection c;
00726 long tid = request.getTransactionId();
00727 Long lTid = new Long(tid);
00728
00729 try
00730 {
00731 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00732 }
00733 catch (UnreachableBackendException e1)
00734 {
00735 logger.error(Translate.get(
00736 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00737 disableBackend(backend);
00738 throw new SQLException(Translate.get(
00739 "loadbalancer.backend.unreacheable", backend.getName()));
00740 }
00741 catch (NoTransactionStartWhenDisablingException e)
00742 {
00743 String msg = Translate.get("loadbalancer.backend.is.disabling",
00744 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00745 backend.getName()});
00746 logger.error(msg);
00747 throw new SQLException(msg);
00748 }
00749
00750
00751 if (c == null)
00752 throw new SQLException(Translate.get(
00753 "loadbalancer.unable.retrieve.connection", new String[]{
00754 String.valueOf(tid), backend.getName()}));
00755
00756
00757 ControllerResultSet rs = null;
00758 try
00759 {
00760 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00761 }
00762 catch (SQLException e)
00763 {
00764 throw new SQLException(Translate.get(
00765 "loadbalancer.request.failed.on.backend", new String[]{
00766 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00767 backend.getName(), e.getMessage()}));
00768 }
00769 catch (BadConnectionException e)
00770 {
00771 cm.deleteConnection(tid);
00772 throw new SQLException(Translate
00773 .get("loadbalancer.connection.failed", new String[]{
00774 String.valueOf(tid), backend.getName(), e.getMessage()}));
00775 }
00776 if (logger.isDebugEnabled())
00777 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00778 new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00779 backend.getName()}));
00780 return rs;
00781 }
00782 }
00783
00784
00785
00786
00787
00788 public ControllerResultSet execReadOnlyReadStoredProcedure(
00789 StoredProcedure proc, MetadataCache metadataCache) throws SQLException
00790 {
00791 throw new SQLException(
00792 "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00793 }
00794
00795
00796
00797
00798
00799 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00800 MetadataCache metadataCache) throws SQLException
00801 {
00802 throw new SQLException(
00803 "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00804 }
00805
00806
00807
00808
00809 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00810 {
00811 throw new SQLException(
00812 "Stored procedure calls are not supported with RAIDb-0 load balancers.");
00813 }
00814
00815
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825 public final void begin(TransactionMarkerMetaData tm) throws SQLException
00826 {
00827 }
00828
00829
00830
00831
00832
00833
00834
00835 public void commit(TransactionMarkerMetaData tm) throws SQLException
00836 {
00837 try
00838 {
00839 backendThreadsRWLock.acquireRead();
00840 }
00841 catch (InterruptedException e)
00842 {
00843 String msg = Translate.get(
00844 "loadbalancer.backendlist.acquire.readlock.failed", e);
00845 logger.error(msg);
00846 throw new SQLException(msg);
00847 }
00848
00849 int nbOfThreads = backendThreads.size();
00850 ArrayList commitList = new ArrayList();
00851 long tid = tm.getTransactionId();
00852 Long lTid = new Long(tid);
00853
00854
00855 for (int i = 0; i < nbOfThreads; i++)
00856 {
00857 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
00858 if (thread.getBackend().isStartedTransaction(lTid))
00859 commitList.add(thread);
00860 }
00861
00862 nbOfThreads = commitList.size();
00863 if (nbOfThreads == 0)
00864 {
00865
00866
00867 backendThreadsRWLock.releaseRead();
00868 return;
00869 }
00870
00871
00872 CommitTask task = new CommitTask(nbOfThreads,
00873 nbOfThreads, tm.getTimeout(), tm.getLogin(), tid);
00874
00875 synchronized (task)
00876 {
00877
00878 for (int i = 0; i < nbOfThreads; i++)
00879 {
00880 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00881 synchronized (thread)
00882 {
00883 thread.addTask(task, tid);
00884 thread.notify();
00885 }
00886 }
00887
00888 backendThreadsRWLock.releaseRead();
00889
00890
00891 try
00892 {
00893
00894 long timeout = tm.getTimeout();
00895 if (timeout > 0)
00896 {
00897 long start = System.currentTimeMillis();
00898 task.wait(timeout);
00899 long end = System.currentTimeMillis();
00900 long remaining = timeout - (end - start);
00901 if (remaining <= 0)
00902 {
00903 if (task.setExpiredTimeout())
00904 {
00905 String msg = Translate.get("loadbalancer.commit.timeout",
00906 new String[]{String.valueOf(tid),
00907 String.valueOf(task.getSuccess()),
00908 String.valueOf(task.getFailed())});
00909 logger.warn(msg);
00910 throw new SQLException(msg);
00911 }
00912
00913 }
00914 }
00915 else
00916 task.wait();
00917 }
00918 catch (InterruptedException e)
00919 {
00920 if (task.setExpiredTimeout())
00921 {
00922 String msg = Translate.get("loadbalancer.commit.timeout",
00923 new String[]{String.valueOf(tid),
00924 String.valueOf(task.getSuccess()),
00925 String.valueOf(task.getFailed())});
00926 logger.warn(msg);
00927 throw new SQLException(msg);
00928 }
00929
00930 }
00931
00932 if (task.getSuccess() > 0)
00933 return;
00934 else
00935 {
00936 ArrayList exceptions = task.getExceptions();
00937 if (exceptions == null)
00938 throw new SQLException(Translate.get(
00939 "loadbalancer.commit.all.failed", tid));
00940 else
00941 {
00942 String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
00943 tid)
00944 + "\n";
00945 for (int i = 0; i < exceptions.size(); i++)
00946 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00947 logger.error(errorMsg);
00948 throw new SQLException(errorMsg);
00949 }
00950 }
00951 }
00952 }
00953
00954
00955
00956
00957
00958
00959
00960 public void rollback(TransactionMarkerMetaData tm) throws SQLException
00961 {
00962 try
00963 {
00964 backendThreadsRWLock.acquireRead();
00965 }
00966 catch (InterruptedException e)
00967 {
00968 String msg = Translate.get(
00969 "loadbalancer.backendlist.acquire.readlock.failed", e);
00970 logger.error(msg);
00971 throw new SQLException(msg);
00972 }
00973 int nbOfThreads = backendThreads.size();
00974 ArrayList rollbackList = new ArrayList();
00975 long tid = tm.getTransactionId();
00976 Long lTid = new Long(tid);
00977
00978
00979 for (int i = 0; i < nbOfThreads; i++)
00980 {
00981 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
00982 if (thread.getBackend().isStartedTransaction(lTid))
00983 rollbackList.add(thread);
00984 }
00985
00986 nbOfThreads = rollbackList.size();
00987 if (nbOfThreads == 0)
00988 {
00989
00990
00991 backendThreadsRWLock.releaseRead();
00992 return;
00993 }
00994
00995
00996 RollbackTask task = new RollbackTask(nbOfThreads,
00997
00998 nbOfThreads, tm.getTimeout(), tm.getLogin(), tid);
00999
01000 synchronized (task)
01001 {
01002
01003 for (int i = 0; i < nbOfThreads; i++)
01004 {
01005 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01006 synchronized (thread)
01007 {
01008 thread.addTask(task, tid);
01009 thread.notify();
01010 }
01011 }
01012
01013 backendThreadsRWLock.releaseRead();
01014
01015
01016 try
01017 {
01018
01019 long timeout = tm.getTimeout();
01020 if (timeout > 0)
01021 {
01022 long start = System.currentTimeMillis();
01023 task.wait(timeout);
01024 long end = System.currentTimeMillis();
01025 long remaining = timeout - (end - start);
01026 if (remaining <= 0)
01027 {
01028 if (task.setExpiredTimeout())
01029 {
01030 String msg = Translate.get("loadbalancer.rollback.timeout",
01031 new String[]{String.valueOf(tid),
01032 String.valueOf(task.getSuccess()),
01033 String.valueOf(task.getFailed())});
01034 logger.warn(msg);
01035 throw new SQLException(msg);
01036 }
01037
01038 }
01039 }
01040 else
01041 task.wait();
01042 }
01043 catch (InterruptedException e)
01044 {
01045 if (task.setExpiredTimeout())
01046 {
01047 String msg = Translate.get("loadbalancer.rollback.timeout",
01048 new String[]{String.valueOf(tid),
01049 String.valueOf(task.getSuccess()),
01050 String.valueOf(task.getFailed())});
01051 logger.warn(msg);
01052 throw new SQLException(msg);
01053 }
01054
01055 }
01056
01057 if (task.getSuccess() > 0)
01058 return;
01059 else
01060 {
01061 ArrayList exceptions = task.getExceptions();
01062 if (exceptions == null)
01063 throw new SQLException(Translate.get(
01064 "loadbalancer.rollback.all.failed", tid));
01065 else
01066 {
01067 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01068 tid)
01069 + "\n";
01070 for (int i = 0; i < exceptions.size(); i++)
01071 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01072 logger.error(errorMsg);
01073 throw new SQLException(errorMsg);
01074 }
01075 }
01076 }
01077 }
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091
01092
01093
01094
01095 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01096 throws SQLException
01097 {
01098
01099 BackendWorkerThread thread = new BackendWorkerThread(db, this);
01100 try
01101 {
01102 backendThreadsRWLock.acquireWrite();
01103 }
01104 catch (InterruptedException e)
01105 {
01106 String msg = Translate.get(
01107 "loadbalancer.backendlist.acquire.writelock.failed", e);
01108 logger.error(msg);
01109 throw new SQLException(msg);
01110 }
01111 backendThreads.add(thread);
01112 backendThreadsRWLock.releaseWrite();
01113 thread.start();
01114 logger.info(Translate.get("loadbalancer.backend.workerthread.add", db
01115 .getName()));
01116
01117 if (!db.isInitialized())
01118 db.initializeConnections();
01119 db.enableRead();
01120 if (writeEnabled)
01121 db.enableWrite();
01122 }
01123
01124
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135 public synchronized void disableBackend(DatabaseBackend db)
01136 throws SQLException
01137 {
01138 int nbOfThreads = backendThreads.size();
01139
01140
01141 for (int i = 0; i < nbOfThreads; i++)
01142 {
01143 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i);
01144 if (thread.getBackend().equals(db))
01145 {
01146 logger.info(Translate.get("loadbalancer.backend.workerthread.remove",
01147 db.getName()));
01148
01149
01150 try
01151 {
01152 backendThreadsRWLock.acquireWrite();
01153 }
01154 catch (InterruptedException e)
01155 {
01156 String msg = Translate.get(
01157 "loadbalancer.backendlist.acquire.writelock.failed", e);
01158 logger.error(msg);
01159 throw new SQLException(msg);
01160 }
01161 backendThreads.remove(thread);
01162 backendThreadsRWLock.releaseWrite();
01163
01164 synchronized (thread)
01165 {
01166
01167 thread.addPriorityTask(new KillThreadTask(1, 1));
01168 thread.notify();
01169 }
01170 break;
01171 }
01172 }
01173
01174 db.disable();
01175 if (db.isInitialized())
01176 db.finalizeConnections();
01177 }
01178
01179
01180
01181
01182
01183 public void setWeight(String name, int w) throws SQLException
01184 {
01185 throw new SQLException("Weight is not supported with this load balancer");
01186 }
01187
01188
01189
01190
01191
01192
01193
01194
01195
01196
01197 public String getInformation()
01198 {
01199 return "RAIDb-0 Request load balancer\n";
01200 }
01201
01202
01203
01204
01205 public String getXmlImpl()
01206 {
01207 StringBuffer info = new StringBuffer();
01208 info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
01209 createTablePolicy.getXml();
01210 info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">");
01211 return info.toString();
01212 }
01213
01214 }