00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00033 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00034 import org.objectweb.cjdbc.common.exceptions.SQLExceptionFactory;
00035 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00036 import org.objectweb.cjdbc.common.i18n.Translate;
00037 import org.objectweb.cjdbc.common.log.Trace;
00038 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00039 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00043 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00044 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00045 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00046 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00047 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00048 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00049 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00054 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00055 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00057 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00058 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00059 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00060 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00061 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00062 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074 public abstract class RAIDb1 extends AbstractLoadBalancer
00075 {
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 protected ArrayList backendBlockingThreads;
00091
00092
00093
00094
00095 protected ArrayList backendNonBlockingThreads;
00096
00097 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00098
00099 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00100
00101 protected WaitForCompletionPolicy waitForCompletionPolicy;
00102
00103 protected static Trace logger = Trace
00104 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1");
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119 public RAIDb1(VirtualDatabase vdb,
00120 WaitForCompletionPolicy waitForCompletionPolicy) throws Exception
00121 {
00122 super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
00123 this.waitForCompletionPolicy = waitForCompletionPolicy;
00124 backendBlockingThreads = new ArrayList();
00125 backendNonBlockingThreads = new ArrayList();
00126 }
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139 private int getNbToWait(int nbOfThreads)
00140 {
00141 int nbToWait;
00142 switch (waitForCompletionPolicy.getPolicy())
00143 {
00144 case WaitForCompletionPolicy.FIRST :
00145 nbToWait = 1;
00146 break;
00147 case WaitForCompletionPolicy.MAJORITY :
00148 nbToWait = nbOfThreads / 2 + 1;
00149 break;
00150 default :
00151 logger
00152 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
00153 case WaitForCompletionPolicy.ALL :
00154 nbToWait = nbOfThreads;
00155 break;
00156 }
00157 return nbToWait;
00158 }
00159
00160
00161
00162
00163 public abstract ControllerResultSet execReadRequest(SelectRequest request,
00164 MetadataCache metadataCache) throws SQLException;
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175 protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00176 DatabaseBackend backend, MetadataCache metadataCache)
00177 throws SQLException, UnreachableBackendException
00178 {
00179
00180 handleMacros(request);
00181
00182
00183 AbstractConnectionManager cm = backend.getConnectionManager(request
00184 .getLogin());
00185
00186
00187 if (cm == null)
00188 {
00189 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00190 new String[]{request.getLogin(), backend.getName()});
00191 logger.error(msg);
00192 throw new SQLException(msg);
00193 }
00194
00195
00196 if (request.isAutoCommit())
00197 {
00198 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00199
00200
00201
00202 waitForAllWritesToComplete(backend);
00203
00204 ControllerResultSet rs = null;
00205 boolean badConnection;
00206 do
00207 {
00208 badConnection = false;
00209
00210 Connection c = null;
00211 try
00212 {
00213 c = cm.getConnection();
00214 }
00215 catch (UnreachableBackendException e1)
00216 {
00217 logger.error(Translate.get(
00218 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00219 disableBackend(backend);
00220 throw new UnreachableBackendException(Translate.get(
00221 "loadbalancer.backend.unreacheable", backend.getName()));
00222 }
00223
00224
00225 if (c == null)
00226 throw new SQLException(Translate.get(
00227 "loadbalancer.backend.no.connection", backend.getName()));
00228
00229
00230 try
00231 {
00232 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00233 cm.releaseConnection(c);
00234 }
00235 catch (SQLException e)
00236 {
00237 cm.releaseConnection(c);
00238 throw SQLExceptionFactory.getSQLException(e, Translate.get(
00239 "loadbalancer.request.failed.on.backend", new String[]{
00240 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00241 backend.getName(), e.getMessage()}));
00242 }
00243 catch (BadConnectionException e)
00244 {
00245 cm.deleteConnection(c);
00246 badConnection = true;
00247 }
00248 }
00249 while (badConnection);
00250 if (logger.isDebugEnabled())
00251 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00252 String.valueOf(request.getId()), backend.getName()}));
00253 return rs;
00254 }
00255 else
00256 {
00257 Connection c;
00258 long tid = request.getTransactionId();
00259 Long lTid = new Long(tid);
00260
00261
00262 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00263 waitForAllWritesToComplete(backend, request.getTransactionId());
00264
00265 try
00266 {
00267 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00268 }
00269 catch (UnreachableBackendException e1)
00270 {
00271 logger.error(Translate.get(
00272 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00273 disableBackend(backend);
00274 throw new SQLException(Translate.get(
00275 "loadbalancer.backend.unreacheable", backend.getName()));
00276 }
00277 catch (NoTransactionStartWhenDisablingException e)
00278 {
00279 String msg = Translate.get("loadbalancer.backend.is.disabling",
00280 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00281 backend.getName()});
00282 logger.error(msg);
00283 throw new SQLException(msg);
00284 }
00285
00286
00287 if (c == null)
00288 throw new SQLException(Translate.get(
00289 "loadbalancer.unable.retrieve.connection", new String[]{
00290 String.valueOf(tid), backend.getName()}));
00291
00292
00293 ControllerResultSet rs = null;
00294 try
00295 {
00296 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00297 }
00298 catch (SQLException e)
00299 {
00300 throw SQLExceptionFactory.getSQLException(e, Translate.get(
00301 "loadbalancer.request.failed.on.backend", new String[]{
00302 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00303 backend.getName(), e.getMessage()}));
00304 }
00305 catch (BadConnectionException e)
00306 {
00307
00308 cm.deleteConnection(tid);
00309 String msg = Translate.get(
00310 "loadbalancer.backend.disabling.connection.failure", backend
00311 .getName());
00312 logger.error(msg);
00313 disableBackend(backend);
00314 throw new SQLException(msg);
00315 }
00316 if (logger.isDebugEnabled())
00317 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00318 new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00319 backend.getName()}));
00320 return rs;
00321 }
00322 }
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335 public int execWriteRequest(AbstractWriteRequest request)
00336 throws AllBackendsFailedException, NoMoreBackendException, SQLException
00337 {
00338 return ((WriteRequestTask) execWriteRequest(request, false, null))
00339 .getResult();
00340 }
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352 public ControllerResultSet execWriteRequestWithKeys(
00353 AbstractWriteRequest request, MetadataCache metadataCache)
00354 throws AllBackendsFailedException, SQLException
00355 {
00356 return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
00357 metadataCache)).getResult();
00358 }
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 private AbstractTask execWriteRequest(AbstractWriteRequest request,
00373 boolean useKeys, MetadataCache metadataCache)
00374 throws AllBackendsFailedException, NoMoreBackendException, SQLException
00375 {
00376 ArrayList backendThreads;
00377 ReadPrioritaryFIFOWriteLock lock;
00378
00379
00380 handleMacros(request);
00381
00382
00383 if (request.mightBlock())
00384 {
00385 backendThreads = backendBlockingThreads;
00386 lock = backendBlockingThreadsRWLock;
00387 }
00388 else
00389 {
00390 backendThreads = backendNonBlockingThreads;
00391 lock = backendNonBlockingThreadsRWLock;
00392 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00393 && (request.getTransactionId() != 0))
00394 waitForAllWritesToComplete(request.getTransactionId());
00395
00396
00397
00398 }
00399
00400 try
00401 {
00402 lock.acquireWrite();
00403 }
00404 catch (InterruptedException e)
00405 {
00406 String msg = Translate.get(
00407 "loadbalancer.backendlist.acquire.writelock.failed", e);
00408 logger.error(msg);
00409 throw new SQLException(msg);
00410 }
00411
00412
00413
00414
00415 int nbOfThreads = backendThreads.size();
00416 if (nbOfThreads == 0)
00417 {
00418 lock.releaseWrite();
00419 throw new NoMoreBackendException(Translate
00420 .get("loadbalancer.backendlist.empty"));
00421 }
00422 else
00423 {
00424 if (logger.isDebugEnabled())
00425 logger.debug(Translate.get("loadbalancer.execute.on.several",
00426 new String[]{String.valueOf(request.getId()),
00427 String.valueOf(nbOfThreads)}));
00428 }
00429
00430
00431 AbstractTask task;
00432 if (useKeys)
00433 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
00434 nbOfThreads, request, metadataCache);
00435 else
00436 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
00437 request);
00438
00439 synchronized (task)
00440 {
00441
00442
00443
00444
00445
00446
00447 if (request.isAutoCommit())
00448 {
00449 for (int i = 0; i < nbOfThreads; i++)
00450 {
00451 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00452 .get(i);
00453 synchronized (thread)
00454 {
00455 thread.addTask(task);
00456 }
00457 }
00458 }
00459 else
00460 {
00461 for (int i = 0; i < nbOfThreads; i++)
00462 {
00463 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00464 .get(i);
00465 synchronized (thread)
00466 {
00467 thread.addTask(task, request.getTransactionId());
00468 }
00469 }
00470 }
00471
00472
00473 for (int i = 0; i < nbOfThreads; i++)
00474 {
00475 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00476 .get(i);
00477 synchronized (thread)
00478 {
00479 thread.notify();
00480 }
00481 }
00482
00483 lock.releaseWrite();
00484
00485
00486 try
00487 {
00488
00489 long timeout = request.getTimeout() * 1000;
00490 if (timeout > 0)
00491 {
00492 long start = System.currentTimeMillis();
00493 task.wait(timeout);
00494 long end = System.currentTimeMillis();
00495 long remaining = timeout - (end - start);
00496 if (remaining <= 0)
00497 {
00498 if (task.setExpiredTimeout())
00499 {
00500 String msg = Translate.get("loadbalancer.request.timeout",
00501 new String[]{String.valueOf(request.getId()),
00502 String.valueOf(task.getSuccess()),
00503 String.valueOf(task.getFailed())});
00504
00505 logger.warn(msg);
00506 throw new SQLException(msg);
00507 }
00508
00509 }
00510
00511 }
00512 else
00513 task.wait();
00514 }
00515 catch (InterruptedException e)
00516 {
00517 if (task.setExpiredTimeout())
00518 {
00519 String msg = Translate.get("loadbalancer.request.timeout",
00520 new String[]{String.valueOf(request.getId()),
00521 String.valueOf(task.getSuccess()),
00522 String.valueOf(task.getFailed())});
00523
00524 logger.warn(msg);
00525 throw new SQLException(msg);
00526 }
00527
00528 }
00529
00530 if (task.getSuccess() > 0)
00531 return task;
00532 else
00533 {
00534 ArrayList exceptions = task.getExceptions();
00535 if (exceptions == null)
00536 throw new AllBackendsFailedException(Translate.get(
00537 "loadbalancer.request.failed.all", request.getId()));
00538 else
00539 {
00540 String errorMsg = Translate.get("loadbalancer.request.failed.stack",
00541 request.getId())
00542 + "\n";
00543 SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
00544 errorMsg);
00545 logger.error(ex.getMessage());
00546 throw ex;
00547 }
00548 }
00549 }
00550 }
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561 protected ControllerResultSet executeStoredProcedureOnBackend(
00562 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00563 throws SQLException, UnreachableBackendException
00564 {
00565
00566 handleMacros(proc);
00567
00568
00569 AbstractConnectionManager cm = backend
00570 .getConnectionManager(proc.getLogin());
00571
00572
00573 if (cm == null)
00574 {
00575 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00576 new String[]{proc.getLogin(), backend.getName()});
00577 logger.error(msg);
00578 throw new SQLException(msg);
00579 }
00580
00581
00582 if (proc.isAutoCommit())
00583 {
00584 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00585
00586
00587
00588 waitForAllWritesToComplete(backend);
00589
00590
00591 Connection c = null;
00592 try
00593 {
00594 c = cm.getConnection();
00595 }
00596 catch (UnreachableBackendException e1)
00597 {
00598 logger.error(Translate.get(
00599 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00600 disableBackend(backend);
00601 throw new UnreachableBackendException(Translate.get(
00602 "loadbalancer.backend.unreacheable", backend.getName()));
00603 }
00604
00605
00606 if (c == null)
00607 throw new UnreachableBackendException(Translate.get(
00608 "loadbalancer.backend.no.connection", backend.getName()));
00609
00610
00611 ControllerResultSet rs = null;
00612 try
00613 {
00614 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00615 backend, c, metadataCache);
00616 }
00617 catch (Exception e)
00618 {
00619 throw new SQLException(Translate.get(
00620 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00621 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00622 backend.getName(), e.getMessage()}));
00623 }
00624 finally
00625 {
00626 cm.releaseConnection(c);
00627 }
00628 if (logger.isDebugEnabled())
00629 logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00630 new String[]{String.valueOf(proc.getId()), backend.getName()}));
00631 return rs;
00632 }
00633 else
00634 {
00635 Connection c;
00636 long tid = proc.getTransactionId();
00637 Long lTid = new Long(tid);
00638
00639
00640 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00641 waitForAllWritesToComplete(backend, proc.getTransactionId());
00642
00643 try
00644 {
00645 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00646 }
00647 catch (UnreachableBackendException e1)
00648 {
00649 logger.error(Translate.get(
00650 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00651 disableBackend(backend);
00652 throw new SQLException(Translate.get(
00653 "loadbalancer.backend.unreacheable", backend.getName()));
00654 }
00655 catch (NoTransactionStartWhenDisablingException e)
00656 {
00657 String msg = Translate.get("loadbalancer.backend.is.disabling",
00658 new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00659 backend.getName()});
00660 logger.error(msg);
00661 throw new SQLException(msg);
00662 }
00663
00664
00665 if (c == null)
00666 throw new SQLException(Translate.get(
00667 "loadbalancer.unable.retrieve.connection", new String[]{
00668 String.valueOf(tid), backend.getName()}));
00669
00670
00671 ControllerResultSet rs;
00672 try
00673 {
00674 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00675 backend, c, metadataCache);
00676 }
00677 catch (Exception e)
00678 {
00679 throw new SQLException(Translate.get(
00680 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00681 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00682 backend.getName(), e.getMessage()}));
00683 }
00684 if (logger.isDebugEnabled())
00685 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00686 new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00687 backend.getName()}));
00688 return rs;
00689 }
00690 }
00691
00692
00693
00694
00695
00696 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00697 MetadataCache metadataCache) throws SQLException
00698 {
00699 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
00700 proc, true, metadataCache);
00701 return task.getResult();
00702 }
00703
00704
00705
00706
00707 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00708 {
00709 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
00710 proc, false, null);
00711 return task.getResult();
00712 }
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724 private AbstractTask callStoredProcedure(StoredProcedure proc,
00725 boolean isRead, MetadataCache metadataCache) throws SQLException
00726 {
00727 ArrayList backendThreads = backendBlockingThreads;
00728 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00729
00730 try
00731 {
00732
00733
00734
00735
00736
00737 lock.acquireWrite();
00738 }
00739 catch (InterruptedException e)
00740 {
00741 String msg;
00742 msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
00743 e);
00744 logger.error(msg);
00745 throw new SQLException(msg);
00746 }
00747
00748 int nbOfThreads = backendThreads.size();
00749 if (nbOfThreads == 0)
00750 {
00751 lock.releaseWrite();
00752 throw new NoMoreBackendException(Translate
00753 .get("loadbalancer.backendlist.empty"));
00754 }
00755 else
00756 {
00757 if (logger.isDebugEnabled())
00758 logger.debug(Translate.get("loadbalancer.execute.on.several",
00759 new String[]{String.valueOf(proc.getId()),
00760 String.valueOf(nbOfThreads)}));
00761 }
00762
00763
00764 AbstractTask task;
00765 if (isRead)
00766 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00767 proc, metadataCache);
00768 else
00769 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00770 nbOfThreads, proc);
00771
00772 synchronized (task)
00773 {
00774
00775 for (int i = 0; i < nbOfThreads; i++)
00776 {
00777 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00778 .get(i);
00779 synchronized (thread)
00780 {
00781 if (proc.isAutoCommit())
00782 thread.addTask(task);
00783 else
00784 thread.addTask(task, proc.getTransactionId());
00785 thread.notify();
00786 }
00787 }
00788
00789 lock.releaseWrite();
00790
00791
00792 try
00793 {
00794
00795 long timeout = proc.getTimeout() * 1000;
00796 if (timeout > 0)
00797 {
00798 long start = System.currentTimeMillis();
00799 task.wait(timeout);
00800 long end = System.currentTimeMillis();
00801 long remaining = timeout - (end - start);
00802 if (remaining <= 0)
00803 {
00804 if (task.setExpiredTimeout())
00805 {
00806 String msg = Translate.get(
00807 "loadbalancer.storedprocedure.timeout", new String[]{
00808 String.valueOf(proc.getId()),
00809 String.valueOf(task.getSuccess()),
00810 String.valueOf(task.getFailed())});
00811 logger.warn(msg);
00812 throw new SQLException(msg);
00813 }
00814
00815 }
00816
00817 }
00818 else
00819 task.wait();
00820 }
00821 catch (InterruptedException e)
00822 {
00823 if (task.setExpiredTimeout())
00824 {
00825 String msg = Translate.get("loadbalancer.storedprocedure.timeout",
00826 new String[]{String.valueOf(proc.getId()),
00827 String.valueOf(task.getSuccess()),
00828 String.valueOf(task.getFailed())});
00829 logger.warn(msg);
00830 throw new SQLException(msg);
00831 }
00832
00833 }
00834
00835 if (task.getSuccess() > 0)
00836 return task;
00837 else
00838 {
00839 ArrayList exceptions = task.getExceptions();
00840 if (exceptions == null)
00841 throw new SQLException(Translate.get(
00842 "loadbalancer.storedprocedure.all.failed", proc.getId()));
00843 else
00844 {
00845 String errorMsg = Translate.get(
00846 "loadbalancer.storedprocedure.failed.stack", proc.getId())
00847 + "\n";
00848 SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
00849 errorMsg);
00850 logger.error(ex.getMessage());
00851 throw ex;
00852 }
00853 }
00854 }
00855 }
00856
00857
00858
00859
00860
00861
00862
00863
00864
00865
00866
00867 public final void begin(TransactionMarkerMetaData tm) throws SQLException
00868 {
00869 }
00870
00871
00872
00873
00874
00875
00876
00877 public void commit(TransactionMarkerMetaData tm) throws SQLException
00878 {
00879 long tid = tm.getTransactionId();
00880 Long lTid = new Long(tid);
00881
00882
00883 ArrayList asynchronousBackends = null;
00884 CommitTask task = null;
00885
00886 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00887 {
00888 try
00889 {
00890 backendBlockingThreadsRWLock.acquireWrite();
00891
00892
00893
00894 }
00895 catch (InterruptedException e)
00896 {
00897 String msg = Translate.get(
00898 "loadbalancer.backendlist.acquire.writelock.failed", e);
00899 logger.error(msg);
00900 throw new SQLException(msg);
00901 }
00902
00903 int nbOfThreads = backendBlockingThreads.size();
00904
00905 task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00906 .getTimeout(), tm.getLogin(), tid);
00907
00908 for (int i = 0; i < nbOfThreads; i++)
00909 {
00910 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00911 .get(i);
00912 if (thread.hasTaskForTransaction(lTid))
00913 {
00914 if (asynchronousBackends == null)
00915 asynchronousBackends = new ArrayList();
00916 asynchronousBackends.add(thread.getBackend());
00917 synchronized (thread)
00918 {
00919 thread.insertTaskAfterLastWriteForTransaction(task, lTid);
00920 thread.notify();
00921 }
00922 }
00923 }
00924
00925 backendBlockingThreadsRWLock.releaseWrite();
00926 }
00927
00928 try
00929 {
00930 backendNonBlockingThreadsRWLock.acquireWrite();
00931 }
00932 catch (InterruptedException e)
00933 {
00934 String msg = Translate.get(
00935 "loadbalancer.backendlist.acquire.writelock.failed", e);
00936 logger.error(msg);
00937 throw new SQLException(msg);
00938 }
00939
00940 int nbOfThreads = backendNonBlockingThreads.size();
00941 ArrayList commitList = new ArrayList();
00942
00943
00944 for (int i = 0; i < nbOfThreads; i++)
00945 {
00946 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00947 .get(i);
00948 DatabaseBackend backend = thread.getBackend();
00949
00950
00951
00952 if (backend.isStartedTransaction(lTid)
00953 && ((asynchronousBackends == null) || (!asynchronousBackends
00954 .contains(backend))))
00955 commitList.add(thread);
00956 }
00957
00958 nbOfThreads = commitList.size();
00959 if (nbOfThreads == 0)
00960 {
00961 backendNonBlockingThreadsRWLock.releaseWrite();
00962 return;
00963 }
00964
00965 if (task == null)
00966 task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00967 .getTimeout(), tm.getLogin(), tid);
00968
00969 synchronized (task)
00970 {
00971
00972 for (int i = 0; i < nbOfThreads; i++)
00973 {
00974 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00975 synchronized (thread)
00976 {
00977 thread.addTask(task, tid);
00978 thread.notify();
00979 }
00980 }
00981
00982 backendNonBlockingThreadsRWLock.releaseWrite();
00983
00984
00985 try
00986 {
00987
00988 long timeout = tm.getTimeout();
00989 if (timeout > 0)
00990 {
00991 long start = System.currentTimeMillis();
00992 task.wait(timeout);
00993 long end = System.currentTimeMillis();
00994 long remaining = timeout - (end - start);
00995 if (remaining <= 0)
00996 {
00997 if (task.setExpiredTimeout())
00998 {
00999 String msg = Translate.get("loadbalancer.commit.timeout",
01000 new String[]{String.valueOf(tid),
01001 String.valueOf(task.getSuccess()),
01002 String.valueOf(task.getFailed())});
01003 logger.warn(msg);
01004 throw new SQLException(msg);
01005 }
01006
01007 }
01008 }
01009 else
01010 task.wait();
01011 }
01012 catch (InterruptedException e)
01013 {
01014 if (task.setExpiredTimeout())
01015 {
01016 String msg = Translate.get("loadbalancer.commit.timeout",
01017 new String[]{String.valueOf(tid),
01018 String.valueOf(task.getSuccess()),
01019 String.valueOf(task.getFailed())});
01020 logger.warn(msg);
01021 throw new SQLException(msg);
01022 }
01023
01024 }
01025
01026 if (task.getSuccess() > 0)
01027 return;
01028 else
01029 {
01030 ArrayList exceptions = task.getExceptions();
01031 if (exceptions == null)
01032 throw new SQLException(Translate.get(
01033 "loadbalancer.commit.all.failed", tid));
01034 else
01035 {
01036 String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01037 tid)
01038 + "\n";
01039 SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01040 errorMsg);
01041 logger.error(ex.getMessage());
01042 throw ex;
01043 }
01044 }
01045 }
01046 }
01047
01048
01049
01050
01051
01052
01053
01054 public void rollback(TransactionMarkerMetaData tm) throws SQLException
01055 {
01056 long tid = tm.getTransactionId();
01057 Long lTid = new Long(tid);
01058
01059
01060 ArrayList asynchronousBackends = null;
01061 RollbackTask task = null;
01062
01063 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01064 {
01065 try
01066 {
01067 backendBlockingThreadsRWLock.acquireWrite();
01068
01069
01070
01071 }
01072 catch (InterruptedException e)
01073 {
01074 String msg = Translate.get(
01075 "loadbalancer.backendlist.acquire.writelock.failed", e);
01076 logger.error(msg);
01077 throw new SQLException(msg);
01078 }
01079
01080 int nbOfThreads = backendBlockingThreads.size();
01081
01082 task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01083 .getTimeout(), tm.getLogin(), tid);
01084
01085 for (int i = 0; i < nbOfThreads; i++)
01086 {
01087 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01088 .get(i);
01089 if (thread.hasTaskForTransaction(lTid))
01090 {
01091 if (asynchronousBackends == null)
01092 asynchronousBackends = new ArrayList();
01093 asynchronousBackends.add(thread.getBackend());
01094 synchronized (thread)
01095 {
01096 thread.insertTaskAfterLastWriteForTransaction(task, lTid);
01097 thread.notify();
01098 }
01099 }
01100 }
01101
01102 backendBlockingThreadsRWLock.releaseWrite();
01103 }
01104
01105 try
01106 {
01107 backendNonBlockingThreadsRWLock.acquireWrite();
01108 }
01109 catch (InterruptedException e)
01110 {
01111 String msg = Translate.get(
01112 "loadbalancer.backendlist.acquire.writelock.failed", e);
01113 logger.error(msg);
01114 throw new SQLException(msg);
01115 }
01116 int nbOfThreads = backendNonBlockingThreads.size();
01117 ArrayList rollbackList = new ArrayList();
01118
01119
01120 for (int i = 0; i < nbOfThreads; i++)
01121 {
01122 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01123 .get(i);
01124 DatabaseBackend backend = thread.getBackend();
01125
01126
01127
01128 if (backend.isStartedTransaction(lTid)
01129 && ((asynchronousBackends == null) || (!asynchronousBackends
01130 .contains(backend))))
01131 rollbackList.add(thread);
01132 }
01133
01134 nbOfThreads = rollbackList.size();
01135 if (nbOfThreads == 0)
01136 {
01137 backendNonBlockingThreadsRWLock.releaseWrite();
01138 return;
01139 }
01140
01141 if (task == null)
01142 task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01143 .getTimeout(), tm.getLogin(), tid);
01144
01145 synchronized (task)
01146 {
01147
01148 for (int i = 0; i < nbOfThreads; i++)
01149 {
01150 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01151 synchronized (thread)
01152 {
01153 thread.addTask(task, tid);
01154 thread.notify();
01155 }
01156 }
01157
01158 backendNonBlockingThreadsRWLock.releaseWrite();
01159
01160
01161 try
01162 {
01163
01164 long timeout = tm.getTimeout();
01165 if (timeout > 0)
01166 {
01167 long start = System.currentTimeMillis();
01168 task.wait(timeout);
01169 long end = System.currentTimeMillis();
01170 long remaining = timeout - (end - start);
01171 if (remaining <= 0)
01172 {
01173 if (task.setExpiredTimeout())
01174 {
01175 String msg = Translate.get("loadbalancer.rollback.timeout",
01176 new String[]{String.valueOf(tid),
01177 String.valueOf(task.getSuccess()),
01178 String.valueOf(task.getFailed())});
01179 logger.warn(msg);
01180 throw new SQLException(msg);
01181 }
01182
01183 }
01184 }
01185 else
01186 task.wait();
01187 }
01188 catch (InterruptedException e)
01189 {
01190 if (task.setExpiredTimeout())
01191 {
01192 String msg = Translate.get("loadbalancer.rollback.timeout",
01193 new String[]{String.valueOf(tid),
01194 String.valueOf(task.getSuccess()),
01195 String.valueOf(task.getFailed())});
01196 logger.warn(msg);
01197 throw new SQLException(msg);
01198 }
01199
01200 }
01201
01202 if (task.getSuccess() > 0)
01203 return;
01204 else
01205 {
01206 ArrayList exceptions = task.getExceptions();
01207 if (exceptions == null)
01208 throw new SQLException(Translate.get(
01209 "loadbalancer.rollback.all.failed", tid));
01210 else
01211 {
01212 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01213 tid)
01214 + "\n";
01215 SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01216 errorMsg);
01217 logger.error(ex.getMessage());
01218 throw ex;
01219 }
01220 }
01221 }
01222 }
01223
01224
01225
01226
01227
01228 protected void waitForAllWritesToComplete(long transactionId)
01229 throws SQLException
01230 {
01231 try
01232 {
01233 backendBlockingThreadsRWLock.acquireWrite();
01234
01235
01236
01237 }
01238 catch (InterruptedException e)
01239 {
01240 String msg = Translate.get(
01241 "loadbalancer.backendlist.acquire.writelock.failed", e);
01242 logger.error(msg);
01243 throw new SQLException(msg);
01244 }
01245
01246 int nbOfThreads = backendBlockingThreads.size();
01247
01248 for (int i = 0; i < nbOfThreads; i++)
01249 {
01250 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01251 .get(i);
01252 thread.waitForAllTasksToComplete(transactionId);
01253 }
01254
01255 backendBlockingThreadsRWLock.releaseWrite();
01256 }
01257
01258
01259
01260
01261
01262
01263
01264
01265 protected void waitForAllWritesToComplete(DatabaseBackend backend,
01266 long transactionId) throws SQLException
01267 {
01268 try
01269 {
01270 backendBlockingThreadsRWLock.acquireWrite();
01271
01272
01273
01274 }
01275 catch (InterruptedException e)
01276 {
01277 String msg = Translate.get(
01278 "loadbalancer.backendlist.acquire.writelock.failed", e);
01279 logger.error(msg);
01280 throw new SQLException(msg);
01281 }
01282
01283 int nbOfThreads = backendBlockingThreads.size();
01284
01285 for (int i = 0; i < nbOfThreads; i++)
01286 {
01287 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01288 .get(i);
01289 if (thread.getBackend() == backend)
01290 thread.waitForAllTasksToComplete(transactionId);
01291 }
01292
01293 backendBlockingThreadsRWLock.releaseWrite();
01294 }
01295
01296
01297
01298
01299
01300
01301
01302 protected void waitForAllWritesToComplete(DatabaseBackend backend)
01303 throws SQLException
01304 {
01305 try
01306 {
01307 backendBlockingThreadsRWLock.acquireWrite();
01308
01309
01310
01311 }
01312 catch (InterruptedException e)
01313 {
01314 String msg = Translate.get(
01315 "loadbalancer.backendlist.acquire.writelock.failed", e);
01316 logger.error(msg);
01317 throw new SQLException(msg);
01318 }
01319
01320 int nbOfThreads = backendBlockingThreads.size();
01321
01322 for (int i = 0; i < nbOfThreads; i++)
01323 {
01324 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01325 .get(i);
01326 if (thread.getBackend() == backend)
01327 thread.waitForAllTasksToComplete();
01328 }
01329
01330 backendBlockingThreadsRWLock.releaseWrite();
01331 }
01332
01333
01334
01335
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01350 throws SQLException
01351 {
01352 if (writeEnabled && db.isWriteCanBeEnabled())
01353 {
01354
01355 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01356 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01357
01358
01359 try
01360 {
01361 backendBlockingThreadsRWLock.acquireWrite();
01362 }
01363 catch (InterruptedException e)
01364 {
01365 String msg = Translate.get(
01366 "loadbalancer.backendlist.acquire.writelock.failed", e);
01367 logger.error(msg);
01368 throw new SQLException(msg);
01369 }
01370 backendBlockingThreads.add(blockingThread);
01371 backendBlockingThreadsRWLock.releaseWrite();
01372 blockingThread.start();
01373 logger.info(Translate.get(
01374 "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01375
01376
01377 try
01378 {
01379 backendNonBlockingThreadsRWLock.acquireWrite();
01380 }
01381 catch (InterruptedException e)
01382 {
01383 String msg = Translate.get(
01384 "loadbalancer.backendlist.acquire.writelock.failed", e);
01385 logger.error(msg);
01386 throw new SQLException(msg);
01387 }
01388 backendNonBlockingThreads.add(nonBlockingThread);
01389 backendNonBlockingThreadsRWLock.releaseWrite();
01390 nonBlockingThread.start();
01391 logger.info(Translate.get(
01392 "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01393 db.enableWrite();
01394 }
01395
01396 if (!db.isInitialized())
01397 db.initializeConnections();
01398 db.enableRead();
01399 }
01400
01401
01402
01403
01404
01405
01406
01407
01408
01409
01410
01411
01412 public synchronized void disableBackend(DatabaseBackend db)
01413 throws SQLException
01414 {
01415 if (db.isWriteEnabled())
01416 {
01417
01418 try
01419 {
01420 backendBlockingThreadsRWLock.acquireWrite();
01421 }
01422 catch (InterruptedException e)
01423 {
01424 String msg = Translate.get(
01425 "loadbalancer.backendlist.acquire.writelock.failed", e);
01426 logger.error(msg);
01427 throw new SQLException(msg);
01428 }
01429
01430 int nbOfThreads = backendBlockingThreads.size();
01431
01432
01433 for (int i = 0; i < nbOfThreads; i++)
01434 {
01435 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01436 .get(i);
01437 if (thread.getBackend().equals(db))
01438 {
01439 logger.info(Translate
01440 .get("loadbalancer.backend.workerthread.blocking.remove", db
01441 .getName()));
01442
01443
01444 backendBlockingThreads.remove(thread);
01445
01446 synchronized (thread)
01447 {
01448
01449 thread.addPriorityTask(new KillThreadTask(1, 1));
01450 thread.notify();
01451 }
01452 break;
01453 }
01454 }
01455
01456 backendBlockingThreadsRWLock.releaseWrite();
01457
01458
01459
01460 try
01461 {
01462 backendNonBlockingThreadsRWLock.acquireWrite();
01463 }
01464 catch (InterruptedException e)
01465 {
01466 String msg = Translate.get(
01467 "loadbalancer.backendlist.acquire.writelock.failed", e);
01468 logger.error(msg);
01469 throw new SQLException(msg);
01470 }
01471
01472
01473 nbOfThreads = backendNonBlockingThreads.size();
01474 for (int i = 0; i < nbOfThreads; i++)
01475 {
01476 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01477 .get(i);
01478 if (thread.getBackend().equals(db))
01479 {
01480 logger.info(Translate.get(
01481 "loadbalancer.backend.workerthread.non.blocking.remove", db
01482 .getName()));
01483
01484
01485 backendNonBlockingThreads.remove(thread);
01486
01487 synchronized (thread)
01488 {
01489
01490 thread.addPriorityTask(new KillThreadTask(1, 1));
01491 thread.notify();
01492 }
01493 break;
01494 }
01495 }
01496
01497 backendNonBlockingThreadsRWLock.releaseWrite();
01498 }
01499
01500 db.disable();
01501 if (db.isInitialized())
01502 db.finalizeConnections();
01503 }
01504
01505
01506
01507
01508 public String getXmlImpl()
01509 {
01510 StringBuffer info = new StringBuffer();
01511 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01512 if (waitForCompletionPolicy != null)
01513 info.append(waitForCompletionPolicy.getXml());
01514 if (macroHandler != null)
01515 info.append(macroHandler.getXml());
01516 info.append(getRaidb1Xml());
01517 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01518 return info.toString();
01519 }
01520
01521
01522
01523
01524
01525
01526
01527
01528 public abstract String getRaidb1Xml();
01529 }