00001
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026
00027
import java.sql.Connection;
00028
import java.sql.SQLException;
00029
import java.util.ArrayList;
00030
00031
import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032
import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00033
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00034
import org.objectweb.cjdbc.common.i18n.Translate;
00035
import org.objectweb.cjdbc.common.log.Trace;
00036
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00038
import org.objectweb.cjdbc.common.sql.SelectRequest;
00039
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00040
import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00041
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00042
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00043
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00044
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00045
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00046
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00047
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048
import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00049
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00050
import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00051
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00052
import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00053
import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00054
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00055
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00056
import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00057
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00058
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00059
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00060
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00061
00072 public abstract class RAIDb1 extends AbstractLoadBalancer
00073 {
00074
00075
00076
00077
00078
00079 protected ArrayList
backendBlockingThreads;
00080 protected ArrayList
backendNonBlockingThreads;
00081 protected ReadPrioritaryFIFOWriteLock
backendBlockingThreadsRWLock =
new ReadPrioritaryFIFOWriteLock();
00082 protected ReadPrioritaryFIFOWriteLock
backendNonBlockingThreadsRWLock =
new ReadPrioritaryFIFOWriteLock();
00083
00084 protected WaitForCompletionPolicy
waitForCompletionPolicy;
00085
00086 protected static Trace
logger = Trace
00087 .getLogger(
"org.objectweb.cjdbc.controller.loadbalancer.RAIDb1");
00088
00089
00090
00091
00092
/**
 * Builds a RAIDb-1 load balancer with empty worker thread lists.
 *
 * @param vdb virtual database handed to the parent load balancer
 * @param waitForCompletionPolicy policy used to compute how many backends
 *          must complete before a caller is released
 * @throws SQLException if the parent constructor fails
 */
public RAIDb1(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy) throws SQLException
{
  super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);

  this.backendBlockingThreads = new ArrayList();
  this.backendNonBlockingThreads = new ArrayList();
  this.waitForCompletionPolicy = waitForCompletionPolicy;
}
00111
00112
00113
00114
00115
00123 private int getNbToWait(
int nbOfThreads)
00124 {
00125
int nbToWait;
00126
switch (
waitForCompletionPolicy.getPolicy())
00127 {
00128
case WaitForCompletionPolicy.FIRST :
00129 nbToWait = 1;
00130
break;
00131
case WaitForCompletionPolicy.MAJORITY :
00132 nbToWait = nbOfThreads / 2 + 1;
00133
break;
00134
default :
00135
logger
00136 .warn(
Translate.get(
"loadbalancer.waitforcompletion.unsupported"));
00137
case WaitForCompletionPolicy.ALL :
00138 nbToWait = nbOfThreads;
00139
break;
00140 }
00141
return nbToWait;
00142 }
00143
00147
public abstract ControllerResultSet execReadRequest(
SelectRequest request,
00148
MetadataCache metadataCache)
throws SQLException;
00149
00159 protected ControllerResultSet executeRequestOnBackend(
SelectRequest request,
00160
DatabaseBackend backend,
MetadataCache metadataCache)
00161
throws SQLException,
UnreachableBackendException
00162 {
00163
00164
handleMacros(request);
00165
00166
00167
AbstractConnectionManager cm = backend.getConnectionManager(request
00168 .getLogin());
00169
00170
00171
if (cm == null)
00172 {
00173 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00174
new String[]{request.getLogin(), backend.getName()});
00175
logger.error(msg);
00176
throw new SQLException(msg);
00177 }
00178
00179
00180
if (request.isAutoCommit())
00181 {
00182
if (waitForCompletionPolicy.
getPolicy() !=
WaitForCompletionPolicy.ALL)
00183
00184
00185
00186
waitForAllWritesToComplete(backend);
00187
00188
ControllerResultSet rs = null;
00189
boolean badConnection;
00190
do
00191 {
00192 badConnection =
false;
00193
00194 Connection c = null;
00195
try
00196 {
00197 c = cm.getConnection();
00198 }
00199
catch (UnreachableBackendException e1)
00200 {
00201 logger.
error(
Translate.get(
00202
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00203 backend.disable();
00204
throw new UnreachableBackendException(
Translate.get(
00205
"loadbalancer.backend.unreacheable", backend.getName()));
00206 }
00207
00208
00209
if (c == null)
00210
throw new SQLException(Translate.get(
00211
"loadbalancer.backend.no.connection", backend.getName()));
00212
00213
00214
try
00215 {
00216 rs =
executeSelectRequestOnBackend(request, backend, c, metadataCache);
00217 cm.releaseConnection(c);
00218 }
00219
catch (SQLException e)
00220 {
00221 cm.releaseConnection(c);
00222
throw new SQLException(Translate.get(
00223
"loadbalancer.request.failed.on.backend",
new String[]{
00224 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00225 backend.getName(), e.getMessage()}));
00226 }
00227
catch (BadConnectionException e)
00228 {
00229 cm.deleteConnection(c);
00230 badConnection =
true;
00231 }
00232 }
00233
while (badConnection);
00234
if (
logger.isDebugEnabled())
00235
logger.debug(Translate.get(
"loadbalancer.execute.on",
new String[]{
00236 String.valueOf(request.getId()), backend.getName()}));
00237
return rs;
00238 }
00239
else
00240 {
00241 Connection c;
00242
long tid = request.getTransactionId();
00243 Long lTid =
new Long(tid);
00244
00245
00246
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00247 waitForAllWritesToComplete(backend, request.getTransactionId());
00248
00249
if (!backend.isStartedTransaction(lTid))
00250 {
00251
try
00252 {
00253 c = cm.getConnection(tid);
00254 }
00255
catch (UnreachableBackendException e1)
00256 {
00257 logger.error(Translate.get(
00258
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00259 backend.disable();
00260
throw new SQLException(Translate.get(
00261
"loadbalancer.backend.unreacheable", backend.getName()));
00262 }
00263
00264
00265
if (c == null)
00266
throw new SQLException(Translate.get(
00267
"loadbalancer.unable.get.connection",
new String[]{
00268 String.valueOf(tid), backend.getName()}));
00269
00270
00271 backend.startTransaction(lTid);
00272 c.setAutoCommit(
false);
00273 }
00274
else
00275 {
00276 c = cm.retrieveConnection(tid);
00277
00278
00279
if (c == null)
00280
throw new SQLException(Translate.get(
00281
"loadbalancer.unable.retrieve.connection",
new String[]{
00282 String.valueOf(tid), backend.getName()}));
00283 }
00284
00285
00286 ControllerResultSet rs = null;
00287
try
00288 {
00289 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00290 }
00291
catch (SQLException e)
00292 {
00293
throw new SQLException(Translate.get(
00294
"loadbalancer.request.failed.on.backend",
new String[]{
00295 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00296 backend.getName(), e.getMessage()}));
00297 }
00298
catch (BadConnectionException e)
00299 {
00300
00301 cm.deleteConnection(tid);
00302 String msg = Translate.get(
00303
"loadbalancer.backend.disabling.connection.failure", backend
00304 .getName());
00305 logger.error(msg);
00306 backend.disable();
00307
throw new SQLException(msg);
00308 }
00309
if (logger.isDebugEnabled())
00310 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00311
new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00312 backend.getName()}));
00313
return rs;
00314 }
00315 }
00316
00327 public int execWriteRequest(
AbstractWriteRequest request)
00328
throws AllBackendsFailedException,
NoMoreBackendException,SQLException
00329 {
00330
return ((
WriteRequestTask) execWriteRequest(request,
false, null))
00331 .getResult();
00332 }
00333
00344 public ControllerResultSet execWriteRequestWithKeys(
00345
AbstractWriteRequest request,
MetadataCache metadataCache)
00346
throws AllBackendsFailedException, SQLException
00347 {
00348
return ((
WriteRequestWithKeysTask) execWriteRequest(request,
true,
00349 metadataCache)).getResult();
00350 }
00351
00364 private AbstractTask execWriteRequest(
AbstractWriteRequest request,
00365
boolean useKeys,
MetadataCache metadataCache)
00366
throws AllBackendsFailedException, NoMoreBackendException,SQLException
00367 {
00368 ArrayList backendThreads;
00369 ReadPrioritaryFIFOWriteLock lock;
00370
00371
00372 handleMacros(request);
00373
00374
00375
if (request.mightBlock())
00376 {
00377 backendThreads = backendBlockingThreads;
00378 lock = backendBlockingThreadsRWLock;
00379 }
00380
else
00381 {
00382 backendThreads = backendNonBlockingThreads;
00383 lock = backendNonBlockingThreadsRWLock;
00384
if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00385 && (request.getTransactionId() != 0))
00386 waitForAllWritesToComplete(request.getTransactionId());
00387 }
00388
00389
try
00390 {
00391 lock.acquireRead();
00392 }
00393
catch (InterruptedException e)
00394 {
00395 String msg =
Translate.get(
00396
"loadbalancer.backendlist.acquire.readlock.failed", e);
00397 logger.error(msg);
00398
throw new SQLException(msg);
00399 }
00400
00401
int nbOfThreads = backendThreads.size();
00402
if(nbOfThreads==0)
00403
throw new NoMoreBackendException(
Translate.get(
"loadbalancer.backendlist.empty"));
00404
00405
00406
AbstractTask task;
00407
if (useKeys)
00408 task =
new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
00409 nbOfThreads, request, metadataCache);
00410
else
00411 task =
new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
00412 request);
00413
00414
synchronized (task)
00415 {
00416
if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)
00417 {
00418
for (
int i = 0; i < nbOfThreads; i++)
00419 {
00420
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00421 .get(i);
00422
synchronized (thread)
00423 {
00424 thread.
addTask(task);
00425 thread.notify();
00426 }
00427 }
00428 }
00429
else
00430 {
00431
00432
00433
00434
00435
00436
00437
if (request.mightBlock())
00438 {
00439
for (
int i = 0; i < nbOfThreads; i++)
00440 {
00441
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00442 .get(i);
00443
synchronized (thread)
00444 {
00445 thread.
addTask(task, request.getTransactionId());
00446 }
00447 }
00448 }
00449
else
00450 {
00451
for (
int i = 0; i < nbOfThreads; i++)
00452 {
00453
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00454 .get(i);
00455
synchronized (thread)
00456 {
00457 thread.
addTask(task);
00458 }
00459 }
00460 }
00461
00462
00463
for (
int i = 0; i < nbOfThreads; i++)
00464 {
00465
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00466 .get(i);
00467
synchronized (thread)
00468 {
00469 thread.notify();
00470 }
00471 }
00472 }
00473
00474 lock.releaseRead();
00475
00476
00477
try
00478 {
00479
00480
long timeout = request.getTimeout() * 1000;
00481
if (timeout > 0)
00482 {
00483
long start = System.currentTimeMillis();
00484 task.wait(timeout);
00485
long end = System.currentTimeMillis();
00486
long remaining = timeout - (end - start);
00487
if (remaining <= 0)
00488 {
00489 String msg =
Translate.get(
"loadbalancer.request.timeout",
00490
new String[]{String.valueOf(request.getId()),
00491 String.valueOf(task.
getSuccess()),
00492 String.valueOf(task.
getFailed())});
00493
00494
00495 lock.acquireRead();
00496 nbOfThreads = backendThreads.size();
00497
for (
int i = 0; i < nbOfThreads; i++)
00498 {
00499
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00500 .get(i);
00501
synchronized (thread)
00502 {
00503 thread.
removeTask(task);
00504 }
00505 }
00506 lock.releaseRead();
00507
00508 logger.warn(msg);
00509
throw new SQLException(msg);
00510 }
00511
00512 }
00513
else
00514 task.wait();
00515 }
00516
catch (InterruptedException e)
00517 {
00518
00519
try
00520 {
00521 lock.acquireRead();
00522 nbOfThreads = backendThreads.size();
00523 }
00524
catch (InterruptedException ignore)
00525 {
00526 nbOfThreads = 0;
00527 }
00528
for (
int i = 0; i < nbOfThreads; i++)
00529 {
00530
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00531 .get(i);
00532
synchronized (thread)
00533 {
00534 thread.
removeTask(task);
00535 }
00536 }
00537 lock.releaseRead();
00538
00539
throw new SQLException(
Translate.get(
"loadbalancer.request.timeout",
00540
new String[]{String.valueOf(request.getId()),
00541 String.valueOf(task.
getSuccess()),
00542 String.valueOf(task.
getFailed())}));
00543 }
00544
00545
if (task.getSuccess() > 0)
00546
return task;
00547
else
00548 {
00549 ArrayList exceptions = task.getExceptions();
00550
if (exceptions == null)
00551
throw new AllBackendsFailedException(
Translate.get(
00552
"loadbalancer.request.failed.all", request.getId()));
00553
else
00554 {
00555 String errorMsg =
Translate.get(
"loadbalancer.request.failed.stack",
00556 request.getId())
00557 +
"\n";
00558
for (
int i = 0; i < exceptions.size(); i++)
00559 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
00560 logger.error(errorMsg);
00561
throw new SQLException(errorMsg);
00562 }
00563 }
00564 }
00565 }
00566
00576 protected ControllerResultSet executeStoredProcedureOnBackend(
00577
StoredProcedure proc,
DatabaseBackend backend,
MetadataCache metadataCache)
00578
throws SQLException,
UnreachableBackendException
00579 {
00580
00581 handleMacros(proc);
00582
00583
00584
AbstractConnectionManager cm = backend
00585 .getConnectionManager(proc.getLogin());
00586
00587
00588
if (cm == null)
00589 {
00590 String msg =
Translate.get(
"loadbalancer.connectionmanager.not.found",
00591
new String[]{proc.getLogin(), backend.getName()});
00592 logger.error(msg);
00593
throw new SQLException(msg);
00594 }
00595
00596
00597
if (proc.isAutoCommit())
00598 {
00599
if (waitForCompletionPolicy.getPolicy() !=
WaitForCompletionPolicy.ALL)
00600
00601
00602
00603 waitForAllWritesToComplete(backend);
00604
00605
00606 Connection c = null;
00607
try
00608 {
00609 c = cm.getConnection();
00610 }
00611
catch (UnreachableBackendException e1)
00612 {
00613 logger.error(
Translate.get(
00614
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00615 backend.disable();
00616
throw new UnreachableBackendException(
Translate.get(
00617
"loadbalancer.backend.unreacheable", backend.getName()));
00618 }
00619
00620
00621
if (c == null)
00622
throw new UnreachableBackendException(Translate.get(
00623
"loadbalancer.backend.no.connection", backend.getName()));
00624
00625
00626 ControllerResultSet rs = null;
00627
try
00628 {
00629 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00630 backend, c, metadataCache);
00631 }
00632
catch (Exception e)
00633 {
00634
throw new SQLException(Translate.get(
00635
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00636 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00637 backend.getName(), e.getMessage()}));
00638 }
00639 finally
00640 {
00641 cm.releaseConnection(c);
00642 }
00643
if (logger.isDebugEnabled())
00644 logger.debug(Translate.get(
"loadbalancer.storedprocedure.on",
00645
new String[]{String.valueOf(proc.getId()), backend.getName()}));
00646
return rs;
00647 }
00648
else
00649 {
00650 Connection c;
00651
long tid = proc.getTransactionId();
00652 Long lTid =
new Long(tid);
00653
00654
00655
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00656 waitForAllWritesToComplete(backend, proc.getTransactionId());
00657
00658
if (!backend.isStartedTransaction(lTid))
00659 {
00660
try
00661 {
00662 c = cm.getConnection(tid);
00663 }
00664
catch (UnreachableBackendException e1)
00665 {
00666 logger.error(Translate.get(
00667
"loadbalancer.backend.disabling.unreachable", backend.getName()));
00668 backend.disable();
00669
throw new SQLException(Translate.get(
00670
"loadbalancer.backend.unreacheable", backend.getName()));
00671 }
00672
00673
00674
if (c == null)
00675
throw new SQLException(Translate.get(
00676
"loadbalancer.unable.get.connection",
new String[]{
00677 String.valueOf(tid), backend.getName()}));
00678
00679
00680 backend.startTransaction(lTid);
00681 c.setAutoCommit(
false);
00682 }
00683
else
00684 {
00685 c = cm.retrieveConnection(tid);
00686
00687
00688
if (c == null)
00689
throw new SQLException(Translate.get(
00690
"loadbalancer.unable.retrieve.connection",
new String[]{
00691 String.valueOf(tid), backend.getName()}));
00692 }
00693
00694
00695 ControllerResultSet rs;
00696
try
00697 {
00698 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00699 backend, c, metadataCache);
00700 }
00701
catch (Exception e)
00702 {
00703
throw new SQLException(Translate.get(
00704
"loadbalancer.storedprocedure.failed.on.backend",
new String[]{
00705 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00706 backend.getName(), e.getMessage()}));
00707 }
00708
if (logger.isDebugEnabled())
00709 logger.debug(Translate.get(
"loadbalancer.execute.transaction.on",
00710
new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00711 backend.getName()}));
00712
return rs;
00713 }
00714 }
00715
00720 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc,
00721
MetadataCache metadataCache)
throws SQLException
00722 {
00723
ReadStoredProcedureTask task = (
ReadStoredProcedureTask) callStoredProcedure(
00724 proc,
true, metadataCache);
00725
return task.
getResult();
00726 }
00727
00731 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00732 {
00733
WriteStoredProcedureTask task = (
WriteStoredProcedureTask) callStoredProcedure(
00734 proc,
false, null);
00735
return task.
getResult();
00736 }
00737
00748 private AbstractTask callStoredProcedure(
StoredProcedure proc,
00749
boolean isRead,
MetadataCache metadataCache)
throws SQLException
00750 {
00751 ArrayList backendThreads = backendBlockingThreads;
00752 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00753
00754
try
00755 {
00756 lock.acquireRead();
00757 }
00758
catch (InterruptedException e)
00759 {
00760 String msg =
Translate.get(
00761
"loadbalancer.backendlist.acquire.readlock.failed", e);
00762 logger.error(msg);
00763
throw new SQLException(msg);
00764 }
00765
00766
int nbOfThreads = backendThreads.size();
00767
00768
00769
AbstractTask task;
00770
if (isRead)
00771 task =
new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00772 proc, metadataCache);
00773
else
00774 task =
new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00775 nbOfThreads, proc);
00776
00777
synchronized (task)
00778 {
00779
00780
for (
int i = 0; i < nbOfThreads; i++)
00781 {
00782
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00783 .get(i);
00784
synchronized (thread)
00785 {
00786
if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL))
00787 thread.
addTask(task);
00788
else
00789 thread.
addTask(task, proc.getTransactionId());
00790 thread.notify();
00791 }
00792 }
00793
00794 lock.releaseRead();
00795
00796
00797
try
00798 {
00799
00800
long timeout = proc.getTimeout() * 1000;
00801
if (timeout > 0)
00802 {
00803
long start = System.currentTimeMillis();
00804 task.wait(timeout);
00805
long end = System.currentTimeMillis();
00806
long remaining = timeout - (end - start);
00807
if (remaining <= 0)
00808 {
00809 String msg =
Translate.get(
"loadbalancer.storedprocedure.timeout",
00810
new String[]{String.valueOf(proc.getId()),
00811 String.valueOf(task.
getSuccess()),
00812 String.valueOf(task.
getFailed())});
00813
00814 lock.acquireRead();
00815 nbOfThreads = backendThreads.size();
00816
for (
int i = 0; i < nbOfThreads; i++)
00817 {
00818
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00819 .get(i);
00820
synchronized (thread)
00821 {
00822 thread.
removeTask(task);
00823 }
00824 }
00825 lock.releaseRead();
00826
00827 logger.warn(msg);
00828
throw new SQLException(msg);
00829 }
00830
00831 }
00832
else
00833 task.wait();
00834 }
00835
catch (InterruptedException e)
00836 {
00837
00838
try
00839 {
00840 lock.acquireRead();
00841 nbOfThreads = backendThreads.size();
00842 }
00843
catch (InterruptedException ignore)
00844 {
00845 nbOfThreads = 0;
00846 }
00847
for (
int i = 0; i < nbOfThreads; i++)
00848 {
00849
BackendWorkerThread thread = (
BackendWorkerThread) backendThreads
00850 .get(i);
00851
synchronized (thread)
00852 {
00853 thread.
removeTask(task);
00854 }
00855 }
00856 lock.releaseRead();
00857
00858
throw new SQLException(
Translate.get(
00859
"loadbalancer.storedprocedure.timeout",
new String[]{
00860 String.valueOf(proc.getId()),
00861 String.valueOf(task.
getSuccess()),
00862 String.valueOf(task.
getFailed())}));
00863 }
00864
00865
if (task.getSuccess() > 0)
00866
return task;
00867
else
00868 {
00869 ArrayList exceptions = task.getExceptions();
00870
if (exceptions == null)
00871
throw new SQLException(
Translate.get(
00872
"loadbalancer.storedprocedure.all.failed", proc.getId()));
00873
else
00874 {
00875 String errorMsg =
Translate.get(
00876
"loadbalancer.storedprocedure.failed.stack", proc.getId())
00877 +
"\n";
00878
for (
int i = 0; i < exceptions.size(); i++)
00879 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
00880 logger.error(errorMsg);
00881
throw new SQLException(errorMsg);
00882 }
00883 }
00884 }
00885 }
00886
00887
00888
00889
00890
00897 public final void begin(
TransactionMarkerMetaData tm)
throws SQLException
00898 {
00899 }
00900
00907 public void commit(
TransactionMarkerMetaData tm)
throws SQLException
00908 {
00909
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00910 waitForAllWritesToComplete(tm.getTransactionId());
00911
00912
try
00913 {
00914 backendNonBlockingThreadsRWLock.acquireRead();
00915 }
00916
catch (InterruptedException e)
00917 {
00918 String msg =
Translate.get(
00919
"loadbalancer.backendlist.acquire.readlock.failed", e);
00920 logger.error(msg);
00921
throw new SQLException(msg);
00922 }
00923
00924
int nbOfThreads = backendNonBlockingThreads.size();
00925 ArrayList commitList =
new ArrayList();
00926 Long lTid =
new Long(tm.getTransactionId());
00927
00928
00929
for (
int i = 0; i < nbOfThreads; i++)
00930 {
00931
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
00932 .get(i);
00933
if (thread.
getBackend().
isStartedTransaction(lTid))
00934 commitList.add(thread);
00935 }
00936
00937 nbOfThreads = commitList.size();
00938
if (nbOfThreads == 0)
00939 {
00940 backendNonBlockingThreadsRWLock.releaseRead();
00941
return;
00942 }
00943
00944
00945
CommitTask task =
new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00946 .getTimeout(), tm.getLogin(), tm.getTransactionId());
00947
00948
synchronized (task)
00949 {
00950
00951
for (
int i = 0; i < nbOfThreads; i++)
00952 {
00953
BackendWorkerThread thread = (
BackendWorkerThread) commitList.get(i);
00954
synchronized (thread)
00955 {
00956 thread.
addTask(task);
00957 thread.notify();
00958 }
00959 }
00960
00961 backendNonBlockingThreadsRWLock.releaseRead();
00962
00963
00964
try
00965 {
00966
00967
long timeout = tm.getTimeout();
00968
if (timeout > 0)
00969 {
00970
long start = System.currentTimeMillis();
00971 task.wait(timeout);
00972
long end = System.currentTimeMillis();
00973
long remaining = timeout - (end - start);
00974
if (remaining <= 0)
00975 {
00976 String msg =
Translate.get(
"loadbalancer.commit.timeout",
00977
new String[]{String.valueOf(tm.getTransactionId()),
00978 String.valueOf(task.
getSuccess()),
00979 String.valueOf(task.
getFailed())});
00980 logger.warn(msg);
00981
throw new SQLException(msg);
00982 }
00983 }
00984
else
00985 task.wait();
00986 }
00987
catch (InterruptedException e)
00988 {
00989
throw new SQLException(
Translate.get(
"loadbalancer.commit.timeout",
00990
new String[]{String.valueOf(tm.getTransactionId()),
00991 String.valueOf(task.
getSuccess()),
00992 String.valueOf(task.
getFailed())}));
00993 }
00994
00995
if (task.getSuccess() > 0)
00996
return;
00997
else
00998 {
00999 ArrayList exceptions = task.getExceptions();
01000
if (exceptions == null)
01001
throw new SQLException(
Translate.get(
01002
"loadbalancer.commit.all.failed", tm.getTransactionId()));
01003
else
01004 {
01005 String errorMsg =
Translate.get(
"loadbalancer.commit.failed.stack",
01006 tm.getTransactionId())
01007 +
"\n";
01008
for (
int i = 0; i < exceptions.size(); i++)
01009 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
01010 logger.error(errorMsg);
01011
throw new SQLException(errorMsg);
01012 }
01013 }
01014 }
01015 }
01016
01023 public void rollback(
TransactionMarkerMetaData tm)
throws SQLException
01024 {
01025
if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01026 waitForAllWritesToComplete(tm.getTransactionId());
01027
01028
try
01029 {
01030 backendNonBlockingThreadsRWLock.acquireRead();
01031 }
01032
catch (InterruptedException e)
01033 {
01034 String msg =
Translate.get(
01035
"loadbalancer.backendlist.acquire.readlock.failed", e);
01036 logger.error(msg);
01037
throw new SQLException(msg);
01038 }
01039
int nbOfThreads = backendNonBlockingThreads.size();
01040 ArrayList rollbackList =
new ArrayList();
01041 Long lTid =
new Long(tm.getTransactionId());
01042
01043
01044
for (
int i = 0; i < nbOfThreads; i++)
01045 {
01046
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
01047 .get(i);
01048
if (thread.
getBackend().
isStartedTransaction(lTid))
01049 rollbackList.add(thread);
01050 }
01051
01052 nbOfThreads = rollbackList.size();
01053
if (nbOfThreads == 0)
01054 {
01055 backendNonBlockingThreadsRWLock.releaseRead();
01056
return;
01057 }
01058
01059
01060
RollbackTask task =
new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01061 tm.getTimeout(), tm.getLogin(), tm.getTransactionId());
01062
01063
synchronized (task)
01064 {
01065
01066
for (
int i = 0; i < nbOfThreads; i++)
01067 {
01068
BackendWorkerThread thread = (
BackendWorkerThread) rollbackList.get(i);
01069
synchronized (thread)
01070 {
01071 thread.
addTask(task);
01072 thread.notify();
01073 }
01074 }
01075
01076 backendNonBlockingThreadsRWLock.releaseRead();
01077
01078
01079
try
01080 {
01081
01082
long timeout = tm.getTimeout();
01083
if (timeout > 0)
01084 {
01085
long start = System.currentTimeMillis();
01086 task.wait(timeout);
01087
long end = System.currentTimeMillis();
01088
long remaining = timeout - (end - start);
01089
if (remaining <= 0)
01090 {
01091 String msg =
Translate.get(
"loadbalancer.rollback.timeout",
01092
new String[]{String.valueOf(tm.getTransactionId()),
01093 String.valueOf(task.
getSuccess()),
01094 String.valueOf(task.
getFailed())});
01095 logger.warn(msg);
01096
throw new SQLException(msg);
01097 }
01098 }
01099
else
01100 task.wait();
01101 }
01102
catch (InterruptedException e)
01103 {
01104
throw new SQLException(
Translate.get(
"loadbalancer.rollback.timeout",
01105
new String[]{String.valueOf(tm.getTransactionId()),
01106 String.valueOf(task.
getSuccess()),
01107 String.valueOf(task.
getFailed())}));
01108 }
01109
01110
if (task.getSuccess() > 0)
01111
return;
01112
else
01113 {
01114 ArrayList exceptions = task.getExceptions();
01115
if (exceptions == null)
01116
throw new SQLException(
Translate.get(
01117
"loadbalancer.rollback.all.failed", tm.getTransactionId()));
01118
else
01119 {
01120 String errorMsg =
Translate.get(
"loadbalancer.rollback.failed.stack",
01121 tm.getTransactionId())
01122 +
"\n";
01123
for (
int i = 0; i < exceptions.size(); i++)
01124 errorMsg += ((SQLException) exceptions.get(i)).getMessage() +
"\n";
01125 logger.error(errorMsg);
01126
throw new SQLException(errorMsg);
01127 }
01128 }
01129 }
01130 }
01131
01136 protected void waitForAllWritesToComplete(
long transactionId)
01137
throws SQLException
01138 {
01139
try
01140 {
01141 backendBlockingThreadsRWLock.acquireRead();
01142 }
01143
catch (InterruptedException e)
01144 {
01145 String msg =
Translate.get(
01146
"loadbalancer.backendlist.acquire.readlock.failed", e);
01147 logger.error(msg);
01148
throw new SQLException(msg);
01149 }
01150
01151
int nbOfThreads = backendBlockingThreads.size();
01152
01153
for (
int i = 0; i < nbOfThreads; i++)
01154 {
01155
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01156 .get(i);
01157 thread.
waitForAllTasksToComplete(transactionId);
01158 }
01159
01160 backendBlockingThreadsRWLock.releaseRead();
01161 }
01162
01170 protected void waitForAllWritesToComplete(
DatabaseBackend backend,
01171
long transactionId)
throws SQLException
01172 {
01173
try
01174 {
01175 backendBlockingThreadsRWLock.acquireRead();
01176 }
01177
catch (InterruptedException e)
01178 {
01179 String msg =
Translate.get(
01180
"loadbalancer.backendlist.acquire.readlock.failed", e);
01181 logger.error(msg);
01182
throw new SQLException(msg);
01183 }
01184
01185
int nbOfThreads = backendBlockingThreads.size();
01186
01187
for (
int i = 0; i < nbOfThreads; i++)
01188 {
01189
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01190 .get(i);
01191
if (thread.
getBackend() == backend)
01192 thread.
waitForAllTasksToComplete(transactionId);
01193 }
01194
01195 backendBlockingThreadsRWLock.releaseRead();
01196 }
01197
01204 protected void waitForAllWritesToComplete(
DatabaseBackend backend)
01205
throws SQLException
01206 {
01207
try
01208 {
01209 backendBlockingThreadsRWLock.acquireRead();
01210 }
01211
catch (InterruptedException e)
01212 {
01213 String msg =
Translate.get(
01214
"loadbalancer.backendlist.acquire.readlock.failed", e);
01215 logger.error(msg);
01216
throw new SQLException(msg);
01217 }
01218
01219
int nbOfThreads = backendBlockingThreads.size();
01220
01221
for (
int i = 0; i < nbOfThreads; i++)
01222 {
01223
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01224 .get(i);
01225
if (thread.
getBackend() == backend)
01226 thread.
waitForAllTasksToComplete();
01227 }
01228
01229 backendBlockingThreadsRWLock.releaseRead();
01230 }
01231
01232
01233
01234
01235
01248 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
01249
throws SQLException
01250 {
01251
if (writeEnabled && db.isWriteCanBeEnabled())
01252 {
01253
01254
BackendWorkerThread blockingThread =
new BackendWorkerThread(db,
this);
01255 BackendWorkerThread nonBlockingThread =
new BackendWorkerThread(db,
this);
01256
01257
01258
try
01259 {
01260 backendBlockingThreadsRWLock.acquireWrite();
01261 }
01262
catch (InterruptedException e)
01263 {
01264 String msg =
Translate.get(
01265
"loadbalancer.backendlist.acquire.writelock.failed", e);
01266 logger.error(msg);
01267
throw new SQLException(msg);
01268 }
01269 backendBlockingThreads.add(blockingThread);
01270 backendBlockingThreadsRWLock.releaseWrite();
01271 blockingThread.start();
01272 logger.info(
Translate.get(
01273
"loadbalancer.backend.workerthread.blocking.add", db.getName()));
01274
01275
01276
try
01277 {
01278 backendNonBlockingThreadsRWLock.acquireWrite();
01279 }
01280
catch (InterruptedException e)
01281 {
01282 String msg =
Translate.get(
01283
"loadbalancer.backendlist.acquire.writelock.failed", e);
01284 logger.error(msg);
01285
throw new SQLException(msg);
01286 }
01287 backendNonBlockingThreads.add(nonBlockingThread);
01288 backendNonBlockingThreadsRWLock.releaseWrite();
01289 nonBlockingThread.start();
01290 logger.info(
Translate.get(
01291
"loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01292 db.enableWrite();
01293 }
01294
01295
if (!db.isInitialized())
01296 db.initializeConnections();
01297 db.enableRead();
01298 }
01299
01311 public synchronized void disableBackend(
DatabaseBackend db)
01312
throws SQLException
01313 {
01314
if (db.isWriteEnabled())
01315 {
01316
01317
try
01318 {
01319 backendBlockingThreadsRWLock.acquireWrite();
01320 }
01321
catch (InterruptedException e)
01322 {
01323 String msg =
Translate.get(
01324
"loadbalancer.backendlist.acquire.writelock.failed", e);
01325 logger.error(msg);
01326
throw new SQLException(msg);
01327 }
01328
01329
int nbOfThreads = backendBlockingThreads.size();
01330
01331
01332
for (
int i = 0; i < nbOfThreads; i++)
01333 {
01334
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
01335 .get(i);
01336
if (thread.
getBackend().
equals(db))
01337 {
01338 logger.info(
Translate
01339 .get(
"loadbalancer.backend.workerthread.blocking.remove", db
01340 .getName()));
01341
01342
01343 backendBlockingThreads.remove(thread);
01344
01345
synchronized (thread)
01346 {
01347
01348 thread.
addPriorityTask(
new KillThreadTask(1, 1));
01349 thread.notify();
01350 }
01351
break;
01352 }
01353 }
01354
01355 backendBlockingThreadsRWLock.releaseWrite();
01356
01357
01358
01359
try
01360 {
01361 backendNonBlockingThreadsRWLock.acquireWrite();
01362 }
01363
catch (InterruptedException e)
01364 {
01365 String msg =
Translate.get(
01366
"loadbalancer.backendlist.acquire.writelock.failed", e);
01367 logger.error(msg);
01368
throw new SQLException(msg);
01369 }
01370
01371
01372 nbOfThreads = backendNonBlockingThreads.size();
01373
for (
int i = 0; i < nbOfThreads; i++)
01374 {
01375
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
01376 .get(i);
01377
if (thread.
getBackend().
equals(db))
01378 {
01379 logger.info(
Translate.get(
01380
"loadbalancer.backend.workerthread.non.blocking.remove", db
01381 .getName()));
01382
01383
01384 backendNonBlockingThreads.remove(thread);
01385
01386
synchronized (thread)
01387 {
01388
01389 thread.
addPriorityTask(
new KillThreadTask(1, 1));
01390 thread.notify();
01391 }
01392
break;
01393 }
01394 }
01395
01396 backendNonBlockingThreadsRWLock.releaseWrite();
01397 }
01398
01399 db.disable();
01400
if (db.isInitialized())
01401 db.finalizeConnections();
01402 }
01403
01407 public String getXmlImpl()
01408 {
01409 StringBuffer info =
new StringBuffer();
01410 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb_1 +
">");
01411
if (waitForCompletionPolicy != null)
01412 info.append(waitForCompletionPolicy.getXml());
01413
if (macroHandler != null)
01414 info.append(macroHandler.getXml());
01415 info.append(getRaidb1Xml());
01416 info.append(
"</" +
DatabasesXmlTags.ELT_RAIDb_1 +
">");
01417
return info.toString();
01418 }
01419
01427
/**
 * Returns the subclass specific XML fragment that <code>getXmlImpl()</code>
 * inserts just before the closing RAIDb-1 tag.
 *
 * @return the XML fragment of the concrete load balancer
 */
public abstract String getRaidb1Xml();
01428 }