00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030
00031 import org.objectweb.cjdbc.common.exceptions.BadConnectionException;
00032 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00033 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00034 import org.objectweb.cjdbc.common.i18n.Translate;
00035 import org.objectweb.cjdbc.common.log.Trace;
00036 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00038 import org.objectweb.cjdbc.common.sql.SelectRequest;
00039 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00040 import org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock;
00041 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00042 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00043 import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00044 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00045 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00046 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00047 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00048 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00049 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableException;
00050 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00051 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule;
00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask;
00054 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00055 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask;
00056 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask;
00057 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask;
00058 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestWithKeysTask;
00059 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask;
00060 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00061 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00062 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00063 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076 public abstract class RAIDb2 extends AbstractLoadBalancer
00077 {
00078
00079
00080
00081
00082
00083
00084
00085
00086
/**
 * Worker threads (<code>BackendWorkerThread</code> elements) that receive
 * the write requests for which <code>mightBlock()</code> is true, as well as
 * the stored procedure calls.
 */
00087 protected ArrayList backendBlockingThreads;
/**
 * Worker threads (<code>BackendWorkerThread</code> elements) that receive
 * the other write requests and the commit/rollback tasks.
 */
00088 protected ArrayList backendNonBlockingThreads;
/** Lock acquired in write mode around every access to <code>backendBlockingThreads</code>. */
00089 protected ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
/** Lock acquired in write mode around every access to <code>backendNonBlockingThreads</code>. */
00090 protected ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock();
00091
/** Policy deciding how many backend completions a caller waits for (FIRST, MAJORITY or ALL). */
00092 protected WaitForCompletionPolicy waitForCompletionPolicy;
/** Policy consulted to pick the backends that receive a CREATE request. */
00093 protected CreateTablePolicy createTablePolicy;
00094
/** Logger shared by this load balancer. */
00095 protected static Trace logger = Trace
00096 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2");
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
/**
 * Creates a RAIDb-2 load balancer with empty worker thread lists.
 *
 * @param vdb the virtual database this load balancer belongs to
 * @param waitForCompletionPolicy policy deciding how many backends to wait
 *          for when executing a write, commit or rollback
 * @param createTablePolicy policy used to choose the backends of a CREATE
 *          request
 * @throws Exception if the parent constructor fails
 */
public RAIDb2(VirtualDatabase vdb,
    WaitForCompletionPolicy waitForCompletionPolicy,
    CreateTablePolicy createTablePolicy) throws Exception
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  // Policies supplied by the caller
  this.waitForCompletionPolicy = waitForCompletionPolicy;
  this.createTablePolicy = createTablePolicy;

  // Worker thread lists start empty
  this.backendBlockingThreads = new ArrayList();
  this.backendNonBlockingThreads = new ArrayList();
}
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136 private int getNbToWait(int nbOfThreads)
00137 {
00138 int nbToWait;
00139 switch (waitForCompletionPolicy.getPolicy())
00140 {
00141 case WaitForCompletionPolicy.FIRST :
00142 nbToWait = 1;
00143 break;
00144 case WaitForCompletionPolicy.MAJORITY :
00145 nbToWait = nbOfThreads / 2 + 1;
00146 break;
00147 default :
00148 logger
00149 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
00150 case WaitForCompletionPolicy.ALL :
00151 nbToWait = nbOfThreads;
00152 break;
00153 }
00154 return nbToWait;
00155 }
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 public int execWriteRequest(AbstractWriteRequest request)
00168 throws AllBackendsFailedException, SQLException
00169 {
00170 return ((WriteRequestTask) execWriteRequest(request, false, null))
00171 .getResult();
00172 }
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 public ControllerResultSet execWriteRequestWithKeys(
00185 AbstractWriteRequest request, MetadataCache metadataCache)
00186 throws AllBackendsFailedException, SQLException
00187 {
00188 return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
00189 metadataCache)).getResult();
00190 }
00191
00192
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205 private AbstractTask execWriteRequest(AbstractWriteRequest request,
00206 boolean useKeys, MetadataCache metadataCache)
00207 throws AllBackendsFailedException, SQLException
00208 {
00209 ArrayList backendThreads;
00210 ReadPrioritaryFIFOWriteLock lock;
00211
00212
00213 handleMacros(request);
00214
00215
00216 if (request.mightBlock())
00217 {
00218 backendThreads = backendBlockingThreads;
00219 lock = backendBlockingThreadsRWLock;
00220 }
00221 else
00222 {
00223 backendThreads = backendNonBlockingThreads;
00224 lock = backendNonBlockingThreadsRWLock;
00225 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00226 && (request.getTransactionId() != 0))
00227 waitForAllWritesToComplete(request.getTransactionId());
00228 }
00229
00230 try
00231 {
00232 lock.acquireWrite();
00233 }
00234 catch (InterruptedException e)
00235 {
00236 String msg = Translate.get(
00237 "loadbalancer.backendlist.acquire.writelock.failed", e);
00238 logger.error(msg);
00239 throw new SQLException(msg);
00240 }
00241
00242 int nbOfThreads = backendThreads.size();
00243 ArrayList writeList = new ArrayList();
00244 String tableName = request.getTableName();
00245
00246 if (request.isCreate())
00247 {
00248 CreateTableRule rule = createTablePolicy.getTableRule(request
00249 .getTableName());
00250 if (rule == null)
00251 rule = createTablePolicy.getDefaultRule();
00252
00253
00254 ArrayList chosen;
00255 try
00256 {
00257 chosen = rule.getBackends(vdb.getBackends());
00258 }
00259 catch (CreateTableException e)
00260 {
00261 throw new SQLException(Translate.get(
00262 "loadbalancer.create.table.rule.failed", e.getMessage()));
00263 }
00264
00265
00266 if (chosen != null)
00267 {
00268 for (int i = 0; i < nbOfThreads; i++)
00269 {
00270 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00271 .get(i);
00272 if (chosen.contains(thread.getBackend()))
00273 writeList.add(thread);
00274 }
00275 }
00276 }
00277 else
00278 {
00279 for (int i = 0; i < nbOfThreads; i++)
00280 {
00281 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00282 .get(i);
00283 if (thread.getBackend().hasTable(tableName))
00284 writeList.add(thread);
00285 }
00286 }
00287
00288 nbOfThreads = writeList.size();
00289 if (nbOfThreads == 0)
00290 {
00291 lock.releaseWrite();
00292 String msg = Translate.get("loadbalancer.execute.no.backend.found",
00293 request.getSQLShortForm(vdb.getSQLShortFormLength()));
00294 logger.warn(msg);
00295 throw new SQLException(msg);
00296 }
00297 else
00298 {
00299 if (logger.isDebugEnabled())
00300 logger.debug(Translate.get("loadbalancer.execute.on.several",
00301 new String[]{String.valueOf(request.getId()),
00302 String.valueOf(nbOfThreads)}));
00303 }
00304
00305
00306 AbstractTask task;
00307 if (useKeys)
00308 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
00309 nbOfThreads, request, metadataCache);
00310 else
00311 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
00312 request);
00313
00314 synchronized (task)
00315 {
00316
00317
00318
00319
00320
00321
00322 if (request.isAutoCommit())
00323 {
00324 for (int i = 0; i < nbOfThreads; i++)
00325 {
00326 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00327 synchronized (thread)
00328 {
00329 thread.addTask(task);
00330 }
00331 }
00332 }
00333 else
00334 {
00335 for (int i = 0; i < nbOfThreads; i++)
00336 {
00337 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00338 synchronized (thread)
00339 {
00340 thread.addTask(task, request.getTransactionId());
00341 }
00342 }
00343 }
00344
00345
00346 for (int i = 0; i < nbOfThreads; i++)
00347 {
00348 BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
00349 synchronized (thread)
00350 {
00351 thread.notify();
00352 }
00353 }
00354
00355 lock.releaseWrite();
00356
00357
00358 try
00359 {
00360
00361 long timeout = request.getTimeout() * 1000;
00362 if (timeout > 0)
00363 {
00364 long start = System.currentTimeMillis();
00365 task.wait(timeout);
00366 long end = System.currentTimeMillis();
00367 long remaining = timeout - (end - start);
00368 if (remaining <= 0)
00369 {
00370 if (task.setExpiredTimeout())
00371 {
00372 String msg = Translate.get("loadbalancer.request.timeout",
00373 new String[]{String.valueOf(request.getId()),
00374 String.valueOf(task.getSuccess()),
00375 String.valueOf(task.getFailed())});
00376 logger.warn(msg);
00377 throw new SQLException(msg);
00378 }
00379
00380 }
00381
00382 }
00383 else
00384 task.wait();
00385 }
00386 catch (InterruptedException e)
00387 {
00388 if (task.setExpiredTimeout())
00389 {
00390 String msg = Translate.get("loadbalancer.request.timeout",
00391 new String[]{String.valueOf(request.getId()),
00392 String.valueOf(task.getSuccess()),
00393 String.valueOf(task.getFailed())});
00394 logger.warn(msg);
00395 throw new SQLException(msg);
00396 }
00397
00398 }
00399
00400 if (task.getSuccess() > 0)
00401 return task;
00402 else
00403 {
00404 ArrayList exceptions = task.getExceptions();
00405 if (exceptions == null)
00406 throw new AllBackendsFailedException(Translate.get(
00407 "loadbalancer.request.failed.all", request.getId()));
00408 else
00409 {
00410 String errorMsg = Translate.get("loadbalancer.request.failed.stack",
00411 request.getId())
00412 + "\n";
00413 for (int i = 0; i < exceptions.size(); i++)
00414 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00415 logger.error(errorMsg);
00416 throw new SQLException(errorMsg);
00417 }
00418 }
00419 }
00420 }
00421
00422
00423
00424
00425
00426
00427
00428
00429
/**
 * Executes a read request. The choice of the backend is left to the concrete
 * RAIDb-2 subclasses.
 *
 * @param request the select request to execute
 * @param metadataCache metadata cache made available to the implementation
 * @return the result set of the request
 * @throws SQLException if the request could not be executed
 */
