00001
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026
00027
import java.sql.Connection;
00028
import java.sql.SQLException;
00029
import java.util.ArrayList;
00030
00031
import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00033
import org.objectweb.cjdbc.common.i18n.Translate;
00034
import org.objectweb.cjdbc.common.log.Trace;
00035
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00036
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00037
import org.objectweb.cjdbc.common.sql.SelectRequest;
00038
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00039
import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00040
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00041
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00042
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00043
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00044
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00045
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00046
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00047
import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00048
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00049
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00050
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00051
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00052
import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00053
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00054
import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00055
import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00056
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00057
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00058
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00059
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00060
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00061
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00062
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00063
00075 public abstract class RAIDb2 extends AbstractLoadBalancer
00076 {
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086 protected ArrayList
backendBlockingThreads;
00087 protected ArrayList
backendNonBlockingThreads;
00088 protected ReadPrioritaryFIFOWriteLock
backendBlockingThreadsRWLock =
new ReadPrioritaryFIFOWriteLock();
00089 protected ReadPrioritaryFIFOWriteLock
backendNonBlockingThreadsRWLock =
new ReadPrioritaryFIFOWriteLock();
00090
00091 protected WaitForCompletionPolicy
waitForCompletionPolicy;
00092 protected CreateTablePolicy
createTablePolicy;
00093
00094 protected static Trace
logger = Trace
00095 .getLogger(
"org.objectweb.cjdbc.controller.loadbalancer.raidb2");
00096
00097
00098
00099
00100
/**
 * Creates a new RAIDb-2 load balancer with empty blocking and non-blocking
 * worker thread lists. The parent is initialized with the
 * <code>RAIDbLevels.RAIDb2</code> level and the
 * <code>ParsingGranularities.TABLE</code> granularity.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @param waitForCompletionPolicy how many backend completions to wait for
 * @param createTablePolicy where new tables have to be created
 * @throws SQLException if the parent constructor fails
 */
public RAIDb2(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy,
    CreateTablePolicy createTablePolicy) throws SQLException
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  // Policies first, then the (initially empty) worker thread lists.
  this.createTablePolicy = createTablePolicy;
  this.waitForCompletionPolicy = waitForCompletionPolicy;
  this.backendNonBlockingThreads = new ArrayList();
  this.backendBlockingThreads = new ArrayList();
}
00123
00124
00125
00126
00127
00135 private int getNbToWait(
int nbOfThreads)
00136 {
00137
int nbToWait;
00138
switch (
waitForCompletionPolicy.getPolicy())
00139 {
00140
case WaitForCompletionPolicy.FIRST :
00141 nbToWait = 1;
00142
break;
00143
case WaitForCompletionPolicy.MAJORITY :
00144 nbToWait = nbOfThreads / 2 + 1;
00145
break;
00146
default :
00147
logger
00148 .warn(
Translate.get(
"loadbalancer.waitforcompletion.unsupported"));
00149
case WaitForCompletionPolicy.ALL :
00150 nbToWait = nbOfThreads;
00151
break;
00152 }
00153
return nbToWait;
00154 }
00155
00166 public int execWriteRequest(
AbstractWriteRequest request)
00167
throws AllBackendsFailedException, SQLException
00168 {
00169
return ((
WriteRequestTask)
execWriteRequest(request,
false, null))
00170 .getResult();
00171 }
00172
00183 public ControllerResultSet execWriteRequestWithKeys(
00184
AbstractWriteRequest request,
MetadataCache metadataCache)
00185
throws AllBackendsFailedException, SQLException
00186 {
00187
return ((
WriteRequestWithKeysTask)
execWriteRequest(request,
true,
00188 metadataCache)).getResult();
00189 }
00190
00204 private AbstractTask execWriteRequest(
AbstractWriteRequest request,
00205
boolean useKeys,
MetadataCache metadataCache)
00206
throws AllBackendsFailedException, SQLException
00207 {
00208 ArrayList backendThreads;
00209 ReadPrioritaryFIFOWriteLock lock;
00210
00211
00212
handleMacros(request);
00213
00214
00215
if (request.mightBlock())
00216 {
00217 backendThreads =
backendBlockingThreads;
00218 lock =
backendBlockingThreadsRWLock;
00219 }
00220
else
00221 {
00222 backendThreads =
backendNonBlockingThreads;
00223 lock =
backendNonBlockingThreadsRWLock;
00224
if ((
waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00225 && (request.getTransactionId() != 0))
00226
waitForAllWritesToComplete(request.getTransactionId());
00227 }
00228
00229
try
00230 {
00231 lock.acquireRead();
00232 }
00233
catch (InterruptedException e)
00234 {
00235 String msg =
Translate.get(
00236
"loadbalancer.backendlist.acquire.readlock.failed", e);
00237
logger.error(msg);
00238
throw new SQLException(msg);
00239 }
00240
00241
int nbOfThreads = backendThreads.size();
00242 ArrayList writeList =
new ArrayList();
00243 String tableName = request.getTableName();
00244
00245
if (request.isCreate())
00246 {
00247
CreateTableRule rule =
createTablePolicy.getTableRule(request
00248 .getTableName());
00249
if (rule == null)
00250 rule =
createTablePolicy.getDefaultRule();
00251
00252
00253 ArrayList chosen;
00254
try
00255 {
00256 chosen = rule.
getBackends(vdb.
getBackends());
00257 }
00258
catch (
CreateTableException e)
00259 {
00260
throw new SQLException(
Translate.get(
00261
"loadbalancer.create.table.rule.failed", e.getMessage()));
00262 }
00263
00264
00265
if (chosen != null)
00266 {
00267
for (
int i = 0; i < nbOfThreads; i++)
00268 {
00269
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00270 .get(i);
00271
if (chosen.contains(thread.
getBackend()))
00272 writeList.add(thread);
00273 }
00274 }
00275 }
00276
else
00277 {
00278
for (
int i = 0; i < nbOfThreads; i++)
00279 {
00280
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00281 .get(i);
00282
if (thread.
getBackend().
hasTable(tableName))
00283 writeList.add(thread);
00284 }
00285 }
00286
00287 nbOfThreads = writeList.size();
00288
if (nbOfThreads == 0)
00289 {
00290 String msg =
Translate.get(
"loadbalancer.execute.no.backend.found",
00291 request.getSQLShortForm(vdb.
getSQLShortFormLength()));
00292
logger.warn(msg);
00293
throw new SQLException(msg);
00294 }
00295
else
00296
logger.debug(
Translate.get(
"loadbalancer.execute.on.several",
00297
new String[]{String.valueOf(request.getId()),
00298 String.valueOf(nbOfThreads)}));
00299
00300
00301
AbstractTask task;
00302
if (useKeys)
00303 task =
new WriteRequestWithKeysTask(
getNbToWait(nbOfThreads),
00304 nbOfThreads, request, metadataCache);
00305
else
00306 task =
new WriteRequestTask(
getNbToWait(nbOfThreads), nbOfThreads,
00307 request);
00308
00309
synchronized (task)
00310 {
00311
if (waitForCompletionPolicy.
getPolicy() ==
WaitForCompletionPolicy.ALL)
00312 {
00313
for (
int i = 0; i < nbOfThreads; i++)
00314 {
00315
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00316 .get(i);
00317
synchronized (thread)
00318 {
00319 thread.
addTask(task);
00320 thread.notify();
00321 }
00322 }
00323 }
00324
else
00325 {
00326
00327
00328
00329
00330
00331
00332
if (request.mightBlock())
00333 {
00334
for (
int i = 0; i < nbOfThreads; i++)
00335 {
00336 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00337 .get(i);
00338
synchronized (thread)
00339 {
00340 thread.addTask(task, request.getTransactionId());
00341 }
00342 }
00343 }
00344
else
00345 {
00346
for (
int i = 0; i < nbOfThreads; i++)
00347 {
00348 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00349 .get(i);
00350
synchronized (thread)
00351 {
00352 thread.addTask(task);
00353 }
00354 }
00355 }
00356
00357
00358
for (
int i = 0; i < nbOfThreads; i++)
00359 {
00360 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00361 .get(i);
00362
synchronized (thread)
00363 {
00364 thread.notify();
00365 }
00366 }
00367 }
00368
00369 lock.releaseRead();
00370
00371
00372
try
00373 {
00374
00375
long timeout = request.getTimeout() * 1000;
00376
if (timeout > 0)
00377 {
00378
long start = System.currentTimeMillis();
00379 task.wait(timeout);
00380
long end = System.currentTimeMillis();
00381
long remaining = timeout - (end - start);
00382
if (remaining <= 0)
00383 {
00384 String msg = Translate.get(
"loadbalancer.request.timeout",
00385
new String[]{String.valueOf(request.getId()),
00386 String.valueOf(task.getSuccess()),
00387 String.valueOf(task.getFailed())});
00388
00389
00390 lock.acquireRead();
00391 nbOfThreads = backendThreads.size();
00392
for (
int i = 0; i < nbOfThreads; i++)
00393 {
00394 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00395 .get(i);
00396
synchronized (thread)
00397 {
00398 thread.removeTask(task);
00399 }
00400 }
00401 lock.releaseRead();
00402
00403
logger.warn(msg);
00404
throw new SQLException(msg);
00405 }
00406
00407 }
00408
else
00409 task.wait();
00410 }
00411
catch (InterruptedException e)
00412 {
00413
00414
try
00415 {
00416 lock.acquireRead();
00417 nbOfThreads = backendThreads.size();
00418 }
00419
catch (InterruptedException ignore)
00420 {
00421 nbOfThreads = 0;
00422 }
00423
for (
int i = 0; i < nbOfThreads; i++)
00424 {
00425 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00426 .get(i);
00427
synchronized (thread)
00428 {
00429 thread.removeTask(task);
00430 }
00431 }
00432 lock.releaseRead();
00433
00434
throw new SQLException(Translate.get(
"loadbalancer.request.timeout",
00435
new String[]{String.valueOf(request.getId()),
00436 String.valueOf(task.getSuccess()),
00437 String.valueOf(task.getFailed())}));
00438 }
00439
00440
if (task.getSuccess() > 0)
00441
return task;
00442
else
00443 {
00444 ArrayList exceptions = task.getExceptions();
00445
if (exceptions == null)
00446
throw new AllBackendsFailedException(Translate.get(
00447
"loadbalancer.request.failed.all", request.getId()));
00448
else
00449 {
00450 String errorMsg = Translate.get(
"loadbalancer.request.failed.stack",
00451 request.getId())
00452 +
"\n";
00453
for (
int i = 0; i < exceptions.size(); i++)
00454 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
00455 logger.error(errorMsg);
00456
throw new SQLException(errorMsg);
00457 }
00458 }
00459 }
00460 }
00461
00470
public abstract ControllerResultSet execReadRequest(SelectRequest request,
00471 MetadataCache metadataCache)
throws SQLException;
00472
00482 protected ControllerResultSet executeRequestOnBackend(
SelectRequest request,
00483
DatabaseBackend backend,
MetadataCache metadataCache)
00484
throws SQLException,
UnreachableBackendException
00485 {
00486
00487 handleMacros(request);
00488
00489
00490
AbstractConnectionManager cm = backend.getConnectionManager(request
00491 .getLogin());
00492
00493
00494
if (cm == null)
00495 {
00496 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00497
new String[]{request.getLogin(), backend.getName()});
00498 logger.error(msg);
00499
throw new SQLException(msg);
00500 }
00501
00502
00503
if (request.isAutoCommit())
00504 {
00505
if (waitForCompletionPolicy.getPolicy() !=
WaitForCompletionPolicy.ALL)
00506
00507
00508
00509 waitForAllWritesToComplete(backend);
00510
00511
ControllerResultSet rs = null;
00512
boolean badConnection;
00513
do
00514 {
00515 badConnection =
false;
00516
00517 Connection c = null;
00518
try
00519 {
00520 c = cm.getConnection();
00521 }
00522
catch (UnreachableBackendException e1)
00523 {
00524 logger.error(
Translate.get(
00525
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00526 backend.disable();
00527
throw new UnreachableBackendException(
Translate.get(
00528
"loadbalancer.backend.unreacheable", backend.getName()));
00529 }
00530
00531
00532
if (c == null)
00533
throw new UnreachableBackendException(
00534
"No more connections on backend " + backend.getName());
00535
00536
00537
try
00538 {
00539 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00540 cm.releaseConnection(c);
00541 }
00542
catch (SQLException e)
00543 {
00544 cm.releaseConnection(c);
00545
throw new SQLException(Translate.get(
00546
"loadbalancer.request.failed.on.backend",
new String[]{
00547 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00548 backend.getName(), e.getMessage()}));
00549 }
00550
catch (BadConnectionException e)
00551 {
00552 cm.deleteConnection(c);
00553 badConnection =
true;
00554 }
00555 }
00556
while (badConnection);
00557
if (logger.isDebugEnabled())
00558 logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00559 String.valueOf(request.getId()), backend.getName()}));
00560
return rs;
00561 }
00562
else
00563 {
00564 Connection c;
00565
long tid = request.getTransactionId();
00566 Long lTid =
new Long(tid);
00567
00568
00569
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00570 waitForAllWritesToComplete(backend, request.getTransactionId());
00571
00572
if (!backend.isStartedTransaction(lTid))
00573 {
00574
try
00575 {
00576 c = cm.getConnection(tid);
00577 }
00578
catch (UnreachableBackendException e1)
00579 {
00580 logger.error(Translate.get(
00581
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00582 backend.disable();
00583
throw new UnreachableBackendException(Translate.get(
00584
"loadbalancer.backend.unreacheable", backend.getName()));
00585 }
00586
00587
00588
if (c == null)
00589
throw new SQLException(Translate.get(
00590
"loadbalancer.unable.get.connection",
new String[]{
00591 String.valueOf(tid), backend.getName()}));
00592
00593
00594 backend.startTransaction(lTid);
00595 c.setAutoCommit(
false);
00596 }
00597
else
00598 {
00599 c = cm.retrieveConnection(tid);
00600
00601
00602
if (c == null)
00603
throw new SQLException(Translate.get(
00604
"loadbalancer.unable.retrieve.connection",
new String[]{
00605 String.valueOf(tid), backend.getName()}));
00606 }
00607
00608
00609 ControllerResultSet rs = null;
00610
try
00611 {
00612 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00613 }
00614
catch (SQLException e)
00615 {
00616
throw new SQLException(Translate.get(
00617
"loadbalancer.request.failed.on.backend",
new String[]{
00618 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00619 backend.getName(), e.getMessage()}));
00620 }
00621
catch (BadConnectionException e)
00622 {
00623
00624 cm.deleteConnection(tid);
00625 String msg = Translate.get(
00626
"loadbalancer.backend.disabling.connection.failure", backend
00627 .getName());
00628 logger.error(msg);
00629 backend.disable();
00630
throw new SQLException(msg);
00631 }
00632
if (logger.isDebugEnabled())
00633 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00634
new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00635 backend.getName()}));
00636
return rs;
00637 }
00638 }
00639
00649 protected ControllerResultSet executeStoredProcedureOnBackend(
00650
StoredProcedure proc,
DatabaseBackend backend,
MetadataCache metadataCache)
00651
throws SQLException,
UnreachableBackendException
00652 {
00653
00654 handleMacros(proc);
00655
00656
00657
AbstractConnectionManager cm = backend
00658 .getConnectionManager(proc.getLogin());
00659
00660
00661
if (cm == null)
00662 {
00663 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00664
new String[]{proc.getLogin(), backend.getName()});
00665 logger.error(msg);
00666
throw new SQLException(msg);
00667 }
00668
00669
00670
if (proc.isAutoCommit())
00671 {
00672
if (waitForCompletionPolicy.getPolicy() !=
WaitForCompletionPolicy.ALL)
00673
00674
00675
00676 waitForAllWritesToComplete(backend);
00677
00678
00679 Connection c = null;
00680
try
00681 {
00682 c = cm.getConnection();
00683 }
00684
catch (UnreachableBackendException e1)
00685 {
00686 logger.error(
Translate.get(
00687
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00688 backend.disable();
00689
throw new UnreachableBackendException(
Translate.get(
00690
"loadbalancer.backend.unreacheable", backend.getName()));
00691 }
00692
00693
00694
if (c == null)
00695
throw new SQLException(Translate.get(
00696
"loadbalancer.backend.no.connection", backend.getName()));
00697
00698
00699 ControllerResultSet rs = null;
00700
try
00701 {
00702 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00703 backend, c, metadataCache);
00704 }
00705
catch (Exception e)
00706 {
00707
throw new SQLException(Translate.get(
00708
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00709 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00710 backend.getName(), e.getMessage()}));
00711 }
00712 finally
00713 {
00714 cm.releaseConnection(c);
00715 }
00716
if (logger.isDebugEnabled())
00717 logger.debug(Translate.get(
"loadbalancer.storedprocedure.on",
00718
new String[]{String.valueOf(proc.getId()), backend.getName()}));
00719
return rs;
00720 }
00721
else
00722 {
00723 Connection c;
00724
long tid = proc.getTransactionId();
00725 Long lTid =
new Long(tid);
00726
00727
00728
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00729 waitForAllWritesToComplete(backend, proc.getTransactionId());
00730
00731
if (!backend.isStartedTransaction(lTid))
00732 {
00733
try
00734 {
00735 c = cm.getConnection(tid);
00736 }
00737
catch (UnreachableBackendException e1)
00738 {
00739 logger.error(Translate.get(
00740
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00741 backend.disable();
00742
throw new SQLException(Translate.get(
00743
"loadbalancer.backend.unreacheable", backend.getName()));
00744 }
00745
00746
00747
if (c == null)
00748
throw new SQLException(Translate.get(
00749
"loadbalancer.unable.get.connection",
new String[]{
00750 String.valueOf(tid), backend.getName()}));
00751
00752
00753 backend.startTransaction(lTid);
00754 c.setAutoCommit(
false);
00755 }
00756
else
00757 {
00758 c = cm.retrieveConnection(tid);
00759
00760
00761
if (c == null)
00762
throw new SQLException(Translate.get(
00763
"loadbalancer.unable.retrieve.connection",
new String[]{
00764 String.valueOf(tid), backend.getName()}));
00765 }
00766
00767
00768 ControllerResultSet rs;
00769
try
00770 {
00771 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00772 backend, c, metadataCache);
00773 }
00774
catch (Exception e)
00775 {
00776
throw new SQLException(Translate.get(
00777
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00778 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00779 backend.getName(), e.getMessage()}));
00780 }
00781
if (logger.isDebugEnabled())
00782 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00783
new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00784 backend.getName()}));
00785
return rs;
00786 }
00787 }
00788
00793 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc,
00794
MetadataCache metadataCache)
throws SQLException
00795 {
00796
ReadStoredProcedureTask task = (
ReadStoredProcedureTask) callStoredProcedure(
00797 proc,
true, metadataCache);
00798
return task.
getResult();
00799 }
00800
00804 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00805 {
00806
WriteStoredProcedureTask task = (
WriteStoredProcedureTask) callStoredProcedure(
00807 proc,
false, null);
00808
return task.
getResult();
00809 }
00810
00821 private AbstractTask callStoredProcedure(
StoredProcedure proc,
00822
boolean isRead,
MetadataCache metadataCache)
throws SQLException
00823 {
00824 ArrayList backendThreads = backendBlockingThreads;
00825 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00826
00827
try
00828 {
00829 lock.acquireRead();
00830 }
00831
catch (InterruptedException e)
00832 {
00833 String msg =
Translate.get(
00834
"loadbalancer.backendlist.acquire.readlock.failed", e);
00835 logger.error(msg);
00836
throw new SQLException(msg);
00837 }
00838
00839
int nbOfThreads = backendThreads.size();
00840
00841
00842
AbstractTask task;
00843
if (isRead)
00844 task =
new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00845 proc, metadataCache);
00846
else
00847 task =
new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00848 nbOfThreads, proc);
00849
00850
synchronized (task)
00851 {
00852
int nbOfBackends = 0;
00853
00854
00855
for (
int i = 0; i < nbOfThreads; i++)
00856 {
00857
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00858 .get(i);
00859
if (thread.
getBackend().
hasStoredProcedure(proc.getProcedureName()))
00860 {
00861 nbOfBackends++;
00862
synchronized (thread)
00863 {
00864
if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL))
00865 thread.
addTask(task);
00866
else
00867 thread.
addTask(task, proc.getTransactionId());
00868 thread.notify();
00869 }
00870 }
00871
if (nbOfBackends == 0)
00872
throw new SQLException(
Translate.get(
00873
"loadbalancer.backend.no.required.storedprocedure", proc
00874 .getProcedureName()));
00875
else
00876 task.
setTotalNb(nbOfBackends);
00877 }
00878
00879 lock.releaseRead();
00880
00881
00882
try
00883 {
00884
00885
long timeout = proc.getTimeout() * 1000;
00886
if (timeout > 0)
00887 {
00888
long start = System.currentTimeMillis();
00889 task.wait(timeout);
00890
long end = System.currentTimeMillis();
00891
long remaining = timeout - (end - start);
00892
if (remaining <= 0)
00893 {
00894 String msg =
Translate.get(
"loadbalancer.storedprocedure.timeout",
00895
new String[]{String.valueOf(proc.getId()),
00896 String.valueOf(task.
getSuccess()),
00897 String.valueOf(task.
getFailed())});
00898
00899
00900 lock.acquireRead();
00901 nbOfThreads = backendThreads.size();
00902
for (
int i = 0; i < nbOfThreads; i++)
00903 {
00904
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00905 .get(i);
00906
synchronized (thread)
00907 {
00908 thread.
removeTask(task);
00909 }
00910 }
00911 lock.releaseRead();
00912
00913 logger.warn(msg);
00914
throw new SQLException(msg);
00915 }
00916
00917 }
00918
else
00919 task.wait();
00920 }
00921
catch (InterruptedException e)
00922 {
00923
00924
try
00925 {
00926 lock.acquireRead();
00927 nbOfThreads = backendThreads.size();
00928 }
00929
catch (InterruptedException ignore)
00930 {
00931 nbOfThreads = 0;
00932 }
00933
for (
int i = 0; i < nbOfThreads; i++)
00934 {
00935
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00936 .get(i);
00937
synchronized (thread)
00938 {
00939 thread.
removeTask(task);
00940 }
00941 }
00942 lock.releaseRead();
00943
00944
throw new SQLException(
Translate.get(
00945
"loadbalancer.storedprocedure.timeout",
new String[]{
00946 String.valueOf(proc.getId()),
00947 String.valueOf(task.
getSuccess()),
00948 String.valueOf(task.
getFailed())}));
00949 }
00950
00951
if (task.getSuccess() > 0)
00952
return task;
00953
else
00954 {
00955 ArrayList exceptions = task.getExceptions();
00956
if (exceptions == null)
00957
throw new SQLException(
Translate.get(
00958
"loadbalancer.storedprocedure.all.failed", proc.getId()));
00959
else
00960 {
00961 String errorMsg =
Translate.get(
00962
"loadbalancer.storedprocedure.failed.stack", proc.getId())
00963 +
"\n";
00964
for (
int i = 0; i < exceptions.size(); i++)
00965 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
00966 logger.error(errorMsg);
00967
throw new SQLException(errorMsg);
00968 }
00969 }
00970 }
00971 }
00972
00973
00974
00975
00976
00983 public final void begin(
TransactionMarkerMetaData tm)
throws SQLException
00984 {
00985 }
00986
00993 public void commit(
TransactionMarkerMetaData tm)
throws SQLException
00994 {
00995
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00996 waitForAllWritesToComplete(tm.getTransactionId());
00997
00998
try
00999 {
01000 backendNonBlockingThreadsRWLock.acquireRead();
01001 }
01002
catch (InterruptedException e)
01003 {
01004 String msg =
Translate.get(
01005
"loadbalancer.backendlist.acquire.readlock.failed", e);
01006 logger.error(msg);
01007
throw new SQLException(msg);
01008 }
01009
01010
int nbOfThreads = backendNonBlockingThreads.size();
01011 ArrayList commitList =
new ArrayList();
01012 Long iTid =
new Long(tm.getTransactionId());
01013
01014
01015
for (
int i = 0; i < nbOfThreads; i++)
01016 {
01017
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
01018 .get(i);
01019
if (thread.
getBackend().
isStartedTransaction(iTid))
01020 commitList.add(thread);
01021 }
01022
01023 nbOfThreads = commitList.size();
01024
if (nbOfThreads == 0)
01025 {
01026 backendNonBlockingThreadsRWLock.releaseRead();
01027
return;
01028 }
01029
01030
01031
CommitTask task =
new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01032 .getTimeout(), tm.getLogin(), tm.getTransactionId());
01033
01034
synchronized (task)
01035 {
01036
01037
for (
int i = 0; i < nbOfThreads; i++)
01038 {
01039
BackendWorkerThread thread = (
BackendWorkerThread) commitList.get(i);
01040
synchronized (thread)
01041 {
01042 thread.
addTask(task);
01043 thread.notify();
01044 }
01045 }
01046
01047 backendNonBlockingThreadsRWLock.releaseRead();
01048
01049
01050
try
01051 {
01052
01053
long timeout = tm.getTimeout();
01054
if (timeout > 0)
01055 {
01056
long start = System.currentTimeMillis();
01057 task.wait(timeout);
01058
long end = System.currentTimeMillis();
01059
long remaining = timeout - (end - start);
01060
if (remaining <= 0)
01061 {
01062 String msg =
Translate.get(
"loadbalancer.commit.timeout",
01063
new String[]{String.valueOf(tm.getTransactionId()),
01064 String.valueOf(task.
getSuccess()),
01065 String.valueOf(task.
getFailed())});
01066 logger.warn(msg);
01067
throw new SQLException(msg);
01068 }
01069 }
01070
else
01071 task.wait();
01072 }
01073
catch (InterruptedException e)
01074 {
01075
throw new SQLException(
Translate.get(
"loadbalancer.commit.timeout",
01076
new String[]{String.valueOf(tm.getTransactionId()),
01077 String.valueOf(task.
getSuccess()),
01078 String.valueOf(task.
getFailed())}));
01079 }
01080
01081
if (task.getSuccess() > 0)
01082
return;
01083
else
01084 {
01085 ArrayList exceptions = task.getExceptions();
01086
if (exceptions == null)
01087
throw new SQLException(
Translate.get(
01088
"loadbalancer.commit.all.failed", tm.getTransactionId()));
01089
else
01090 {
01091 String errorMsg =
Translate.get(
"loadbalancer.commit.failed.stack",
01092 tm.getTransactionId())
01093 +
"\n";
01094
for (
int i = 0; i < exceptions.size(); i++)
01095 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
01096 logger.error(errorMsg);
01097
throw new SQLException(errorMsg);
01098 }
01099 }
01100 }
01101 }
01102
01109 public void rollback(
TransactionMarkerMetaData tm)
throws SQLException
01110 {
01111
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01112 waitForAllWritesToComplete(tm.getTransactionId());
01113
01114
try
01115 {
01116 backendNonBlockingThreadsRWLock.acquireRead();
01117 }
01118
catch (InterruptedException e)
01119 {
01120 String msg =
Translate.get(
01121
"loadbalancer.backendlist.acquire.readlock.failed", e);
01122 logger.error(msg);
01123
throw new SQLException(msg);
01124 }
01125
int nbOfThreads = backendNonBlockingThreads.size();
01126 ArrayList rollbackList =
new ArrayList();
01127 Long iTid =
new Long(tm.getTransactionId());
01128
01129
01130
for (
int i = 0; i < nbOfThreads; i++)
01131 {
01132
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
01133 .get(i);
01134
if (thread.
getBackend().
isStartedTransaction(iTid))
01135 rollbackList.add(thread);
01136 }
01137
01138 nbOfThreads = rollbackList.size();
01139
if (nbOfThreads == 0)
01140 {
01141 backendNonBlockingThreadsRWLock.releaseRead();
01142
return;
01143 }
01144
01145
01146
RollbackTask task =
new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01147 tm.getTimeout(), tm.getLogin(), tm.getTransactionId());
01148
01149
synchronized (task)
01150 {
01151
01152
for (
int i = 0; i < nbOfThreads; i++)
01153 {
01154
BackendWorkerThread thread = (
BackendWorkerThread) rollbackList.get(i);
01155
synchronized (thread)
01156 {
01157 thread.
addTask(task);
01158 thread.notify();
01159 }
01160 }
01161
01162 backendNonBlockingThreadsRWLock.releaseRead();
01163
01164
01165
try
01166 {
01167
01168
long timeout = tm.getTimeout();
01169
if (timeout > 0)
01170 {
01171
long start = System.currentTimeMillis();
01172 task.wait(timeout);
01173
long end = System.currentTimeMillis();
01174
long remaining = timeout - (end - start);
01175
if (remaining <= 0)
01176 {
01177 String msg =
Translate.get(
"loadbalancer.rollback.timeout",
01178
new String[]{String.valueOf(tm.getTransactionId()),
01179 String.valueOf(task.
getSuccess()),
01180 String.valueOf(task.
getFailed())});
01181 logger.warn(msg);
01182
throw new SQLException(msg);
01183 }
01184 }
01185
else
01186 task.wait();
01187 }
01188
catch (InterruptedException e)
01189 {
01190
throw new SQLException(
Translate.get(
"loadbalancer.rollback.timeout",
01191
new String[]{String.valueOf(tm.getTransactionId()),
01192 String.valueOf(task.
getSuccess()),
01193 String.valueOf(task.
getFailed())}));
01194 }
01195
01196
if (task.getSuccess() > 0)
01197
return;
01198
else
01199 {
01200 ArrayList exceptions = task.getExceptions();
01201
if (exceptions == null)
01202
throw new SQLException(
Translate.get(
01203
"loadbalancer.rollback.all.failed", tm.getTransactionId()));
01204
else
01205 {
01206 String errorMsg =
Translate.get(
"loadbalancer.rollback.failed.stack",
01207 tm.getTransactionId())
01208 +
"\n";
01209
for (
int i = 0; i < exceptions.size(); i++)
01210 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
01211 logger.error(errorMsg);
01212
throw new SQLException(errorMsg);
01213 }
01214 }
01215 }
01216 }
01217
01222 protected void waitForAllWritesToComplete(
long transactionId)
01223
throws SQLException
01224 {
01225
try
01226 {
01227 backendBlockingThreadsRWLock.acquireRead();
01228 }
01229
catch (InterruptedException e)
01230 {
01231 String msg =
Translate.get(
01232
"loadbalancer.backendlist.acquire.readlock.failed", e);
01233 logger.error(msg);
01234
throw new SQLException(msg);
01235 }
01236
01237
int nbOfThreads = backendBlockingThreads.size();
01238
01239
for (
int i = 0; i < nbOfThreads; i++)
01240 {
01241
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01242 .get(i);
01243 thread.
waitForAllTasksToComplete(transactionId);
01244 }
01245
01246 backendBlockingThreadsRWLock.releaseRead();
01247 }
01248
01256 protected void waitForAllWritesToComplete(
DatabaseBackend backend,
01257
long transactionId)
throws SQLException
01258 {
01259
try
01260 {
01261 backendBlockingThreadsRWLock.acquireRead();
01262 }
01263
catch (InterruptedException e)
01264 {
01265 String msg =
Translate.get(
01266
"loadbalancer.backendlist.acquire.readlock.failed", e);
01267 logger.error(msg);
01268
throw new SQLException(msg);
01269 }
01270
01271
int nbOfThreads = backendBlockingThreads.size();
01272
01273
for (
int i = 0; i < nbOfThreads; i++)
01274 {
01275
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01276 .get(i);
01277
if (thread.
getBackend() == backend)
01278 thread.
waitForAllTasksToComplete(transactionId);
01279 }
01280
01281 backendBlockingThreadsRWLock.releaseRead();
01282 }
01283
01290 protected void waitForAllWritesToComplete(
DatabaseBackend backend)
01291
throws SQLException
01292 {
01293
try
01294 {
01295 backendBlockingThreadsRWLock.acquireRead();
01296 }
01297
catch (InterruptedException e)
01298 {
01299 String msg =
Translate.get(
01300
"loadbalancer.backendlist.acquire.readlock.failed", e);
01301 logger.error(msg);
01302
throw new SQLException(msg);
01303 }
01304
01305
int nbOfThreads = backendBlockingThreads.size();
01306
01307
for (
int i = 0; i < nbOfThreads; i++)
01308 {
01309
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01310 .get(i);
01311
if (thread.
getBackend() == backend)
01312 thread.
waitForAllTasksToComplete();
01313 }
01314
01315 backendBlockingThreadsRWLock.releaseRead();
01316 }
01317
01318
01319
01320
01321
01334 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
01335
throws SQLException
01336 {
01337
if (writeEnabled)
01338 {
01339
01340
BackendWorkerThread blockingThread =
new BackendWorkerThread(db,
this);
01341 BackendWorkerThread nonBlockingThread =
new BackendWorkerThread(db,
this);
01342
01343
01344
try
01345 {
01346 backendBlockingThreadsRWLock.acquireWrite();
01347 }
01348
catch (InterruptedException e)
01349 {
01350 String msg =
Translate.get(
01351
"loadbalancer.backendlist.acquire.writelock.failed", e);
01352 logger.error(msg);
01353
throw new SQLException(msg);
01354 }
01355 backendBlockingThreads.add(blockingThread);
01356 backendBlockingThreadsRWLock.releaseWrite();
01357 blockingThread.start();
01358 logger.info(
Translate.get(
01359
"loadbalancer.backend.workerthread.blocking.add", db.getName()));
01360
01361
01362
try
01363 {
01364 backendNonBlockingThreadsRWLock.acquireWrite();
01365 }
01366
catch (InterruptedException e)
01367 {
01368 String msg =
Translate.get(
01369
"loadbalancer.backendlist.acquire.writelock.failed", e);
01370 logger.error(msg);
01371
throw new SQLException(msg);
01372 }
01373 backendNonBlockingThreads.add(nonBlockingThread);
01374 backendNonBlockingThreadsRWLock.releaseWrite();
01375 nonBlockingThread.start();
01376 logger.info(
Translate.get(
01377
"loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01378 db.enableWrite();
01379 }
01380
01381
if (!db.isInitialized())
01382 db.initializeConnections();
01383 db.enableRead();
01384 }
01385
01397 public synchronized void disableBackend(
DatabaseBackend db)
01398
throws SQLException
01399 {
01400
if (db.isWriteEnabled())
01401 {
01402
01403
try
01404 {
01405 backendBlockingThreadsRWLock.acquireWrite();
01406 }
01407
catch (InterruptedException e)
01408 {
01409 String msg =
Translate.get(
01410
"loadbalancer.backendlist.acquire.writelock.failed", e);
01411 logger.error(msg);
01412
throw new SQLException(msg);
01413 }
01414
01415
int nbOfThreads = backendBlockingThreads.size();
01416
01417
01418
for (
int i = 0; i < nbOfThreads; i++)
01419 {
01420
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01421 .get(i);
01422
if (thread.
getBackend().
equals(db))
01423 {
01424 logger.info(
Translate
01425 .get(
"loadbalancer.backend.workerthread.blocking.remove", db
01426 .getName()));
01427
01428 backendBlockingThreads.remove(thread);
01429
01430
synchronized (thread)
01431 {
01432
01433 thread.
addPriorityTask(
new KillThreadTask(1, 1));
01434 thread.notify();
01435 }
01436
break;
01437 }
01438 }
01439
01440 backendBlockingThreadsRWLock.releaseWrite();
01441
01442
01443
01444
try
01445 {
01446 backendNonBlockingThreadsRWLock.acquireWrite();
01447 }
01448
catch (InterruptedException e)
01449 {
01450 String msg =
Translate.get(
01451
"loadbalancer.backendlist.acquire.writelock.failed", e);
01452 logger.error(msg);
01453
throw new SQLException(msg);
01454 }
01455
01456
01457 nbOfThreads = backendNonBlockingThreads.size();
01458
for (
int i = 0; i < nbOfThreads; i++)
01459 {
01460
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
01461 .get(i);
01462
if (thread.
getBackend().
equals(db))
01463 {
01464 logger.info(
Translate.get(
01465
"loadbalancer.backend.workerthread.non.blocking.remove", db
01466 .getName()));
01467
01468 backendNonBlockingThreads.remove(thread);
01469
01470
synchronized (thread)
01471 {
01472
01473 thread.
addPriorityTask(
new KillThreadTask(1, 1));
01474 thread.notify();
01475 }
01476
break;
01477 }
01478 }
01479
01480 backendNonBlockingThreadsRWLock.releaseWrite();
01481 }
01482
01483 db.disable();
01484
if (db.isInitialized())
01485 db.finalizeConnections();
01486 }
01487
01491 public String getXmlImpl()
01492 {
01493 StringBuffer info =
new StringBuffer();
01494 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb_2 +
">");
01495
if (createTablePolicy != null)
01496 info.append(createTablePolicy.getXml());
01497
if (waitForCompletionPolicy != null)
01498 info.append(waitForCompletionPolicy.getXml());
01499
if (macroHandler != null)
01500 info.append(macroHandler.getXml());
01501
this.getRaidb2Xml();
01502 info.append(
"</" +
DatabasesXmlTags.ELT_RAIDb_2 +
">");
01503
return info.toString();
01504 }
01505
01511
/**
 * Hook implemented by concrete subclasses and invoked by
 * <code>getXmlImpl()</code> after the optional policy and macro handler
 * fragments, just before the closing RAIDb-2 tag is written.
 * <p>
 * NOTE(review): presumably returns the XML fragment describing the
 * subclass-specific load balancing policy -- confirm against the
 * implementations.
 *
 * @return a string supplied by the subclass
 */
public abstract String getRaidb2Xml();
01512
01513 }