Public Member Functions | |
RAIDb2 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, CreateTablePolicy createTablePolicy) throws Exception | |
int | execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ControllerResultSet | execWriteRequestWithKeys (AbstractWriteRequest request, MetadataCache metadataCache) throws AllBackendsFailedException, SQLException |
abstract ControllerResultSet | execReadRequest (SelectRequest request, MetadataCache metadataCache) throws SQLException |
ControllerResultSet | execReadStoredProcedure (StoredProcedure proc, MetadataCache metadataCache) throws SQLException |
int | execWriteStoredProcedure (StoredProcedure proc) throws SQLException |
final void | begin (TransactionMarkerMetaData tm) throws SQLException |
void | commit (TransactionMarkerMetaData tm) throws SQLException |
void | rollback (TransactionMarkerMetaData tm) throws SQLException |
void | enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException |
synchronized void | disableBackend (DatabaseBackend db) throws SQLException |
String | getXmlImpl () |
abstract String | getRaidb2Xml () |
Protected Member Functions | |
ControllerResultSet | executeRequestOnBackend (SelectRequest request, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException |
ControllerResultSet | executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException |
void | waitForAllWritesToComplete (long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException |
Protected Attributes | |
ArrayList | backendBlockingThreads |
ArrayList | backendNonBlockingThreads |
ReadPrioritaryFIFOWriteLock | backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
ReadPrioritaryFIFOWriteLock | backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
WaitForCompletionPolicy | waitForCompletionPolicy |
CreateTablePolicy | createTablePolicy |
Static Protected Attributes | |
Trace | logger |
This class is an abstract call because the read requests coming from the Request Manager are NOT treated here but in the subclasses. Transaction management and write requests are broadcasted to all backends owning the written table.
Definition at line 76 of file RAIDb2.java.
|
Creates a new RAIDb-1 Round Robin request load balancer. A new backend worker thread is created for each backend.
Definition at line 113 of file RAIDb2.java. 00116 { 00117 super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE); 00118 00119 this.waitForCompletionPolicy = waitForCompletionPolicy; 00120 backendBlockingThreads = new ArrayList(); 00121 backendNonBlockingThreads = new ArrayList(); 00122 this.createTablePolicy = createTablePolicy; 00123 }
|
|
Begins a new transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 908 of file RAIDb2.java. 00909 { 00910 }
|
|
Commits a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 918 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout(). 00919 { 00920 long tid = tm.getTransactionId(); 00921 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00922 waitForAllWritesToComplete(tid); 00923 00924 try 00925 { 00926 backendNonBlockingThreadsRWLock.acquireWrite(); 00927 } 00928 catch (InterruptedException e) 00929 { 00930 String msg = Translate.get( 00931 "loadbalancer.backendlist.acquire.writelock.failed", e); 00932 logger.error(msg); 00933 throw new SQLException(msg); 00934 } 00935 00936 int nbOfThreads = backendNonBlockingThreads.size(); 00937 ArrayList commitList = new ArrayList(); 00938 Long iTid = new Long(tid); 00939 00940 // Build the list of backend that need to commit this transaction 00941 for (int i = 0; i < nbOfThreads; i++) 00942 { 00943 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 00944 .get(i); 00945 if (thread.getBackend().isStartedTransaction(iTid)) 00946 commitList.add(thread); 00947 } 00948 00949 nbOfThreads = commitList.size(); 00950 if (nbOfThreads == 0) 00951 { 00952 backendNonBlockingThreadsRWLock.releaseWrite(); 00953 return; 00954 } 00955 00956 // Create the task 00957 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 00958 .getTimeout(), tm.getLogin(), tid); 00959 00960 synchronized (task) 00961 { 00962 // Post the task in each backendThread tasklist and wakeup the threads 00963 for (int i = 0; i < nbOfThreads; i++) 00964 { 00965 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 00966 synchronized (thread) 00967 { 00968 thread.addTask(task, tid); 00969 thread.notify(); 00970 } 00971 } 00972 00973 backendNonBlockingThreadsRWLock.releaseWrite(); 00974 00975 // Wait for completion (notified by the task) 00976 try 00977 { 00978 // Wait on task 00979 long timeout = tm.getTimeout(); 00980 if (timeout > 0) 00981 { 00982 long start = System.currentTimeMillis(); 00983 task.wait(timeout); 00984 long end = System.currentTimeMillis(); 00985 long remaining = timeout - (end - start); 00986 if (remaining <= 0) 00987 { 00988 if (task.setExpiredTimeout()) 00989 { // Task will be ignored by all backends 00990 String msg = Translate.get("loadbalancer.commit.timeout", 00991 new String[]{String.valueOf(tid), 00992 String.valueOf(task.getSuccess()), 00993 String.valueOf(task.getFailed())}); 00994 logger.warn(msg); 00995 throw new SQLException(msg); 00996 } 00997 // else task execution already started, to late to cancel 00998 } 00999 } 01000 else 01001 task.wait(); 01002 } 01003 catch (InterruptedException e) 01004 { 01005 if (task.setExpiredTimeout()) 01006 { // Task will be ignored by all backends 01007 String msg = Translate.get("loadbalancer.commit.timeout", 01008 new String[]{String.valueOf(tid), 01009 String.valueOf(task.getSuccess()), 01010 String.valueOf(task.getFailed())}); 01011 logger.warn(msg); 01012 throw new SQLException(msg); 01013 } 01014 // else task execution already started, to late to cancel 01015 } 01016 01017 if (task.getSuccess() > 0) 01018 return; 01019 else 01020 { // All tasks failed 01021 ArrayList exceptions = task.getExceptions(); 01022 if (exceptions == null) 01023 throw new SQLException(Translate.get( 01024 "loadbalancer.commit.all.failed", tid)); 01025 else 01026 { 01027 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01028 tid) 01029 + "\n"; 01030 for (int i = 0; i < exceptions.size(); i++) 01031 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01032 logger.error(errorMsg); 01033 throw new SQLException(errorMsg); 01034 } 01035 } 01036 } 01037 }
|
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Definition at line 1353 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(). 01355 { 01356 if (db.isWriteEnabled()) 01357 { 01358 // Start with the backendBlockingThread list 01359 try 01360 { 01361 backendBlockingThreadsRWLock.acquireWrite(); 01362 } 01363 catch (InterruptedException e) 01364 { 01365 String msg = Translate.get( 01366 "loadbalancer.backendlist.acquire.writelock.failed", e); 01367 logger.error(msg); 01368 throw new SQLException(msg); 01369 } 01370 01371 int nbOfThreads = backendBlockingThreads.size(); 01372 01373 // Find the right blocking thread 01374 for (int i = 0; i < nbOfThreads; i++) 01375 { 01376 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01377 .get(i); 01378 if (thread.getBackend().equals(db)) 01379 { 01380 logger.info(Translate 01381 .get("loadbalancer.backend.workerthread.blocking.remove", db 01382 .getName())); 01383 01384 backendBlockingThreads.remove(thread); 01385 01386 synchronized (thread) 01387 { 01388 // Kill the thread 01389 thread.addPriorityTask(new KillThreadTask(1, 1)); 01390 thread.notify(); 01391 } 01392 break; 01393 } 01394 } 01395 01396 backendBlockingThreadsRWLock.releaseWrite(); 01397 01398 // Continue with the backendNonBlockingThread list 01399 01400 try 01401 { 01402 backendNonBlockingThreadsRWLock.acquireWrite(); 01403 } 01404 catch (InterruptedException e) 01405 { 01406 String msg = Translate.get( 01407 "loadbalancer.backendlist.acquire.writelock.failed", e); 01408 logger.error(msg); 01409 throw new SQLException(msg); 01410 } 01411 01412 // Find the right non-blocking thread 01413 nbOfThreads = backendNonBlockingThreads.size(); 01414 for (int i = 0; i < nbOfThreads; i++) 01415 { 01416 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01417 .get(i); 01418 if (thread.getBackend().equals(db)) 01419 { 01420 logger.info(Translate.get( 01421 "loadbalancer.backend.workerthread.non.blocking.remove", db 01422 .getName())); 01423 01424 backendNonBlockingThreads.remove(thread); 01425 01426 synchronized (thread) 01427 { 01428 // Kill the thread 01429 thread.addPriorityTask(new KillThreadTask(1, 1)); 01430 thread.notify(); 01431 } 01432 break; 01433 } 01434 } 01435 01436 backendNonBlockingThreadsRWLock.releaseWrite(); 01437 } 01438 01439 db.disable(); 01440 if (db.isInitialized()) 01441 db.finalizeConnections(); 01442 }
|
|
Enables a Backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Definition at line 1290 of file RAIDb2.java. 01292 { 01293 if (writeEnabled) 01294 { 01295 // Create 2 worker threads 01296 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this); 01297 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this); 01298 01299 // Add first to the blocking thread list 01300 try 01301 { 01302 backendBlockingThreadsRWLock.acquireWrite(); 01303 } 01304 catch (InterruptedException e) 01305 { 01306 String msg = Translate.get( 01307 "loadbalancer.backendlist.acquire.writelock.failed", e); 01308 logger.error(msg); 01309 throw new SQLException(msg); 01310 } 01311 backendBlockingThreads.add(blockingThread); 01312 backendBlockingThreadsRWLock.releaseWrite(); 01313 blockingThread.start(); 01314 logger.info(Translate.get( 01315 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01316 01317 // Then add to the non-blocking thread list 01318 try 01319 { 01320 backendNonBlockingThreadsRWLock.acquireWrite(); 01321 } 01322 catch (InterruptedException e) 01323 { 01324 String msg = Translate.get( 01325 "loadbalancer.backendlist.acquire.writelock.failed", e); 01326 logger.error(msg); 01327 throw new SQLException(msg); 01328 } 01329 backendNonBlockingThreads.add(nonBlockingThread); 01330 backendNonBlockingThreadsRWLock.releaseWrite(); 01331 nonBlockingThread.start(); 01332 logger.info(Translate.get( 01333 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01334 db.enableWrite(); 01335 } 01336 01337 if (!db.isInitialized()) 01338 db.initializeConnections(); 01339 db.enableRead(); 01340 }
|
|
Implementation specific load balanced read execution.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR. |
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 735 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult(). 00737 { 00738 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00739 proc, true, metadataCache); 00740 return task.getResult(); 00741 }
|
|
Execute a read request on the selected backend.
Definition at line 442 of file RAIDb2.java. 00445 { 00446 // Handle macros 00447 handleMacros(request); 00448 00449 // Ok, we have a backend, let's execute the request 00450 AbstractConnectionManager cm = backend.getConnectionManager(request 00451 .getLogin()); 00452 00453 // Sanity check 00454 if (cm == null) 00455 { 00456 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00457 new String[]{request.getLogin(), backend.getName()}); 00458 logger.error(msg); 00459 throw new SQLException(msg); 00460 } 00461 00462 // Execute the query 00463 if (request.isAutoCommit()) 00464 { 00465 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00466 // We could do something finer grain here by waiting 00467 // only for writes that depend on the tables we need 00468 // but is that really worth the overhead ? 00469 waitForAllWritesToComplete(backend); 00470 00471 ControllerResultSet rs = null; 00472 boolean badConnection; 00473 do 00474 { 00475 badConnection = false; 00476 // Use a connection just for this request 00477 Connection c = null; 00478 try 00479 { 00480 c = cm.getConnection(); 00481 } 00482 catch (UnreachableBackendException e1) 00483 { 00484 logger.error(Translate.get( 00485 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00486 disableBackend(backend); 00487 throw new UnreachableBackendException(Translate.get( 00488 "loadbalancer.backend.unreacheable", backend.getName())); 00489 } 00490 00491 // Sanity check 00492 if (c == null) 00493 throw new UnreachableBackendException( 00494 "No more connections on backend " + backend.getName()); 00495 00496 // Execute Query 00497 try 00498 { 00499 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00500 cm.releaseConnection(c); 00501 } 00502 catch (SQLException e) 00503 { 00504 cm.releaseConnection(c); 00505 throw new SQLException(Translate.get( 00506 "loadbalancer.request.failed.on.backend", new String[]{ 00507 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00508 backend.getName(), e.getMessage()})); 00509 } 00510 catch (BadConnectionException e) 00511 { // Get rid of the bad connection 00512 cm.deleteConnection(c); 00513 badConnection = true; 00514 } 00515 } 00516 while (badConnection); 00517 if (logger.isDebugEnabled()) 00518 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00519 String.valueOf(request.getId()), backend.getName()})); 00520 return rs; 00521 } 00522 else 00523 { // Inside a transaction 00524 Connection c; 00525 long tid = request.getTransactionId(); 00526 Long lTid = new Long(tid); 00527 00528 // Wait for previous writes to complete 00529 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00530 waitForAllWritesToComplete(backend, request.getTransactionId()); 00531 00532 try 00533 { 00534 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm); 00535 } 00536 catch (UnreachableBackendException e1) 00537 { 00538 logger.error(Translate.get( 00539 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00540 disableBackend(backend); 00541 throw new SQLException(Translate.get( 00542 "loadbalancer.backend.unreacheable", backend.getName())); 00543 } 00544 catch (NoTransactionStartWhenDisablingException e) 00545 { 00546 String msg = Translate.get("loadbalancer.backend.is.disabling", 00547 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00548 backend.getName()}); 00549 logger.error(msg); 00550 throw new SQLException(msg); 00551 } 00552 00553 // Sanity check 00554 if (c == null) 00555 throw new SQLException(Translate.get( 00556 "loadbalancer.unable.retrieve.connection", new String[]{ 00557 String.valueOf(tid), backend.getName()})); 00558 00559 // Execute Query 00560 ControllerResultSet rs = null; 00561 try 00562 { 00563 rs = executeSelectRequestOnBackend(request, backend, c, metadataCache); 00564 } 00565 catch (SQLException e) 00566 { 00567 throw new SQLException(Translate.get( 00568 "loadbalancer.request.failed.on.backend", new String[]{ 00569 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00570 backend.getName(), e.getMessage()})); 00571 } 00572 catch (BadConnectionException e) 00573 { // Connection failed, so did the transaction 00574 // Disable the backend. 00575 cm.deleteConnection(tid); 00576 String msg = Translate.get( 00577 "loadbalancer.backend.disabling.connection.failure", backend 00578 .getName()); 00579 logger.error(msg); 00580 disableBackend(backend); 00581 throw new SQLException(msg); 00582 } 00583 if (logger.isDebugEnabled()) 00584 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00585 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00586 backend.getName()})); 00587 return rs; 00588 } 00589 }
|
|
Execute a stored procedure on the selected backend.
Definition at line 600 of file RAIDb2.java. 00603 { 00604 // Handle macros 00605 handleMacros(proc); 00606 00607 // Ok, we have a backend, let's execute the request 00608 AbstractConnectionManager cm = backend 00609 .getConnectionManager(proc.getLogin()); 00610 00611 // Sanity check 00612 if (cm == null) 00613 { 00614 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00615 new String[]{proc.getLogin(), backend.getName()}); 00616 logger.error(msg); 00617 throw new SQLException(msg); 00618 } 00619 00620 // Execute the query 00621 if (proc.isAutoCommit()) 00622 { 00623 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00624 // We could do something finer grain here by waiting 00625 // only for writes that depend on the tables we need 00626 // but is that really worth the overhead ? 00627 waitForAllWritesToComplete(backend); 00628 00629 // Use a connection just for this request 00630 Connection c = null; 00631 try 00632 { 00633 c = cm.getConnection(); 00634 } 00635 catch (UnreachableBackendException e1) 00636 { 00637 logger.error(Translate.get( 00638 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00639 disableBackend(backend); 00640 throw new UnreachableBackendException(Translate.get( 00641 "loadbalancer.backend.unreacheable", backend.getName())); 00642 } 00643 00644 // Sanity check 00645 if (c == null) 00646 throw new SQLException(Translate.get( 00647 "loadbalancer.backend.no.connection", backend.getName())); 00648 00649 // Execute Query 00650 ControllerResultSet rs = null; 00651 try 00652 { 00653 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00654 backend, c, metadataCache); 00655 } 00656 catch (Exception e) 00657 { 00658 throw new SQLException(Translate.get( 00659 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00660 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00661 backend.getName(), e.getMessage()})); 00662 } 00663 finally 00664 { 00665 cm.releaseConnection(c); 00666 } 00667 if (logger.isDebugEnabled()) 00668 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00669 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00670 return rs; 00671 } 00672 else 00673 { // Inside a transaction 00674 Connection c; 00675 long tid = proc.getTransactionId(); 00676 Long lTid = new Long(tid); 00677 00678 // Wait for previous writes to complete 00679 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00680 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00681 00682 try 00683 { 00684 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm); 00685 } 00686 catch (UnreachableBackendException e1) 00687 { 00688 logger.error(Translate.get( 00689 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00690 disableBackend(backend); 00691 throw new SQLException(Translate.get( 00692 "loadbalancer.backend.unreacheable", backend.getName())); 00693 } 00694 catch (NoTransactionStartWhenDisablingException e) 00695 { 00696 String msg = Translate.get("loadbalancer.backend.is.disabling", 00697 new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00698 backend.getName()}); 00699 logger.error(msg); 00700 throw new SQLException(msg); 00701 } 00702 00703 // Sanity check 00704 if (c == null) 00705 throw new SQLException(Translate.get( 00706 "loadbalancer.unable.retrieve.connection", new String[]{ 00707 String.valueOf(tid), backend.getName()})); 00708 00709 // Execute Query 00710 ControllerResultSet rs; 00711 try 00712 { 00713 rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc, 00714 backend, c, metadataCache); 00715 } 00716 catch (Exception e) 00717 { 00718 throw new SQLException(Translate.get( 00719 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00720 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00721 backend.getName(), e.getMessage()})); 00722 } 00723 if (logger.isDebugEnabled()) 00724 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00725 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00726 backend.getName()})); 00727 return rs; 00728 } 00729 }
|
|
Performs a write request. This request is broadcasted to all nodes that owns the table to be written.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 167 of file RAIDb2.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys(). 00169 { 00170 return ((WriteRequestTask) execWriteRequest(request, false, null)) 00171 .getResult(); 00172 }
|
|
Perform a write request and return the auto generated keys.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 184 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest(). 00187 { 00188 return ((WriteRequestWithKeysTask) execWriteRequest(request, true, 00189 metadataCache)).getResult(); 00190 }
|
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 746 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult(). 00747 { 00748 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 00749 proc, false, null); 00750 return task.getResult(); 00751 }
|
|
return xml formatted information about this raidb2 load balancer
Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR. |
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Definition at line 1447 of file RAIDb2.java. 01448 { 01449 StringBuffer info = new StringBuffer(); 01450 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01451 if (createTablePolicy != null) 01452 info.append(createTablePolicy.getXml()); 01453 if (waitForCompletionPolicy != null) 01454 info.append(waitForCompletionPolicy.getXml()); 01455 if (macroHandler != null) 01456 info.append(macroHandler.getXml()); 01457 this.getRaidb2Xml(); 01458 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01459 return info.toString(); 01460 }
|
|
Rollbacks a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 1045 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout(). 01046 { 01047 long tid = tm.getTransactionId(); 01048 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01049 waitForAllWritesToComplete(tid); 01050 01051 try 01052 { 01053 backendNonBlockingThreadsRWLock.acquireWrite(); 01054 } 01055 catch (InterruptedException e) 01056 { 01057 String msg = Translate.get( 01058 "loadbalancer.backendlist.acquire.writelock.failed", e); 01059 logger.error(msg); 01060 throw new SQLException(msg); 01061 } 01062 int nbOfThreads = backendNonBlockingThreads.size(); 01063 ArrayList rollbackList = new ArrayList(); 01064 Long iTid = new Long(tid); 01065 01066 // Build the list of backend that need to rollback this transaction 01067 for (int i = 0; i < nbOfThreads; i++) 01068 { 01069 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01070 .get(i); 01071 if (thread.getBackend().isStartedTransaction(iTid)) 01072 rollbackList.add(thread); 01073 } 01074 01075 nbOfThreads = rollbackList.size(); 01076 if (nbOfThreads == 0) 01077 { 01078 backendNonBlockingThreadsRWLock.releaseWrite(); 01079 return; 01080 } 01081 01082 // Create the task 01083 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01084 tm.getTimeout(), tm.getLogin(), tid); 01085 01086 synchronized (task) 01087 { 01088 // Post the task in each backendThread tasklist and wakeup the threads 01089 for (int i = 0; i < nbOfThreads; i++) 01090 { 01091 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01092 synchronized (thread) 01093 { 01094 thread.addTask(task, tid); 01095 thread.notify(); 01096 } 01097 } 01098 01099 backendNonBlockingThreadsRWLock.releaseWrite(); 01100 01101 // Wait for completion (notified by the task) 01102 try 01103 { 01104 // Wait on task 01105 long timeout = tm.getTimeout(); 01106 if (timeout > 0) 01107 { 01108 long start = System.currentTimeMillis(); 01109 task.wait(timeout); 01110 long end = System.currentTimeMillis(); 01111 long remaining = timeout - (end - start); 01112 if (remaining <= 0) 01113 { 01114 if (task.setExpiredTimeout()) 01115 { // Task will be ignored by all backends 01116 String msg = Translate.get("loadbalancer.rollback.timeout", 01117 new String[]{String.valueOf(tid), 01118 String.valueOf(task.getSuccess()), 01119 String.valueOf(task.getFailed())}); 01120 logger.warn(msg); 01121 throw new SQLException(msg); 01122 } 01123 // else task execution already started, to late to cancel 01124 } 01125 } 01126 else 01127 task.wait(); 01128 } 01129 catch (InterruptedException e) 01130 { 01131 if (task.setExpiredTimeout()) 01132 { // Task will be ignored by all backends 01133 String msg = Translate.get("loadbalancer.rollback.timeout", 01134 new String[]{String.valueOf(tid), 01135 String.valueOf(task.getSuccess()), 01136 String.valueOf(task.getFailed())}); 01137 logger.warn(msg); 01138 throw new SQLException(msg); 01139 } 01140 // else task execution already started, to late to cancel 01141 } 01142 01143 if (task.getSuccess() > 0) 01144 return; 01145 else 01146 { // All tasks failed 01147 ArrayList exceptions = task.getExceptions(); 01148 if (exceptions == null) 01149 throw new SQLException(Translate.get( 01150 "loadbalancer.rollback.all.failed", tid)); 01151 else 01152 { 01153 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01154 tid) 01155 + "\n"; 01156 for (int i = 0; i < exceptions.size(); i++) 01157 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01158 logger.error(errorMsg); 01159 throw new SQLException(errorMsg); 01160 } 01161 } 01162 } 01163 }
|
|
Waits for all writes in the blocking thread queue of the given backend to complete.
Definition at line 1243 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 01245 { 01246 try 01247 { 01248 backendBlockingThreadsRWLock.acquireWrite(); 01249 // Lock in write to ensure that all writes are posted and we wait in the 01250 // queue, else a read lock has the priority with the implementation we are 01251 // using. 01252 } 01253 catch (InterruptedException e) 01254 { 01255 String msg = Translate.get( 01256 "loadbalancer.backendlist.acquire.writelock.failed", e); 01257 logger.error(msg); 01258 throw new SQLException(msg); 01259 } 01260 01261 int nbOfThreads = backendBlockingThreads.size(); 01262 01263 for (int i = 0; i < nbOfThreads; i++) 01264 { 01265 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01266 .get(i); 01267 if (thread.getBackend() == backend) 01268 thread.waitForAllTasksToComplete(); 01269 } 01270 01271 backendBlockingThreadsRWLock.releaseWrite(); 01272 }
|
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
Definition at line 1206 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 01208 { 01209 try 01210 { 01211 backendBlockingThreadsRWLock.acquireWrite(); 01212 // Lock in write to ensure that all writes are posted and we wait in the 01213 // queue, else a read lock has the priority with the implementation we are 01214 // using. 01215 } 01216 catch (InterruptedException e) 01217 { 01218 String msg = Translate.get( 01219 "loadbalancer.backendlist.acquire.writelock.failed", e); 01220 logger.error(msg); 01221 throw new SQLException(msg); 01222 } 01223 01224 int nbOfThreads = backendBlockingThreads.size(); 01225 01226 for (int i = 0; i < nbOfThreads; i++) 01227 { 01228 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01229 .get(i); 01230 if (thread.getBackend() == backend) 01231 thread.waitForAllTasksToComplete(transactionId); 01232 } 01233 01234 backendBlockingThreadsRWLock.releaseWrite(); 01235 }
|
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction. Definition at line 1169 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 01171 { 01172 try 01173 { 01174 backendBlockingThreadsRWLock.acquireWrite(); 01175 // Lock in write to ensure that all writes are posted and we wait in the 01176 // queue, else a read lock has the priority with the implementation we are 01177 // using. 01178 } 01179 catch (InterruptedException e) 01180 { 01181 String msg = Translate.get( 01182 "loadbalancer.backendlist.acquire.writelock.failed", e); 01183 logger.error(msg); 01184 throw new SQLException(msg); 01185 } 01186 01187 int nbOfThreads = backendBlockingThreads.size(); 01188 01189 for (int i = 0; i < nbOfThreads; i++) 01190 { 01191 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01192 .get(i); 01193 thread.waitForAllTasksToComplete(transactionId); 01194 } 01195 01196 backendBlockingThreadsRWLock.releaseWrite(); 01197 }
|
|
Initial value: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 95 of file RAIDb2.java. |