00430 public abstract ControllerResultSet execReadRequest(SelectRequest request,
00431 MetadataCache metadataCache) throws SQLException;
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442 protected ControllerResultSet executeRequestOnBackend(SelectRequest request,
00443 DatabaseBackend backend, MetadataCache metadataCache)
00444 throws SQLException, UnreachableBackendException
00445 {
00446
00447 handleMacros(request);
00448
00449
00450 AbstractConnectionManager cm = backend.getConnectionManager(request
00451 .getLogin());
00452
00453
00454 if (cm == null)
00455 {
00456 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00457 new String[]{request.getLogin(), backend.getName()});
00458 logger.error(msg);
00459 throw new SQLException(msg);
00460 }
00461
00462
00463 if (request.isAutoCommit())
00464 {
00465 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00466
00467
00468
00469 waitForAllWritesToComplete(backend);
00470
00471 ControllerResultSet rs = null;
00472 boolean badConnection;
00473 do
00474 {
00475 badConnection = false;
00476
00477 Connection c = null;
00478 try
00479 {
00480 c = cm.getConnection();
00481 }
00482 catch (UnreachableBackendException e1)
00483 {
00484 logger.error(Translate.get(
00485 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00486 disableBackend(backend);
00487 throw new UnreachableBackendException(Translate.get(
00488 "loadbalancer.backend.unreacheable", backend.getName()));
00489 }
00490
00491
00492 if (c == null)
00493 throw new UnreachableBackendException(
00494 "No more connections on backend " + backend.getName());
00495
00496
00497 try
00498 {
00499 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00500 cm.releaseConnection(c);
00501 }
00502 catch (SQLException e)
00503 {
00504 cm.releaseConnection(c);
00505 throw new SQLException(Translate.get(
00506 "loadbalancer.request.failed.on.backend", new String[]{
00507 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00508 backend.getName(), e.getMessage()}));
00509 }
00510 catch (BadConnectionException e)
00511 {
00512 cm.deleteConnection(c);
00513 badConnection = true;
00514 }
00515 }
00516 while (badConnection);
00517 if (logger.isDebugEnabled())
00518 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00519 String.valueOf(request.getId()), backend.getName()}));
00520 return rs;
00521 }
00522 else
00523 {
00524 Connection c;
00525 long tid = request.getTransactionId();
00526 Long lTid = new Long(tid);
00527
00528
00529 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00530 waitForAllWritesToComplete(backend, request.getTransactionId());
00531
00532 try
00533 {
00534 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00535 }
00536 catch (UnreachableBackendException e1)
00537 {
00538 logger.error(Translate.get(
00539 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00540 disableBackend(backend);
00541 throw new SQLException(Translate.get(
00542 "loadbalancer.backend.unreacheable", backend.getName()));
00543 }
00544 catch (NoTransactionStartWhenDisablingException e)
00545 {
00546 String msg = Translate.get("loadbalancer.backend.is.disabling",
00547 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00548 backend.getName()});
00549 logger.error(msg);
00550 throw new SQLException(msg);
00551 }
00552
00553
00554 if (c == null)
00555 throw new SQLException(Translate.get(
00556 "loadbalancer.unable.retrieve.connection", new String[]{
00557 String.valueOf(tid), backend.getName()}));
00558
00559
00560 ControllerResultSet rs = null;
00561 try
00562 {
00563 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00564 }
00565 catch (SQLException e)
00566 {
00567 throw new SQLException(Translate.get(
00568 "loadbalancer.request.failed.on.backend", new String[]{
00569 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00570 backend.getName(), e.getMessage()}));
00571 }
00572 catch (BadConnectionException e)
00573 {
00574
00575 cm.deleteConnection(tid);
00576 String msg = Translate.get(
00577 "loadbalancer.backend.disabling.connection.failure", backend
00578 .getName());
00579 logger.error(msg);
00580 disableBackend(backend);
00581 throw new SQLException(msg);
00582 }
00583 if (logger.isDebugEnabled())
00584 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00585 new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00586 backend.getName()}));
00587 return rs;
00588 }
00589 }
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600 protected ControllerResultSet executeStoredProcedureOnBackend(
00601 StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache)
00602 throws SQLException, UnreachableBackendException
00603 {
00604
00605 handleMacros(proc);
00606
00607
00608 AbstractConnectionManager cm = backend
00609 .getConnectionManager(proc.getLogin());
00610
00611
00612 if (cm == null)
00613 {
00614 String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00615 new String[]{proc.getLogin(), backend.getName()});
00616 logger.error(msg);
00617 throw new SQLException(msg);
00618 }
00619
00620
00621 if (proc.isAutoCommit())
00622 {
00623 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00624
00625
00626
00627 waitForAllWritesToComplete(backend);
00628
00629
00630 Connection c = null;
00631 try
00632 {
00633 c = cm.getConnection();
00634 }
00635 catch (UnreachableBackendException e1)
00636 {
00637 logger.error(Translate.get(
00638 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00639 disableBackend(backend);
00640 throw new UnreachableBackendException(Translate.get(
00641 "loadbalancer.backend.unreacheable", backend.getName()));
00642 }
00643
00644
00645 if (c == null)
00646 throw new SQLException(Translate.get(
00647 "loadbalancer.backend.no.connection", backend.getName()));
00648
00649
00650 ControllerResultSet rs = null;
00651 try
00652 {
00653 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00654 backend, c, metadataCache);
00655 }
00656 catch (Exception e)
00657 {
00658 throw new SQLException(Translate.get(
00659 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00660 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00661 backend.getName(), e.getMessage()}));
00662 }
00663 finally
00664 {
00665 cm.releaseConnection(c);
00666 }
00667 if (logger.isDebugEnabled())
00668 logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00669 new String[]{String.valueOf(proc.getId()), backend.getName()}));
00670 return rs;
00671 }
00672 else
00673 {
00674 Connection c;
00675 long tid = proc.getTransactionId();
00676 Long lTid = new Long(tid);
00677
00678
00679 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00680 waitForAllWritesToComplete(backend, proc.getTransactionId());
00681
00682 try
00683 {
00684 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00685 }
00686 catch (UnreachableBackendException e1)
00687 {
00688 logger.error(Translate.get(
00689 "loadbalancer.backend.disabling.unreachable", backend.getName()));
00690 disableBackend(backend);
00691 throw new SQLException(Translate.get(
00692 "loadbalancer.backend.unreacheable", backend.getName()));
00693 }
00694 catch (NoTransactionStartWhenDisablingException e)
00695 {
00696 String msg = Translate.get("loadbalancer.backend.is.disabling",
00697 new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00698 backend.getName()});
00699 logger.error(msg);
00700 throw new SQLException(msg);
00701 }
00702
00703
00704 if (c == null)
00705 throw new SQLException(Translate.get(
00706 "loadbalancer.unable.retrieve.connection", new String[]{
00707 String.valueOf(tid), backend.getName()}));
00708
00709
00710 ControllerResultSet rs;
00711 try
00712 {
00713 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00714 backend, c, metadataCache);
00715 }
00716 catch (Exception e)
00717 {
00718 throw new SQLException(Translate.get(
00719 "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00720 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00721 backend.getName(), e.getMessage()}));
00722 }
00723 if (logger.isDebugEnabled())
00724 logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00725 new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00726 backend.getName()}));
00727 return rs;
00728 }
00729 }
00730
00731
00732
00733
00734
00735 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc,
00736 MetadataCache metadataCache) throws SQLException
00737 {
00738 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
00739 proc, true, metadataCache);
00740 return task.getResult();
00741 }
00742
00743
00744
00745
00746 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00747 {
00748 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
00749 proc, false, null);
00750 return task.getResult();
00751 }
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763 private AbstractTask callStoredProcedure(StoredProcedure proc,
00764 boolean isRead, MetadataCache metadataCache) throws SQLException
00765 {
00766 ArrayList backendThreads = backendBlockingThreads;
00767 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;
00768
00769 try
00770 {
00771
00772
00773
00774
00775
00776 lock.acquireWrite();
00777 }
00778 catch (InterruptedException e)
00779 {
00780 String msg;
00781 msg = Translate.get("loadbalancer.backendlist.acquire.writelock.failed",
00782 e);
00783 logger.error(msg);
00784 throw new SQLException(msg);
00785 }
00786
00787 int nbOfThreads = backendThreads.size();
00788
00789
00790 AbstractTask task;
00791 if (isRead)
00792 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
00793 proc, metadataCache);
00794 else
00795 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
00796 nbOfThreads, proc);
00797
00798 synchronized (task)
00799 {
00800 int nbOfBackends = 0;
00801
00802
00803 for (int i = 0; i < nbOfThreads; i++)
00804 {
00805 BackendWorkerThread thread = (BackendWorkerThread) backendThreads
00806 .get(i);
00807 if (thread.getBackend().hasStoredProcedure(proc.getProcedureName()))
00808 {
00809 nbOfBackends++;
00810 synchronized (thread)
00811 {
00812 if (proc.isAutoCommit())
00813 thread.addTask(task);
00814 else
00815 thread.addTask(task, proc.getTransactionId());
00816 thread.notify();
00817 }
00818 }
00819 }
00820
00821 lock.releaseWrite();
00822
00823 if (nbOfBackends == 0)
00824 {
00825 throw new SQLException(Translate.get(
00826 "loadbalancer.backend.no.required.storedprocedure", proc
00827 .getProcedureName()));
00828 }
00829 else
00830 task.setTotalNb(nbOfBackends);
00831
00832
00833 try
00834 {
00835
00836 long timeout = proc.getTimeout() * 1000;
00837 if (timeout > 0)
00838 {
00839 long start = System.currentTimeMillis();
00840 task.wait(timeout);
00841 long end = System.currentTimeMillis();
00842 long remaining = timeout - (end - start);
00843 if (remaining <= 0)
00844 {
00845 if (task.setExpiredTimeout())
00846 {
00847 String msg = Translate.get(
00848 "loadbalancer.storedprocedure.timeout", new String[]{
00849 String.valueOf(proc.getId()),
00850 String.valueOf(task.getSuccess()),
00851 String.valueOf(task.getFailed())});
00852 logger.warn(msg);
00853 throw new SQLException(msg);
00854 }
00855
00856 }
00857
00858 }
00859 else
00860 task.wait();
00861 }
00862 catch (InterruptedException e)
00863 {
00864 if (task.setExpiredTimeout())
00865 {
00866 String msg = Translate.get("loadbalancer.storedprocedure.timeout",
00867 new String[]{String.valueOf(proc.getId()),
00868 String.valueOf(task.getSuccess()),
00869 String.valueOf(task.getFailed())});
00870 logger.warn(msg);
00871 throw new SQLException(msg);
00872 }
00873
00874 }
00875
00876 if (task.getSuccess() > 0)
00877 return task;
00878 else
00879 {
00880 ArrayList exceptions = task.getExceptions();
00881 if (exceptions == null)
00882 throw new SQLException(Translate.get(
00883 "loadbalancer.storedprocedure.all.failed", proc.getId()));
00884 else
00885 {
00886 String errorMsg = Translate.get(
00887 "loadbalancer.storedprocedure.failed.stack", proc.getId())
00888 + "\n";
00889 for (int i = 0; i < exceptions.size(); i++)
00890 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
00891 logger.error(errorMsg);
00892 throw new SQLException(errorMsg);
00893 }
00894 }
00895 }
00896 }
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
/**
 * Does nothing: the body is intentionally empty and the method is final, so
 * subclasses cannot change that.
 * NOTE(review): presumably transactions are started lazily, as the read paths
 * of this class obtain their connection through
 * <code>getConnectionForTransactionAndLazyBeginIfNeeded</code> - confirm for
 * the write paths.
 *
 * @param tm the transaction marker metadata (unused)
 * @throws SQLException never thrown by this implementation
 */
