Public Member Functions | |
RAIDb1 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy) throws Exception | |
abstract ControllerResultSet | execReadRequest (SelectRequest request, MetadataCache metadataCache) throws SQLException |
int | execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, NoMoreBackendException, SQLException |
ControllerResultSet | execWriteRequestWithKeys (AbstractWriteRequest request, MetadataCache metadataCache) throws AllBackendsFailedException, SQLException |
ControllerResultSet | execReadStoredProcedure (StoredProcedure proc, MetadataCache metadataCache) throws SQLException |
int | execWriteStoredProcedure (StoredProcedure proc) throws SQLException |
final void | begin (TransactionMarkerMetaData tm) throws SQLException |
void | commit (TransactionMarkerMetaData tm) throws SQLException |
void | rollback (TransactionMarkerMetaData tm) throws SQLException |
void | enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException |
synchronized void | disableBackend (DatabaseBackend db) throws SQLException |
String | getXmlImpl () |
abstract String | getRaidb1Xml () |
Protected Member Functions | |
ControllerResultSet | executeRequestOnBackend (SelectRequest request, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException |
ControllerResultSet | executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException |
void | waitForAllWritesToComplete (long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException |
Protected Attributes | |
ArrayList | backendBlockingThreads |
ArrayList | backendNonBlockingThreads |
ReadPrioritaryFIFOWriteLock | backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
ReadPrioritaryFIFOWriteLock | backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
WaitForCompletionPolicy | waitForCompletionPolicy |
Static Protected Attributes | |
Trace | logger |
This class is abstract because the read requests coming from the request controller are not handled here but in the subclasses. Transaction management and write requests are broadcast to all backends.
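The split described above (writes handled once in the base class, reads left to subclasses) can be illustrated with a minimal, synchronous sketch. All names below (SketchBackend, SketchMirroringBalancer, SketchRoundRobinBalancer) are hypothetical stand-ins and not C-JDBC classes; the real RAIDb1 posts tasks to worker threads rather than calling the backends directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a database backend (not the C-JDBC DatabaseBackend).
class SketchBackend {
    final String name;
    final List<String> log = new ArrayList<>();

    SketchBackend(String name) { this.name = name; }

    String read(String sql) { log.add(sql); return name; }

    int write(String sql) { log.add(sql); return 1; }
}

// Writes are implemented once here; the read policy is left abstract.
abstract class SketchMirroringBalancer {
    protected final List<SketchBackend> backends = new ArrayList<>();

    void enableBackend(SketchBackend backend) { backends.add(backend); }

    // Full mirroring: every enabled backend receives the write.
    int execWriteRequest(String sql) {
        int updated = 0;
        for (SketchBackend backend : backends) {
            updated = backend.write(sql);
        }
        return updated;
    }

    // Subclasses decide which single backend serves a read.
    abstract String execReadRequest(String sql);
}

// One possible read policy: rotate over the enabled backends.
class SketchRoundRobinBalancer extends SketchMirroringBalancer {
    private int next = 0;

    @Override
    String execReadRequest(String sql) {
        if (backends.isEmpty()) {
            throw new IllegalStateException("no backend enabled");
        }
        SketchBackend backend = backends.get(next);
        next = (next + 1) % backends.size();
        return backend.read(sql);
    }
}
```

With two enabled backends, one write reaches both, while successive reads alternate between them.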
Definition at line 74 of file RAIDb1.java.
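RAIDb1 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy) throws Exception
Creates a new RAIDb-1 request load balancer. The constructor records the wait-for-completion policy and creates the two (initially empty) backend worker thread lists; the worker threads themselves are created in enableBackend, two per write-enabled backend.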
Definition at line 119 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendBlockingThreads, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendNonBlockingThreads.

    {
      super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
      this.waitForCompletionPolicy = waitForCompletionPolicy;
      backendBlockingThreads = new ArrayList();
      backendNonBlockingThreads = new ArrayList();
    }
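final void begin (TransactionMarkerMetaData tm) throws SQLException
Begins a new transaction. The method body is empty: judging from the calls to getConnectionForTransactionAndLazyBeginIfNeeded in executeRequestOnBackend and executeStoredProcedureOnBackend, the transaction appears to be started lazily on each backend when a connection is first requested for it.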
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 867 of file RAIDb1.java.

    {
    }
void commit (TransactionMarkerMetaData tm) throws SQLException
Commits a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 877 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.hasTaskForTransaction(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.insertTaskAfterLastWriteForTransaction(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

    {
      long tid = tm.getTransactionId();
      Long lTid = new Long(tid);
      // List of backends that still have pending queries for the transaction to
      // commit
      ArrayList asynchronousBackends = null;
      CommitTask task = null;

      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      {
        try
        {
          backendBlockingThreadsRWLock.acquireWrite();
          // Lock in write to ensure that all writes are posted and we wait in the
          // queue, else a read lock has the priority with the implementation we
          // are using.
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }

        int nbOfThreads = backendBlockingThreads.size();
        // Create the task
        task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
            .getTimeout(), tm.getLogin(), tid);

        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
              .get(i);
          if (thread.hasTaskForTransaction(lTid))
          {
            if (asynchronousBackends == null)
              asynchronousBackends = new ArrayList();
            asynchronousBackends.add(thread.getBackend());
            synchronized (thread)
            {
              thread.insertTaskAfterLastWriteForTransaction(task, lTid);
              thread.notify();
            }
          }
        }

        backendBlockingThreadsRWLock.releaseWrite();
      }

      try
      {
        backendNonBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      int nbOfThreads = backendNonBlockingThreads.size();
      ArrayList commitList = new ArrayList();

      // Build the list of backends that need to commit this transaction
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
            .get(i);
        DatabaseBackend backend = thread.getBackend();
        // If the transaction has been started on this backend and it was not
        // previously treated in the asynchronous backend list (late nodes), then
        // we have to post the task now in the asynchronous list.
        if (backend.isStartedTransaction(lTid)
            && ((asynchronousBackends == null) || (!asynchronousBackends
                .contains(backend))))
          commitList.add(thread);
      }

      nbOfThreads = commitList.size();
      if (nbOfThreads == 0)
      {
        backendNonBlockingThreadsRWLock.releaseWrite();
        return;
      }

      if (task == null)
        task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
            .getTimeout(), tm.getLogin(), tid);

      synchronized (task)
      {
        // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
          synchronized (thread)
          {
            thread.addTask(task, tid);
            thread.notify();
          }
        }

        backendNonBlockingThreadsRWLock.releaseWrite();

        // Wait for completion (notified by the task)
        try
        {
          // Wait on task
          long timeout = tm.getTimeout();
          if (timeout > 0)
          {
            long start = System.currentTimeMillis();
            task.wait(timeout);
            long end = System.currentTimeMillis();
            long remaining = timeout - (end - start);
            if (remaining <= 0)
            {
              if (task.setExpiredTimeout())
              { // Task will be ignored by all backends
                String msg = Translate.get("loadbalancer.commit.timeout",
                    new String[]{String.valueOf(tid),
                        String.valueOf(task.getSuccess()),
                        String.valueOf(task.getFailed())});
                logger.warn(msg);
                throw new SQLException(msg);
              }
              // else task execution already started, to late to cancel
            }
          }
          else
            task.wait();
        }
        catch (InterruptedException e)
        {
          if (task.setExpiredTimeout())
          { // Task will be ignored by all backends
            String msg = Translate.get("loadbalancer.commit.timeout",
                new String[]{String.valueOf(tid),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});
            logger.warn(msg);
            throw new SQLException(msg);
          }
          // else task execution already started, to late to cancel
        }

        if (task.getSuccess() > 0)
          return;
        else
        { // All tasks failed
          ArrayList exceptions = task.getExceptions();
          if (exceptions == null)
            throw new SQLException(Translate.get(
                "loadbalancer.commit.all.failed", tid));
          else
          {
            String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
                tid)
                + "\n";
            SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
                errorMsg);
            logger.error(ex.getMessage());
            throw ex;
          }
        }
      }
    }
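The end of commit waits on a shared task with a timeout and tries to mark the task as expired if no backend has picked it up in time. The sketch below isolates that pattern under stated assumptions: SketchCompletionTask and SketchCommitWaiter are hypothetical names, the expiry rule (a task can only expire before any worker has started it) is inferred from the "Task will be ignored by all backends" comment, and, unlike the listing, the wait is placed in a loop to guard against spurious wakeups.

```java
import java.sql.SQLException;

// Hypothetical, simplified stand-in for a task shared by several worker threads.
class SketchCompletionTask {
    private final int nbToWait;
    private int success;
    private int failed;
    private boolean started;
    private boolean expired;

    SketchCompletionTask(int nbToWait) { this.nbToWait = nbToWait; }

    // Called by a worker before executing; false means the task has expired.
    synchronized boolean tryStart() {
        if (expired) return false;
        started = true;
        return true;
    }

    synchronized void notifySuccess() { success++; notifyAll(); }

    synchronized void notifyFailure() { failed++; notifyAll(); }

    // Marks the task expired unless a worker has already started it.
    synchronized boolean setExpiredTimeout() {
        if (started) return false;
        expired = true;
        return true;
    }

    synchronized boolean isDone() { return success + failed >= nbToWait; }

    synchronized int getSuccess() { return success; }

    synchronized int getFailed() { return failed; }
}

class SketchCommitWaiter {
    // A timeout of zero or less means "wait without limit", as in the listing.
    static void waitForCompletion(SketchCompletionTask task, long timeoutMs)
            throws SQLException {
        synchronized (task) {
            long deadline = timeoutMs > 0
                    ? System.currentTimeMillis() + timeoutMs
                    : Long.MAX_VALUE;
            try {
                while (!task.isDone()) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining > 0) {
                        task.wait(remaining);
                    } else if (task.setExpiredTimeout()) {
                        // No worker started the task: it will now be skipped.
                        throw new SQLException("commit timed out");
                    } else {
                        // Already started, too late to cancel: keep waiting.
                        task.wait();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (task.setExpiredTimeout()) {
                    throw new SQLException("commit interrupted");
                }
            }
            if (task.getSuccess() == 0) {
                throw new SQLException("commit failed on all backends");
            }
        }
    }
}
```

As in the listing, a single successful backend is enough for the call to return normally. Continuing to wait once a worker has started the task is a choice made for this sketch; the listing instead falls through and inspects the success count immediately.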
synchronized void disableBackend (DatabaseBackend db) throws SQLException
Disables a backend that was previously enabled. Asks the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.
Definition at line 1412 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().

    {
      if (db.isWriteEnabled())
      {
        // Starts with backendBlockingThreads
        try
        {
          backendBlockingThreadsRWLock.acquireWrite();
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }

        int nbOfThreads = backendBlockingThreads.size();

        // Find the right blocking thread
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
              .get(i);
          if (thread.getBackend().equals(db))
          {
            logger.info(Translate
                .get("loadbalancer.backend.workerthread.blocking.remove", db
                    .getName()));

            // Remove it from the backendBlockingThread list
            backendBlockingThreads.remove(thread);

            synchronized (thread)
            {
              // Kill the thread
              thread.addPriorityTask(new KillThreadTask(1, 1));
              thread.notify();
            }
            break;
          }
        }

        backendBlockingThreadsRWLock.releaseWrite();

        // Continue with backendNonBlockingThreads

        try
        {
          backendNonBlockingThreadsRWLock.acquireWrite();
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }

        // Find the right non-blocking thread
        nbOfThreads = backendNonBlockingThreads.size();
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
              .get(i);
          if (thread.getBackend().equals(db))
          {
            logger.info(Translate.get(
                "loadbalancer.backend.workerthread.non.blocking.remove", db
                    .getName()));

            // Remove it from the backendNonBlockingThreads list
            backendNonBlockingThreads.remove(thread);

            synchronized (thread)
            {
              // Kill the thread
              thread.addPriorityTask(new KillThreadTask(1, 1));
              thread.notify();
            }
            break;
          }
        }

        backendNonBlockingThreadsRWLock.releaseWrite();
      }

      db.disable();
      if (db.isInitialized())
        db.finalizeConnections();
    }
void enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException
Enables a backend that was previously disabled. Asks the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.
Definition at line 1349 of file RAIDb1.java.

    {
      if (writeEnabled && db.isWriteCanBeEnabled())
      {
        // Create 2 worker threads
        BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
        BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);

        // Add first to the blocking thread list
        try
        {
          backendBlockingThreadsRWLock.acquireWrite();
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }
        backendBlockingThreads.add(blockingThread);
        backendBlockingThreadsRWLock.releaseWrite();
        blockingThread.start();
        logger.info(Translate.get(
            "loadbalancer.backend.workerthread.blocking.add", db.getName()));

        // Then add to the non-blocking thread list
        try
        {
          backendNonBlockingThreadsRWLock.acquireWrite();
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }
        backendNonBlockingThreads.add(nonBlockingThread);
        backendNonBlockingThreadsRWLock.releaseWrite();
        nonBlockingThread.start();
        logger.info(Translate.get(
            "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
        db.enableWrite();
      }

      if (!db.isInitialized())
        db.initializeConnections();
      db.enableRead();
    }
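Enabling a backend registers and starts worker threads, and disabling it removes them from the list and stops them with a priority "kill" task that jumps the queue. The sketch below shows one way such a worker life cycle can look. SketchWorker and SketchWorkerRegistry are hypothetical names; plain synchronized blocks stand in for ReadPrioritaryFIFOWriteLock, and a private lock object is used instead of synchronizing on the thread itself as the listings do.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical worker thread with a task queue and a priority "kill" marker.
class SketchWorker extends Thread {
    private static final Runnable KILL = () -> { };
    private final Object lock = new Object();
    private final LinkedList<Runnable> tasks = new LinkedList<>();

    // Regular tasks are appended to the end of the queue.
    void addTask(Runnable task) {
        synchronized (lock) {
            tasks.addLast(task);
            lock.notify();
        }
    }

    // Priority tasks are placed at the head of the queue.
    void addPriorityTask(Runnable task) {
        synchronized (lock) {
            tasks.addFirst(task);
            lock.notify();
        }
    }

    void kill() { addPriorityTask(KILL); }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (lock) {
                while (tasks.isEmpty()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                task = tasks.removeFirst();
            }
            if (task == KILL) {
                return; // stop the thread, leaving any queued tasks unexecuted
            }
            task.run();
        }
    }
}

// Hypothetical registry playing the role of one backend worker thread list.
class SketchWorkerRegistry {
    private final List<SketchWorker> workers = new ArrayList<>();

    synchronized SketchWorker enableBackend() {
        SketchWorker worker = new SketchWorker();
        workers.add(worker);
        worker.start();
        return worker;
    }

    synchronized void disableBackend(SketchWorker worker) {
        if (workers.remove(worker)) {
            worker.kill();
        }
    }

    synchronized int size() { return workers.size(); }
}
```

Because the kill marker is inserted at the head of the queue, tasks still waiting behind it are dropped, which matches the intent of disabling a backend immediately.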
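ControllerResultSet execReadStoredProcedure (StoredProcedure proc, MetadataCache metadataCache) throws SQLException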
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 696 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().

    {
      ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
          proc, true, metadataCache);
      return task.getResult();
    }
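ControllerResultSet executeRequestOnBackend (SelectRequest request, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException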
Execute a read request on the selected backend.
Definition at line 175 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.handleMacros().

    {
      // Handle macros
      handleMacros(request);

      // Ok, we have a backend, let's execute the request
      AbstractConnectionManager cm = backend.getConnectionManager(request
          .getLogin());

      // Sanity check
      if (cm == null)
      {
        String msg = Translate.get("loadbalancer.connectionmanager.not.found",
            new String[]{request.getLogin(), backend.getName()});
        logger.error(msg);
        throw new SQLException(msg);
      }

      // Execute the query
      if (request.isAutoCommit())
      {
        if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
          // We could do something finer grain here by waiting
          // only for writes that depend on the tables we need
          // but is that really worth the overhead ?
          waitForAllWritesToComplete(backend);

        ControllerResultSet rs = null;
        boolean badConnection;
        do
        {
          badConnection = false;
          // Use a connection just for this request
          Connection c = null;
          try
          {
            c = cm.getConnection();
          }
          catch (UnreachableBackendException e1)
          {
            logger.error(Translate.get(
                "loadbalancer.backend.disabling.unreachable", backend.getName()));
            disableBackend(backend);
            throw new UnreachableBackendException(Translate.get(
                "loadbalancer.backend.unreacheable", backend.getName()));
          }

          // Sanity check
          if (c == null)
            throw new SQLException(Translate.get(
                "loadbalancer.backend.no.connection", backend.getName()));

          // Execute Query
          try
          {
            rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
            cm.releaseConnection(c);
          }
          catch (SQLException e)
          {
            cm.releaseConnection(c);
            throw SQLExceptionFactory.getSQLException(e, Translate.get(
                "loadbalancer.request.failed.on.backend", new String[]{
                    request.getSQLShortForm(vdb.getSQLShortFormLength()),
                    backend.getName(), e.getMessage()}));
          }
          catch (BadConnectionException e)
          { // Get rid of the bad connection
            cm.deleteConnection(c);
            badConnection = true;
          }
        }
        while (badConnection);
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
              String.valueOf(request.getId()), backend.getName()}));
        return rs;
      }
      else
      { // Inside a transaction
        Connection c;
        long tid = request.getTransactionId();
        Long lTid = new Long(tid);

        // Wait for previous writes to complete
        if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
          waitForAllWritesToComplete(backend, request.getTransactionId());

        try
        {
          c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          disableBackend(backend);
          throw new SQLException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }
        catch (NoTransactionStartWhenDisablingException e)
        {
          String msg = Translate.get("loadbalancer.backend.is.disabling",
              new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName()});
          logger.error(msg);
          throw new SQLException(msg);
        }

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.retrieve.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));

        // Execute Query
        ControllerResultSet rs = null;
        try
        {
          rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
        }
        catch (SQLException e)
        {
          throw SQLExceptionFactory.getSQLException(e, Translate.get(
              "loadbalancer.request.failed.on.backend", new String[]{
                  request.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName(), e.getMessage()}));
        }
        catch (BadConnectionException e)
        { // Connection failed, so did the transaction
          // Disable the backend.
          cm.deleteConnection(tid);
          String msg = Translate.get(
              "loadbalancer.backend.disabling.connection.failure", backend
                  .getName());
          logger.error(msg);
          disableBackend(backend);
          throw new SQLException(msg);
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.transaction.on",
              new String[]{String.valueOf(tid), String.valueOf(request.getId()),
                  backend.getName()}));
        return rs;
      }
    }
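In auto-commit mode the listing retries the read whenever the connection it obtained turns out to be bad: the bad connection is deleted and a fresh one is requested until the query runs. The sketch below isolates that do/while retry loop. SketchPool, SketchConnection, SketchBadConnectionException and SketchReader are hypothetical stand-ins for the connection manager, connection and exception types of the listing.

```java
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical marker for a connection that was found to be broken.
class SketchBadConnectionException extends Exception {
}

// Hypothetical connection that either answers or reports itself as broken.
class SketchConnection {
    private final boolean broken;

    SketchConnection(boolean broken) { this.broken = broken; }

    String query(String sql) throws SketchBadConnectionException {
        if (broken) throw new SketchBadConnectionException();
        return "result of " + sql;
    }
}

// Hypothetical connection manager backed by a simple queue of idle connections.
class SketchPool {
    private final Deque<SketchConnection> idle = new ArrayDeque<>();
    int deleted = 0;

    void add(SketchConnection c) { idle.addLast(c); }

    // Returns null when no connection is available.
    SketchConnection getConnection() { return idle.pollFirst(); }

    void releaseConnection(SketchConnection c) { idle.addLast(c); }

    // A deleted connection is never returned to the pool.
    void deleteConnection(SketchConnection c) { deleted++; }

    int idleCount() { return idle.size(); }
}

class SketchReader {
    static String executeAutoCommit(SketchPool pool, String sql) throws SQLException {
        String rs = null;
        boolean badConnection;
        do {
            badConnection = false;
            // Use a connection just for this request.
            SketchConnection c = pool.getConnection();
            if (c == null) {
                throw new SQLException("no connection available");
            }
            try {
                rs = c.query(sql);
                pool.releaseConnection(c);
            } catch (SketchBadConnectionException e) {
                // Get rid of the bad connection and try again with another one.
                pool.deleteConnection(c);
                badConnection = true;
            }
        } while (badConnection);
        return rs;
    }
}
```

The loop ends either with a result or with an exception once the pool has no connection left, so it cannot spin forever on a finite pool. Inside a transaction the listing does not retry: a bad connection there invalidates the transaction, and the backend is disabled instead.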
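ControllerResultSet executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException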
Execute a stored procedure on the selected backend.
Definition at line 561 of file RAIDb1.java.

    {
      // Handle macros
      handleMacros(proc);

      // Ok, we have a backend, let's execute the request
      AbstractConnectionManager cm = backend
          .getConnectionManager(proc.getLogin());

      // Sanity check
      if (cm == null)
      {
        String msg = Translate.get("loadbalancer.connectionmanager.not.found",
            new String[]{proc.getLogin(), backend.getName()});
        logger.error(msg);
        throw new SQLException(msg);
      }

      // Execute the query
      if (proc.isAutoCommit())
      {
        if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
          // We could do something finer grain here by waiting
          // only for writes that depend on the tables we need
          // but is that really worth the overhead ?
          waitForAllWritesToComplete(backend);

        // Use a connection just for this request
        Connection c = null;
        try
        {
          c = cm.getConnection();
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          disableBackend(backend);
          throw new UnreachableBackendException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }

        // Sanity check
        if (c == null)
          throw new UnreachableBackendException(Translate.get(
              "loadbalancer.backend.no.connection", backend.getName()));

        // Execute Query
        ControllerResultSet rs = null;
        try
        {
          rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
              backend, c, metadataCache);
        }
        catch (Exception e)
        {
          throw new SQLException(Translate.get(
              "loadbalancer.storedprocedure.failed.on.backend", new String[]{
                  proc.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName(), e.getMessage()}));
        }
        finally
        {
          cm.releaseConnection(c);
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.storedprocedure.on",
              new String[]{String.valueOf(proc.getId()), backend.getName()}));
        return rs;
      }
      else
      { // Inside a transaction
        Connection c;
        long tid = proc.getTransactionId();
        Long lTid = new Long(tid);

        // Wait for previous writes to complete
        if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
          waitForAllWritesToComplete(backend, proc.getTransactionId());

        try
        {
          c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          disableBackend(backend);
          throw new SQLException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }
        catch (NoTransactionStartWhenDisablingException e)
        {
          String msg = Translate.get("loadbalancer.backend.is.disabling",
              new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName()});
          logger.error(msg);
          throw new SQLException(msg);
        }

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.retrieve.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));

        // Execute Query
        ControllerResultSet rs;
        try
        {
          rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
              backend, c, metadataCache);
        }
        catch (Exception e)
        {
          throw new SQLException(Translate.get(
              "loadbalancer.storedprocedure.failed.on.backend", new String[]{
                  proc.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName(), e.getMessage()}));
        }
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("loadbalancer.execute.transaction.on",
              new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
                  backend.getName()}));
        return rs;
      }
    }
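int execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, NoMoreBackendException, SQLException
Performs a write request. This request is broadcast to all nodes.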
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 335 of file RAIDb1.java.

    {
      return ((WriteRequestTask) execWriteRequest(request, false, null))
          .getResult();
    }
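ControllerResultSet execWriteRequestWithKeys (AbstractWriteRequest request, MetadataCache metadataCache) throws AllBackendsFailedException, SQLException
Performs a write request and returns the auto-generated keys.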
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 352 of file RAIDb1.java.

    {
      return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
          metadataCache)).getResult();
    }
int execWriteStoredProcedure (StoredProcedure proc) throws SQLException
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 707 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().

    {
      WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
          proc, false, null);
      return task.getResult();
    }
abstract String getRaidb1Xml ()
The surrounding raidb1 tags are written by getXmlImpl; the more detailed, implementation-specific content has to be returned by getRaidb1Xml.
Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR.
String getXmlImpl ()
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.
Definition at line 1508 of file RAIDb1.java.

    {
      StringBuffer info = new StringBuffer();
      info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
      if (waitForCompletionPolicy != null)
        info.append(waitForCompletionPolicy.getXml());
      if (macroHandler != null)
        info.append(macroHandler.getXml());
      info.append(getRaidb1Xml());
      info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
      return info.toString();
    }
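The relationship between getXmlImpl and getRaidb1Xml is a template method: the base class writes the enclosing element and delegates the inner content to the subclass. A minimal sketch follows. The class names are hypothetical, and the tag names "RAIDb-1" and "RoundRobin" are placeholders, since the actual value of DatabasesXmlTags.ELT_RAIDb_1 is not shown on this page.

```java
// Hypothetical base class: owns the surrounding element, delegates the content.
abstract class SketchXmlBalancer {
    // Placeholder tag name; the real constant is DatabasesXmlTags.ELT_RAIDb_1.
    static final String ELT = "RAIDb-1";

    final String getXmlImpl() {
        StringBuilder info = new StringBuilder();
        info.append("<").append(ELT).append(">");
        info.append(getRaidb1Xml());
        info.append("</").append(ELT).append(">");
        return info.toString();
    }

    // Each concrete balancer describes its own read policy here.
    abstract String getRaidb1Xml();
}

// Hypothetical subclass contributing only the policy-specific fragment.
class SketchRoundRobinXml extends SketchXmlBalancer {
    @Override
    String getRaidb1Xml() {
        return "<RoundRobin/>";
    }
}
```

A subclass therefore never has to repeat the enclosing element, and the base class can add shared children (such as the wait-for-completion policy in the listing) in one place.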
void rollback (TransactionMarkerMetaData tm) throws SQLException
Rolls back a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.
Definition at line 1054 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.hasTaskForTransaction(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.insertTaskAfterLastWriteForTransaction(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

    {
      long tid = tm.getTransactionId();
      Long lTid = new Long(tid);
      // List of backends that still have pending queries for the transaction to
      // commit
      ArrayList asynchronousBackends = null;
      RollbackTask task = null;

      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      {
        try
        {
          backendBlockingThreadsRWLock.acquireWrite();
          // Lock in write to ensure that all writes are posted and we wait in the
          // queue, else a read lock has the priority with the implementation we
          // are using.
        }
        catch (InterruptedException e)
        {
          String msg = Translate.get(
              "loadbalancer.backendlist.acquire.writelock.failed", e);
          logger.error(msg);
          throw new SQLException(msg);
        }

        int nbOfThreads = backendBlockingThreads.size();
        // Create the task
        task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
            .getTimeout(), tm.getLogin(), tid);

        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
              .get(i);
          if (thread.hasTaskForTransaction(lTid))
          {
            if (asynchronousBackends == null)
              asynchronousBackends = new ArrayList();
            asynchronousBackends.add(thread.getBackend());
            synchronized (thread)
            {
              thread.insertTaskAfterLastWriteForTransaction(task, lTid);
              thread.notify();
            }
          }
        }

        backendBlockingThreadsRWLock.releaseWrite();
      }

      try
      {
        backendNonBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      int nbOfThreads = backendNonBlockingThreads.size();
      ArrayList rollbackList = new ArrayList();

      // Build the list of backend that need to rollback this transaction
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
            .get(i);
        DatabaseBackend backend = thread.getBackend();
        // If the transaction has been started on this backend and it was not
        // previously treated in the asynchronous backend list (late nodes), then
        // we have to post the task now in the asynchronous list.
        if (backend.isStartedTransaction(lTid)
            && ((asynchronousBackends == null) || (!asynchronousBackends
                .contains(backend))))
          rollbackList.add(thread);
      }

      nbOfThreads = rollbackList.size();
      if (nbOfThreads == 0)
      {
        backendNonBlockingThreadsRWLock.releaseWrite();
        return;
      }

      if (task == null)
        task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
            .getTimeout(), tm.getLogin(), tid);

      synchronized (task)
      {
        // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
          synchronized (thread)
          {
            thread.addTask(task, tid);
            thread.notify();
          }
        }

        backendNonBlockingThreadsRWLock.releaseWrite();

        // Wait for completion (notified by the task)
        try
        {
          // Wait on task
          long timeout = tm.getTimeout();
          if (timeout > 0)
          {
            long start = System.currentTimeMillis();
            task.wait(timeout);
            long end = System.currentTimeMillis();
            long remaining = timeout - (end - start);
            if (remaining <= 0)
            {
              if (task.setExpiredTimeout())
              { // Task will be ignored by all backends
                String msg = Translate.get("loadbalancer.rollback.timeout",
                    new String[]{String.valueOf(tid),
                        String.valueOf(task.getSuccess()),
                        String.valueOf(task.getFailed())});
                logger.warn(msg);
                throw new SQLException(msg);
              }
              // else task execution already started, to late to cancel
            }
          }
          else
            task.wait();
        }
        catch (InterruptedException e)
        {
          if (task.setExpiredTimeout())
          { // Task will be ignored by all backends
            String msg = Translate.get("loadbalancer.rollback.timeout",
                new String[]{String.valueOf(tid),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});
            logger.warn(msg);
            throw new SQLException(msg);
          }
          // else task execution already started, to late to cancel
        }

        if (task.getSuccess() > 0)
          return;
        else
        { // All tasks failed
          ArrayList exceptions = task.getExceptions();
          if (exceptions == null)
            throw new SQLException(Translate.get(
                "loadbalancer.rollback.all.failed", tid));
          else
          {
            String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
                tid)
                + "\n";
            SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
                errorMsg);
            logger.error(ex.getMessage());
            throw ex;
          }
        }
      }
    }
void waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException
Waits for all writes in the blocking thread queue of the given backend to complete.
Definition at line 1302 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
      // Lock in write to ensure that all writes are posted and we wait in the
      // queue, else a read lock has the priority with the implementation we are
      // using.
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete();
    }

    backendBlockingThreadsRWLock.releaseWrite();
  }
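The pattern in this method is: take the write lock on the thread list, scan it for workers attached to the given backend, wait on each match, then release the lock. The sketch below illustrates that pattern with the standard `java.util.concurrent.locks.ReentrantReadWriteLock` standing in for `ReadPrioritaryFIFOWriteLock`, and releases the lock in a `finally` block so that a failure while waiting cannot leave it held. `WriteLockedScanSketch`, its nested `Worker` class, and the use of backend names instead of `DatabaseBackend` references are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class WriteLockedScanSketch {
    /** Hypothetical worker entry; stands in for BackendWorkerThread. */
    private static final class Worker {
        final String backendName;
        final Runnable waitForAllTasks;

        Worker(String backendName, Runnable waitForAllTasks) {
            this.backendName = backendName;
            this.waitForAllTasks = waitForAllTasks;
        }
    }

    private final List<Worker> workers = new ArrayList<>();
    // Fair lock, used here only as a stand-in for the C-JDBC lock class.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    /** Registers a worker for the named backend. */
    void add(String backendName, Runnable waitForAllTasks) {
        lock.writeLock().lock();
        try {
            workers.add(new Worker(backendName, waitForAllTasks));
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Waits on every worker of the named backend; returns how many matched. */
    int waitForBackend(String backendName) {
        lock.writeLock().lock();
        try {
            int waited = 0;
            for (Worker worker : workers) {
                if (worker.backendName.equals(backendName)) {
                    worker.waitForAllTasks.run();
                    waited++;
                }
            }
            return waited;
        } finally {
            // Always released, even if a worker throws while we wait on it.
            lock.writeLock().unlock();
        }
    }

    boolean isWriteLocked() {
        return lock.isWriteLocked();
    }

    public static void main(String[] args) {
        WriteLockedScanSketch sketch = new WriteLockedScanSketch();
        sketch.add("db1", () -> System.out.println("db1 queue drained"));
        sketch.add("db2", () -> System.out.println("db2 queue drained"));
        System.out.println("workers waited on: " + sketch.waitForBackend("db1"));
    }
}
```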
|
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
Definition at line 1265 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
      // Lock in write to ensure that all writes are posted and we wait in the
      // queue, else a read lock has the priority with the implementation we are
      // using.
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseWrite();
  }
|
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction.
Definition at line 1228 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
      // Lock in write to ensure that all writes are posted and we wait in the
      // queue, else a read lock has the priority with the implementation we are
      // using.
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseWrite();
  }
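This page does not show how `BackendWorkerThread.waitForAllTasksToComplete(transactionId)` is implemented. As a rough sketch of what waiting on the writes of a single transaction can look like, the class below keeps a count of pending writes per transaction identifier and blocks until the count for one identifier reaches zero. `PendingWritesSketch` and its methods `posted`, `completed`, and `pendingFor` are hypothetical; only the method name `waitForAllTasksToComplete` is borrowed from the listing.

```java
import java.util.HashMap;
import java.util.Map;

final class PendingWritesSketch {
    // Number of writes posted but not yet completed, per transaction id.
    private final Map<Long, Integer> pending = new HashMap<>();

    /** Records that a write for the given transaction was queued. */
    synchronized void posted(long transactionId) {
        pending.merge(transactionId, 1, Integer::sum);
    }

    /** Records that a write for the given transaction finished. */
    synchronized void completed(long transactionId) {
        Integer count = pending.get(transactionId);
        if (count == null) {
            return; // nothing was pending for this transaction
        }
        if (count <= 1) {
            pending.remove(transactionId);
        } else {
            pending.put(transactionId, count - 1);
        }
        notifyAll(); // wake waiters so they can re-check their transaction
    }

    synchronized int pendingFor(long transactionId) {
        return pending.getOrDefault(transactionId, 0);
    }

    /**
     * Blocks until no write of the given transaction is pending.
     * Returns false if interrupted (the interrupt flag is restored).
     */
    synchronized boolean waitForAllTasksToComplete(long transactionId) {
        try {
            while (pending.containsKey(transactionId)) {
                wait();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingWritesSketch queue = new PendingWritesSketch();
        queue.posted(7L);
        queue.posted(7L);
        queue.posted(8L);
        Thread worker = new Thread(() -> {
            queue.completed(7L);
            queue.completed(7L);
        });
        worker.start();
        System.out.println("tid 7 drained: " + queue.waitForAllTasksToComplete(7L));
        worker.join();
        System.out.println("tid 8 still pending: " + queue.pendingFor(8L));
    }
}
```

Writes of other transactions (identifier 8 in the usage above) do not delay the caller, which is the difference from the overload that takes no transaction identifier.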
|
|
List of BackendWorkerThread objects. Definition at line 90 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().
|
Lock on the backendBlockingThreads list. Definition at line 97 of file RAIDb1.java.
|
List of BackendWorkerThread objects. Definition at line 95 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().
|
Lock on the backendNonBlockingThreads list. Definition at line 99 of file RAIDb1.java.
|
Initial value: Trace.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1")
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 103 of file RAIDb1.java.
|
Should we wait for all backends to commit before returning? Definition at line 101 of file RAIDb1.java.
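The rollback listing on this page passes `getNbToWait(nbOfThreads)` to the task it creates, which suggests that the completion policy is turned into a number of backend acknowledgements to wait for. How that mapping is done is not shown here. The sketch below is one plausible reading, under the assumption that the policy distinguishes "first", "majority", and "all" backends; the enum `Policy`, its constants, and the arithmetic in `nbToWait` are illustrative assumptions, not the C-JDBC implementation.

```java
final class CompletionPolicySketch {
    /** Hypothetical policy values; assumed for illustration only. */
    enum Policy { FIRST, MAJORITY, ALL }

    /**
     * Returns how many backend acknowledgements to wait for before
     * returning to the caller, given the number of backend threads.
     */
    static int nbToWait(Policy policy, int nbOfThreads) {
        if (nbOfThreads <= 0) {
            return 0; // nothing to wait for
        }
        switch (policy) {
            case FIRST:
                return 1;                   // return after the first success
            case MAJORITY:
                return nbOfThreads / 2 + 1; // strict majority
            case ALL:
                return nbOfThreads;         // every backend must answer
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            System.out.println(policy + " with 5 backends: " + nbToWait(policy, 5));
        }
    }
}
```

Under this reading, waiting for fewer backends lowers the latency seen by the client, while waiting for all of them guarantees that every replica has applied the operation before the call returns.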