00908 public final void begin(TransactionMarkerMetaData tm) throws SQLException
00909 {
00910 }
00911
00912
00913
00914
00915
00916
00917
00918 public void commit(TransactionMarkerMetaData tm) throws SQLException
00919 {
00920 long tid = tm.getTransactionId();
00921 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00922 waitForAllWritesToComplete(tid);
00923
00924 try
00925 {
00926 backendNonBlockingThreadsRWLock.acquireWrite();
00927 }
00928 catch (InterruptedException e)
00929 {
00930 String msg = Translate.get(
00931 "loadbalancer.backendlist.acquire.writelock.failed", e);
00932 logger.error(msg);
00933 throw new SQLException(msg);
00934 }
00935
00936 int nbOfThreads = backendNonBlockingThreads.size();
00937 ArrayList commitList = new ArrayList();
00938 Long iTid = new Long(tid);
00939
00940
00941 for (int i = 0; i < nbOfThreads; i++)
00942 {
00943 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00944 .get(i);
00945 if (thread.getBackend().isStartedTransaction(iTid))
00946 commitList.add(thread);
00947 }
00948
00949 nbOfThreads = commitList.size();
00950 if (nbOfThreads == 0)
00951 {
00952 backendNonBlockingThreadsRWLock.releaseWrite();
00953 return;
00954 }
00955
00956
00957 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00958 .getTimeout(), tm.getLogin(), tid);
00959
00960 synchronized (task)
00961 {
00962
00963 for (int i = 0; i < nbOfThreads; i++)
00964 {
00965 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00966 synchronized (thread)
00967 {
00968 thread.addTask(task, tid);
00969 thread.notify();
00970 }
00971 }
00972
00973 backendNonBlockingThreadsRWLock.releaseWrite();
00974
00975
00976 try
00977 {
00978
00979 long timeout = tm.getTimeout();
00980 if (timeout > 0)
00981 {
00982 long start = System.currentTimeMillis();
00983 task.wait(timeout);
00984 long end = System.currentTimeMillis();
00985 long remaining = timeout - (end - start);
00986 if (remaining <= 0)
00987 {
00988 if (task.setExpiredTimeout())
00989 {
00990 String msg = Translate.get("loadbalancer.commit.timeout",
00991 new String[]{String.valueOf(tid),
00992 String.valueOf(task.getSuccess()),
00993 String.valueOf(task.getFailed())});
00994 logger.warn(msg);
00995 throw new SQLException(msg);
00996 }
00997
00998 }
00999 }
01000 else
01001 task.wait();
01002 }
01003 catch (InterruptedException e)
01004 {
01005 if (task.setExpiredTimeout())
01006 {
01007 String msg = Translate.get("loadbalancer.commit.timeout",
01008 new String[]{String.valueOf(tid),
01009 String.valueOf(task.getSuccess()),
01010 String.valueOf(task.getFailed())});
01011 logger.warn(msg);
01012 throw new SQLException(msg);
01013 }
01014
01015 }
01016
01017 if (task.getSuccess() > 0)
01018 return;
01019 else
01020 {
01021 ArrayList exceptions = task.getExceptions();
01022 if (exceptions == null)
01023 throw new SQLException(Translate.get(
01024 "loadbalancer.commit.all.failed", tid));
01025 else
01026 {
01027 String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01028 tid)
01029 + "\n";
01030 for (int i = 0; i < exceptions.size(); i++)
01031 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01032 logger.error(errorMsg);
01033 throw new SQLException(errorMsg);
01034 }
01035 }
01036 }
01037 }
01038
01039
01040
01041
01042
01043
01044
01045 public void rollback(TransactionMarkerMetaData tm) throws SQLException
01046 {
01047 long tid = tm.getTransactionId();
01048 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01049 waitForAllWritesToComplete(tid);
01050
01051 try
01052 {
01053 backendNonBlockingThreadsRWLock.acquireWrite();
01054 }
01055 catch (InterruptedException e)
01056 {
01057 String msg = Translate.get(
01058 "loadbalancer.backendlist.acquire.writelock.failed", e);
01059 logger.error(msg);
01060 throw new SQLException(msg);
01061 }
01062 int nbOfThreads = backendNonBlockingThreads.size();
01063 ArrayList rollbackList = new ArrayList();
01064 Long iTid = new Long(tid);
01065
01066
01067 for (int i = 0; i < nbOfThreads; i++)
01068 {
01069 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01070 .get(i);
01071 if (thread.getBackend().isStartedTransaction(iTid))
01072 rollbackList.add(thread);
01073 }
01074
01075 nbOfThreads = rollbackList.size();
01076 if (nbOfThreads == 0)
01077 {
01078 backendNonBlockingThreadsRWLock.releaseWrite();
01079 return;
01080 }
01081
01082
01083 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01084 tm.getTimeout(), tm.getLogin(), tid);
01085
01086 synchronized (task)
01087 {
01088
01089 for (int i = 0; i < nbOfThreads; i++)
01090 {
01091 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01092 synchronized (thread)
01093 {
01094 thread.addTask(task, tid);
01095 thread.notify();
01096 }
01097 }
01098
01099 backendNonBlockingThreadsRWLock.releaseWrite();
01100
01101
01102 try
01103 {
01104
01105 long timeout = tm.getTimeout();
01106 if (timeout > 0)
01107 {
01108 long start = System.currentTimeMillis();
01109 task.wait(timeout);
01110 long end = System.currentTimeMillis();
01111 long remaining = timeout - (end - start);
01112 if (remaining <= 0)
01113 {
01114 if (task.setExpiredTimeout())
01115 {
01116 String msg = Translate.get("loadbalancer.rollback.timeout",
01117 new String[]{String.valueOf(tid),
01118 String.valueOf(task.getSuccess()),
01119 String.valueOf(task.getFailed())});
01120 logger.warn(msg);
01121 throw new SQLException(msg);
01122 }
01123
01124 }
01125 }
01126 else
01127 task.wait();
01128 }
01129 catch (InterruptedException e)
01130 {
01131 if (task.setExpiredTimeout())
01132 {
01133 String msg = Translate.get("loadbalancer.rollback.timeout",
01134 new String[]{String.valueOf(tid),
01135 String.valueOf(task.getSuccess()),
01136 String.valueOf(task.getFailed())});
01137 logger.warn(msg);
01138 throw new SQLException(msg);
01139 }
01140
01141 }
01142
01143 if (task.getSuccess() > 0)
01144 return;
01145 else
01146 {
01147 ArrayList exceptions = task.getExceptions();
01148 if (exceptions == null)
01149 throw new SQLException(Translate.get(
01150 "loadbalancer.rollback.all.failed", tid));
01151 else
01152 {
01153 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01154 tid)
01155 + "\n";
01156 for (int i = 0; i < exceptions.size(); i++)
01157 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01158 logger.error(errorMsg);
01159 throw new SQLException(errorMsg);
01160 }
01161 }
01162 }
01163 }
01164
01165
01166
01167
01168
01169 protected void waitForAllWritesToComplete(long transactionId)
01170 throws SQLException
01171 {
01172 try
01173 {
01174 backendBlockingThreadsRWLock.acquireWrite();
01175
01176
01177
01178 }
01179 catch (InterruptedException e)
01180 {
01181 String msg = Translate.get(
01182 "loadbalancer.backendlist.acquire.writelock.failed", e);
01183 logger.error(msg);
01184 throw new SQLException(msg);
01185 }
01186
01187 int nbOfThreads = backendBlockingThreads.size();
01188
01189 for (int i = 0; i < nbOfThreads; i++)
01190 {
01191 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01192 .get(i);
01193 thread.waitForAllTasksToComplete(transactionId);
01194 }
01195
01196 backendBlockingThreadsRWLock.releaseWrite();
01197 }
01198
01199
01200
01201
01202
01203
01204
01205
01206 protected void waitForAllWritesToComplete(DatabaseBackend backend,
01207 long transactionId) throws SQLException
01208 {
01209 try
01210 {
01211 backendBlockingThreadsRWLock.acquireWrite();
01212
01213
01214
01215 }
01216 catch (InterruptedException e)
01217 {
01218 String msg = Translate.get(
01219 "loadbalancer.backendlist.acquire.writelock.failed", e);
01220 logger.error(msg);
01221 throw new SQLException(msg);
01222 }
01223
01224 int nbOfThreads = backendBlockingThreads.size();
01225
01226 for (int i = 0; i < nbOfThreads; i++)
01227 {
01228 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01229 .get(i);
01230 if (thread.getBackend() == backend)
01231 thread.waitForAllTasksToComplete(transactionId);
01232 }
01233
01234 backendBlockingThreadsRWLock.releaseWrite();
01235 }
01236
01237
01238
01239
01240
01241
01242
01243 protected void waitForAllWritesToComplete(DatabaseBackend backend)
01244 throws SQLException
01245 {
01246 try
01247 {
01248 backendBlockingThreadsRWLock.acquireWrite();
01249
01250
01251
01252 }
01253 catch (InterruptedException e)
01254 {
01255 String msg = Translate.get(
01256 "loadbalancer.backendlist.acquire.writelock.failed", e);
01257 logger.error(msg);
01258 throw new SQLException(msg);
01259 }
01260
01261 int nbOfThreads = backendBlockingThreads.size();
01262
01263 for (int i = 0; i < nbOfThreads; i++)
01264 {
01265 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01266 .get(i);
01267 if (thread.getBackend() == backend)
01268 thread.waitForAllTasksToComplete();
01269 }
01270
01271 backendBlockingThreadsRWLock.releaseWrite();
01272 }
01273
01274
01275
01276
01277
01278
01279
01280
01281
01282
01283
01284
01285
01286
01287
01288
01289
01290 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
01291 throws SQLException
01292 {
01293 if (writeEnabled)
01294 {
01295
01296 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01297 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01298
01299
01300 try
01301 {
01302 backendBlockingThreadsRWLock.acquireWrite();
01303 }
01304 catch (InterruptedException e)
01305 {
01306 String msg = Translate.get(
01307 "loadbalancer.backendlist.acquire.writelock.failed", e);
01308 logger.error(msg);
01309 throw new SQLException(msg);
01310 }
01311 backendBlockingThreads.add(blockingThread);
01312 backendBlockingThreadsRWLock.releaseWrite();
01313 blockingThread.start();
01314 logger.info(Translate.get(
01315 "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01316
01317
01318 try
01319 {
01320 backendNonBlockingThreadsRWLock.acquireWrite();
01321 }
01322 catch (InterruptedException e)
01323 {
01324 String msg = Translate.get(
01325 "loadbalancer.backendlist.acquire.writelock.failed", e);
01326 logger.error(msg);
01327 throw new SQLException(msg);
01328 }
01329 backendNonBlockingThreads.add(nonBlockingThread);
01330 backendNonBlockingThreadsRWLock.releaseWrite();
01331 nonBlockingThread.start();
01332 logger.info(Translate.get(
01333 "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01334 db.enableWrite();
01335 }
01336
01337 if (!db.isInitialized())
01338 db.initializeConnections();
01339 db.enableRead();
01340 }
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350
01351
01352
01353 public synchronized void disableBackend(DatabaseBackend db)
01354 throws SQLException
01355 {
01356 if (db.isWriteEnabled())
01357 {
01358
01359 try
01360 {
01361 backendBlockingThreadsRWLock.acquireWrite();
01362 }
01363 catch (InterruptedException e)
01364 {
01365 String msg = Translate.get(
01366 "loadbalancer.backendlist.acquire.writelock.failed", e);
01367 logger.error(msg);
01368 throw new SQLException(msg);
01369 }
01370
01371 int nbOfThreads = backendBlockingThreads.size();
01372
01373
01374 for (int i = 0; i < nbOfThreads; i++)
01375 {
01376 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01377 .get(i);
01378 if (thread.getBackend().equals(db))
01379 {
01380 logger.info(Translate
01381 .get("loadbalancer.backend.workerthread.blocking.remove", db
01382 .getName()));
01383
01384 backendBlockingThreads.remove(thread);
01385
01386 synchronized (thread)
01387 {
01388
01389 thread.addPriorityTask(new KillThreadTask(1, 1));
01390 thread.notify();
01391 }
01392 break;
01393 }
01394 }
01395
01396 backendBlockingThreadsRWLock.releaseWrite();
01397
01398
01399
01400 try
01401 {
01402 backendNonBlockingThreadsRWLock.acquireWrite();
01403 }
01404 catch (InterruptedException e)
01405 {
01406 String msg = Translate.get(
01407 "loadbalancer.backendlist.acquire.writelock.failed", e);
01408 logger.error(msg);
01409 throw new SQLException(msg);
01410 }
01411
01412
01413 nbOfThreads = backendNonBlockingThreads.size();
01414 for (int i = 0; i < nbOfThreads; i++)
01415 {
01416 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01417 .get(i);
01418 if (thread.getBackend().equals(db))
01419 {
01420 logger.info(Translate.get(
01421 "loadbalancer.backend.workerthread.non.blocking.remove", db
01422 .getName()));
01423
01424 backendNonBlockingThreads.remove(thread);
01425
01426 synchronized (thread)
01427 {
01428
01429 thread.addPriorityTask(new KillThreadTask(1, 1));
01430 thread.notify();
01431 }
01432 break;
01433 }
01434 }
01435
01436 backendNonBlockingThreadsRWLock.releaseWrite();
01437 }
01438
01439 db.disable();
01440 if (db.isInitialized())
01441 db.finalizeConnections();
01442 }
01443
01444
01445
01446
01447 public String getXmlImpl()
01448 {
01449 StringBuffer info = new StringBuffer();
01450 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01451 if (createTablePolicy != null)
01452 info.append(createTablePolicy.getXml());
01453 if (waitForCompletionPolicy != null)
01454 info.append(waitForCompletionPolicy.getXml());
01455 if (macroHandler != null)
01456 info.append(macroHandler.getXml());
01457 this.getRaidb2Xml();
01458 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01459 return info.toString();
01460 }
01461
01462
01463
01464
01465
01466
01467 public abstract String getRaidb2Xml();
01468
01469 }