The read requests coming from the request manager are sent to the backend nodes using a weighted round robin. Write requests are broadcasted to all backends.
RAIDb2_WRR.java の 51 行で定義されています。
|
Creates a new RAIDb-2 Weighted Round Robin request load balancer.
RAIDb2_WRR.java の 76 行で定義されています。
00082 { 00083 super(vdb, waitForCompletionPolicy, createTablePolicy, timestampResolution); 00084 index = -1; 00085 } |
|
Begins a new transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1005 行で定義されています。
01006 { 01007 } |
|
Commits a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1015 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01016 { 01017 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01018 waitForAllWritesToComplete(tm.getTransactionId()); 01019 01020 try 01021 { 01022 backendNonBlockingThreadsRWLock.acquireRead(); 01023 } 01024 catch (InterruptedException e) 01025 { 01026 String msg = Translate.get( 01027 "loadbalancer.backendlist.acquire.readlock.failed", e); 01028 logger.error(msg); 01029 throw new SQLException(msg); 01030 } 01031 01032 int nbOfThreads = backendNonBlockingThreads.size(); 01033 ArrayList commitList = new ArrayList(); 01034 Long iTid = new Long(tm.getTransactionId()); 01035 01036 // Build the list of backend that need to commit this transaction 01037 for (int i = 0; i < nbOfThreads; i++) 01038 { 01039 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01040 .get(i); 01041 if (thread.getBackend().isStartedTransaction(iTid)) 01042 commitList.add(thread); 01043 } 01044 01045 nbOfThreads = commitList.size(); 01046 if (nbOfThreads == 0) 01047 { 01048 backendNonBlockingThreadsRWLock.releaseRead(); 01049 return; 01050 } 01051 01052 // Create the task 01053 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 01054 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 01055 01056 synchronized (task) 01057 { 01058 // Post the task in each backendThread tasklist and wakeup the threads 01059 for (int i = 0; i < nbOfThreads; i++) 01060 { 01061 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 01062 synchronized (thread) 01063 { 01064 thread.addTask(task); 01065 thread.notify(); 01066 } 01067 } 01068 01069 backendNonBlockingThreadsRWLock.releaseRead(); 01070 01071 // Wait for completion (notified by the task) 01072 try 01073 { 01074 // Wait on task 01075 long timeout = tm.getTimeout(); 01076 if (timeout > 0) 01077 { 01078 long start = System.currentTimeMillis(); 01079 task.wait(timeout); 01080 long end = System.currentTimeMillis(); 01081 long remaining = timeout - (end - start); 01082 if (remaining <= 0) 01083 { 01084 String msg = Translate.get("loadbalancer.commit.timeout", 01085 new String[]{String.valueOf(tm.getTransactionId()), 01086 String.valueOf(task.getSuccess()), 01087 String.valueOf(task.getFailed())}); 01088 logger.warn(msg); 01089 throw new SQLException(msg); 01090 } 01091 } 01092 else 01093 task.wait(); 01094 } 01095 catch (InterruptedException e) 01096 { 01097 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 01098 new String[]{String.valueOf(tm.getTransactionId()), 01099 String.valueOf(task.getSuccess()), 01100 String.valueOf(task.getFailed())})); 01101 } 01102 01103 if (task.getSuccess() > 0) 01104 return; 01105 else 01106 { // All tasks failed 01107 ArrayList exceptions = task.getExceptions(); 01108 if (exceptions == null) 01109 throw new SQLException(Translate.get( 01110 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01111 else 01112 { 01113 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01114 tm.getTransactionId()) 01115 + "\n"; 01116 for (int i = 0; i < exceptions.size(); i++) 01117 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01118 logger.error(errorMsg); 01119 throw new SQLException(errorMsg); 01120 } 01121 } 01122 } 01123 } |
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1421 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
01423 { 01424 if (db.isWriteEnabled()) 01425 { 01426 // Start with the backendBlockingThread list 01427 try 01428 { 01429 backendBlockingThreadsRWLock.acquireWrite(); 01430 } 01431 catch (InterruptedException e) 01432 { 01433 String msg = Translate.get( 01434 "loadbalancer.backendlist.acquire.writelock.failed", e); 01435 logger.error(msg); 01436 throw new SQLException(msg); 01437 } 01438 01439 int nbOfThreads = backendBlockingThreads.size(); 01440 01441 // Find the right blocking thread 01442 for (int i = 0; i < nbOfThreads; i++) 01443 { 01444 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01445 .get(i); 01446 if (thread.getBackend().equals(db)) 01447 { 01448 logger.info(Translate 01449 .get("loadbalancer.backend.workerthread.blocking.remove", db 01450 .getName())); 01451 01452 backendBlockingThreads.remove(thread); 01453 01454 synchronized (thread) 01455 { 01456 // Kill the thread 01457 thread.addPriorityTask(new KillThreadTask(1, 1)); 01458 thread.notify(); 01459 } 01460 break; 01461 } 01462 } 01463 01464 backendBlockingThreadsRWLock.releaseWrite(); 01465 01466 // Continue with the backendNonBlockingThread list 01467 01468 try 01469 { 01470 backendNonBlockingThreadsRWLock.acquireWrite(); 01471 } 01472 catch (InterruptedException e) 01473 { 01474 String msg = Translate.get( 01475 "loadbalancer.backendlist.acquire.writelock.failed", e); 01476 logger.error(msg); 01477 throw new SQLException(msg); 01478 } 01479 01480 // Find the right non-blocking thread 01481 nbOfThreads = backendNonBlockingThreads.size(); 01482 for (int i = 0; i < nbOfThreads; i++) 01483 { 01484 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01485 .get(i); 01486 if (thread.getBackend().equals(db)) 01487 { 01488 logger.info(Translate.get( 01489 "loadbalancer.backend.workerthread.non.blocking.remove", db 01490 .getName())); 01491 01492 backendNonBlockingThreads.remove(thread); 01493 01494 synchronized (thread) 01495 { 01496 // Kill the thread 01497 thread.addPriorityTask(new KillThreadTask(1, 1)); 01498 thread.notify(); 01499 } 01500 break; 01501 } 01502 } 01503 01504 backendNonBlockingThreadsRWLock.releaseWrite(); 01505 } 01506 01507 db.disable(); 01508 if (db.isInitialized()) 01509 db.finalizeConnections(); 01510 } |
|
Enables a Backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1356 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
01358 { 01359 if (writeEnabled) 01360 { 01361 // Create 2 worker threads 01362 BackendWorkerThread blockingThread = new BackendWorkerThread( 01363 (DatabaseBackend) db, this); 01364 BackendWorkerThread nonBlockingThread = new BackendWorkerThread( 01365 (DatabaseBackend) db, this); 01366 01367 // Add first to the blocking thread list 01368 try 01369 { 01370 backendBlockingThreadsRWLock.acquireWrite(); 01371 } 01372 catch (InterruptedException e) 01373 { 01374 String msg = Translate.get( 01375 "loadbalancer.backendlist.acquire.writelock.failed", e); 01376 logger.error(msg); 01377 throw new SQLException(msg); 01378 } 01379 backendBlockingThreads.add(blockingThread); 01380 backendBlockingThreadsRWLock.releaseWrite(); 01381 blockingThread.start(); 01382 logger.info(Translate.get( 01383 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01384 01385 // Then add to the non-blocking thread list 01386 try 01387 { 01388 backendNonBlockingThreadsRWLock.acquireWrite(); 01389 } 01390 catch (InterruptedException e) 01391 { 01392 String msg = Translate.get( 01393 "loadbalancer.backendlist.acquire.writelock.failed", e); 01394 logger.error(msg); 01395 throw new SQLException(msg); 01396 } 01397 backendNonBlockingThreads.add(nonBlockingThread); 01398 backendNonBlockingThreadsRWLock.releaseWrite(); 01399 nonBlockingThread.start(); 01400 logger.info(Translate.get( 01401 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01402 db.enableWrite(); 01403 } 01404 01405 if (!db.isInitialized()) 01406 db.initializeConnections(); 01407 db.enableRead(); 01408 } |
|
Chooses the node to execute the stored procedure using a round-robin algorithm. If the next node has not the needed stored procedure, we try the next one and so on until a suitable backend is found.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2_WRR.java の 115 行で定義されています。
00117 { 00118 throw new NotImplementedException( 00119 this.getClass().getName() + ":execReadStoredProcedure"); 00120 } |
|
Performs a read request. It is up to the implementation to choose to which backend node(s) this request should be sent.
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2_WRR.java の 101 行で定義されています。
00103 { 00104 throw new NotImplementedException( 00105 this.getClass().getName() + ":execReadRequest"); 00106 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 816 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
00818 { 00819 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00820 proc, true); 00821 return task.getResult(); 00822 } |
|
Execute a read request on the selected backend.
RAIDb2.java の 492 行で定義されています。
00494 { 00495 // Handle macros 00496 request.setSQL(RequestManager.handleSQLMacros(request.getSQL(), 00497 timestampResolution, false)); 00498 00499 // Ok, we have a backend, let's execute the request 00500 AbstractConnectionManager cm = backend.getConnectionManager(request 00501 .getLogin()); 00502 00503 // Sanity check 00504 if (cm == null) 00505 { 00506 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00507 new String[]{request.getLogin(), backend.getName()}); 00508 logger.error(msg); 00509 throw new SQLException(msg); 00510 } 00511 00512 // Execute the query 00513 if (request.isAutoCommit()) 00514 { 00515 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00516 // We could do something finer grain here by waiting 00517 // only for writes that depend on the tables we need 00518 // but is that really worth the overhead ? 00519 waitForAllWritesToComplete(backend); 00520 00521 ResultSet rs = null; 00522 boolean badConnection; 00523 do 00524 { 00525 badConnection = false; 00526 // Use a connection just for this request 00527 Connection c = null; 00528 try 00529 { 00530 c = cm.getConnection(); 00531 } 00532 catch (UnreachableBackendException e1) 00533 { 00534 logger.error(Translate.get( 00535 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00536 backend.disable(); 00537 throw new UnreachableBackendException(Translate.get( 00538 "loadbalancer.backend.unreacheable", backend.getName())); 00539 } 00540 00541 // Sanity check 00542 if (c == null) 00543 throw new UnreachableBackendException( 00544 "No more connections on backend " + backend.getName()); 00545 00546 // Execute Query 00547 try 00548 { 00549 rs = executeStatementOnBackend(request, backend, c); 00550 cm.releaseConnection(c); 00551 } 00552 catch (SQLException e) 00553 { 00554 cm.releaseConnection(c); 00555 throw new SQLException(Translate.get( 00556 "loadbalancer.request.failed.on.backend", new String[]{ 00557 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00558 backend.getName(), e.getMessage()})); 00559 } 00560 catch (BadConnectionException e) 00561 { // Get rid of the bad connection 00562 cm.deleteConnection(c); 00563 badConnection = true; 00564 } 00565 } 00566 while (badConnection); 00567 if (logger.isDebugEnabled()) 00568 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00569 String.valueOf(request.getId()), backend.getName()})); 00570 return rs; 00571 } 00572 else 00573 { // Inside a transaction 00574 Connection c; 00575 long tid = request.getTransactionId(); 00576 Long lTid = new Long(tid); 00577 00578 // Wait for previous writes to complete 00579 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00580 waitForAllWritesToComplete(backend, request.getTransactionId()); 00581 00582 if (!backend.isStartedTransaction(lTid)) 00583 { // transaction has not been started yet on this backend 00584 try 00585 { 00586 c = cm.getConnection(tid); 00587 } 00588 catch (UnreachableBackendException e1) 00589 { 00590 logger.error(Translate.get( 00591 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00592 backend.disable(); 00593 throw new UnreachableBackendException(Translate.get( 00594 "loadbalancer.backend.unreacheable", backend.getName())); 00595 } 00596 00597 // Sanity check 00598 if (c == null) 00599 throw new SQLException(Translate.get( 00600 "loadbalancer.unable.get.connection", new String[]{ 00601 String.valueOf(tid), backend.getName()})); 00602 00603 // begin transaction 00604 backend.startTransaction(lTid); 00605 c.setAutoCommit(false); 00606 } 00607 else 00608 { // Re-use the connection used by this transaction 00609 c = cm.retrieveConnection(tid); 00610 00611 // Sanity check 00612 if (c == null) 00613 throw new SQLException(Translate.get( 00614 "loadbalancer.unable.retrieve.connection", new String[]{ 00615 String.valueOf(tid), backend.getName()})); 00616 } 00617 00618 // Execute Query 00619 ResultSet rs = null; 00620 try 00621 { 00622 rs = executeStatementOnBackend(request, backend, c); 00623 } 00624 catch (SQLException e) 00625 { 00626 throw new SQLException(Translate.get( 00627 "loadbalancer.request.failed.on.backend", new String[]{ 00628 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00629 backend.getName(), e.getMessage()})); 00630 } 00631 catch (BadConnectionException e) 00632 { // Connection failed, so did the transaction 00633 // Disable the backend. 00634 cm.deleteConnection(tid); 00635 String msg = Translate.get( 00636 "loadbalancer.backend.disabling.connection.failure", backend 00637 .getName()); 00638 logger.error(msg); 00639 backend.disable(); 00640 throw new SQLException(msg); 00641 } 00642 if (logger.isDebugEnabled()) 00643 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00644 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00645 backend.getName()})); 00646 return rs; 00647 } 00648 } |
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
AbstractLoadBalancer.java の 251 行で定義されています。 参照先 java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. 参照元 org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
00254 { 00255 ResultSet rs = null; 00256 try 00257 { 00258 backend.addPendingReadRequest(request); 00259 String sql = request.getSQL(); 00260 // Rewrite the query if needed 00261 sql = backend.rewriteQuery(sql); 00262 // Execute the query 00263 Statement s = c.createStatement(); 00264 DriverCompliance driverCompliance = backend.getDriverCompliance(); 00265 if (driverCompliance.supportSetQueryTimeout()) 00266 s.setQueryTimeout(request.getTimeout()); 00267 if ((request.getCursorName() != null) 00268 && (driverCompliance.supportSetCursorName())) 00269 s.setCursorName(request.getCursorName()); 00270 if ((request.getFetchSize() != 0) 00271 && driverCompliance.supportSetFetchSize()) 00272 s.setFetchSize(request.getFetchSize()); 00273 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 00274 s.setMaxRows(request.getMaxRows()); 00275 rs = s.executeQuery(sql); 00276 } 00277 catch (SQLException e) 00278 { // Something bad happened 00279 if (backend.isValidConnection(c)) 00280 throw e; // Connection is valid, throw the exception 00281 else 00282 throw new BadConnectionException(); 00283 } 00284 finally 00285 { 00286 backend.removePendingRequest(request); 00287 } 00288 return rs; 00289 } |
|
Execute a stored procedure on the selected backend.
RAIDb2.java の 658 行で定義されています。
00661 { 00662 // Handle macros 00663 proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(), 00664 timestampResolution, false)); 00665 00666 // Ok, we have a backend, let's execute the request 00667 AbstractConnectionManager cm = backend 00668 .getConnectionManager(proc.getLogin()); 00669 00670 // Sanity check 00671 if (cm == null) 00672 { 00673 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00674 new String[]{proc.getLogin(), backend.getName()}); 00675 logger.error(msg); 00676 throw new SQLException(msg); 00677 } 00678 00679 // Execute the query 00680 if (proc.isAutoCommit()) 00681 { 00682 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00683 // We could do something finer grain here by waiting 00684 // only for writes that depend on the tables we need 00685 // but is that really worth the overhead ? 00686 waitForAllWritesToComplete(backend); 00687 00688 // Use a connection just for this request 00689 Connection c = null; 00690 try 00691 { 00692 c = cm.getConnection(); 00693 } 00694 catch (UnreachableBackendException e1) 00695 { 00696 logger.error(Translate.get( 00697 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00698 backend.disable(); 00699 throw new UnreachableBackendException(Translate.get( 00700 "loadbalancer.backend.unreacheable", backend.getName())); 00701 } 00702 00703 // Sanity check 00704 if (c == null) 00705 throw new SQLException(Translate.get( 00706 "loadbalancer.backend.no.connection", backend.getName())); 00707 00708 // Execute Query 00709 ResultSet rs = null; 00710 try 00711 { 00712 // We suppose here that the request does not modify the schema since 00713 // it is a read-only query. 00714 CallableStatement cs = c.prepareCall(proc.getSQL()); 00715 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00716 cs.setQueryTimeout(proc.getTimeout()); 00717 if ((proc.getMaxRows() > 0) 00718 && backend.getDriverCompliance().supportSetMaxRows()) 00719 cs.setMaxRows(proc.getMaxRows()); 00720 rs = cs.executeQuery(); 00721 } 00722 catch (SQLException e) 00723 { 00724 throw new SQLException(Translate.get( 00725 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00726 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00727 backend.getName(), e.getMessage()})); 00728 } 00729 finally 00730 { 00731 cm.releaseConnection(c); 00732 } 00733 if (logger.isDebugEnabled()) 00734 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00735 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00736 return rs; 00737 } 00738 else 00739 { // Inside a transaction 00740 Connection c; 00741 long tid = proc.getTransactionId(); 00742 Long lTid = new Long(tid); 00743 00744 // Wait for previous writes to complete 00745 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00746 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00747 00748 if (!backend.isStartedTransaction(lTid)) 00749 { // transaction has not been started yet on this backend 00750 try 00751 { 00752 c = cm.getConnection(tid); 00753 } 00754 catch (UnreachableBackendException e1) 00755 { 00756 logger.error(Translate.get( 00757 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00758 backend.disable(); 00759 throw new SQLException(Translate.get( 00760 "loadbalancer.backend.unreacheable", backend.getName())); 00761 } 00762 00763 // Sanity check 00764 if (c == null) 00765 throw new SQLException(Translate.get( 00766 "loadbalancer.unable.get.connection", new String[]{ 00767 String.valueOf(tid), backend.getName()})); 00768 00769 // begin transaction 00770 backend.startTransaction(lTid); 00771 c.setAutoCommit(false); 00772 } 00773 else 00774 { // Re-use the connection used by this transaction 00775 c = cm.retrieveConnection(tid); 00776 00777 // Sanity check 00778 if (c == null) 00779 throw new SQLException(Translate.get( 00780 "loadbalancer.unable.retrieve.connection", new String[]{ 00781 String.valueOf(tid), backend.getName()})); 00782 } 00783 00784 // Execute Query 00785 ResultSet rs; 00786 try 00787 { 00788 // We suppose here that the request does not modify the schema since 00789 // it is a read-only query. 00790 CallableStatement cs = c.prepareCall(proc.getSQL()); 00791 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00792 cs.setQueryTimeout(proc.getTimeout()); 00793 if ((proc.getMaxRows() > 0) 00794 && backend.getDriverCompliance().supportSetMaxRows()) 00795 cs.setMaxRows(proc.getMaxRows()); 00796 rs = cs.executeQuery(); 00797 } 00798 catch (SQLException e) 00799 { 00800 throw new SQLException(Translate.get( 00801 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00802 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00803 backend.getName(), e.getMessage()})); 00804 } 00805 if (logger.isDebugEnabled()) 00806 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00807 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00808 backend.getName()})); 00809 return rs; 00810 } 00811 } |
|
Performs a write request. This request is broadcasted to all nodes that owns the table to be written.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 174 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult. 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().
00176 { 00177 execWriteRequest(request, false); 00178 return execWriteRequestResult; 00179 } |
|
Perform a write request and return the auto generated keys.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 190 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest(), と org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult.
00192 { 00193 execWriteRequest(request, true); 00194 return execWriteRequestWithKeysResult; 00195 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 827 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
00828 { 00829 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 00830 proc, false); 00831 return task.getResult(); 00832 } |
|
Gets information about the request load balancer.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2_WRR.java の 144 行で定義されています。
|
|
Get the needed query parsing granularity.
AbstractLoadBalancer.java の 151 行で定義されています。 参照元 org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
00152 { 00153 return parsingGranularity; 00154 } |
|
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2_WRR.java の 157 行で定義されています。
00158 {
00159 return WeightedBalancer.getRaidbXml(
00160 weights,
00161 DatabasesXmlTags.ELT_RAIDb_2_WeightedRoundRobin);
00162 }
|
|
Returns the RAIDbLevel.
AbstractLoadBalancer.java の 131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
00132 { 00133 return raidbLevel; 00134 } |
|
AbstractLoadBalancer.java の 385 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
00386 { 00387 StringBuffer info = new StringBuffer(); 00388 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00389 info.append(getXmlImpl()); 00390 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00391 return info.toString(); 00392 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1515 行で定義されています。
01516 { 01517 StringBuffer info = new StringBuffer(); 01518 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + " " 01519 + DatabasesXmlTags.ATT_timestampResolution + "=\"" 01520 + timestampResolution + "\" >"); 01521 if (createTablePolicy != null) 01522 info.append(createTablePolicy.getXml()); 01523 if (waitForCompletionPolicy != null) 01524 info.append(waitForCompletionPolicy.getXml()); 01525 this.getRaidb2Xml(); 01526 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01527 return info.toString(); 01528 } |
|
Rollbacks a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01132 { 01133 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01134 waitForAllWritesToComplete(tm.getTransactionId()); 01135 01136 try 01137 { 01138 backendNonBlockingThreadsRWLock.acquireRead(); 01139 } 01140 catch (InterruptedException e) 01141 { 01142 String msg = Translate.get( 01143 "loadbalancer.backendlist.acquire.readlock.failed", e); 01144 logger.error(msg); 01145 throw new SQLException(msg); 01146 } 01147 int nbOfThreads = backendNonBlockingThreads.size(); 01148 ArrayList rollbackList = new ArrayList(); 01149 Long iTid = new Long(tm.getTransactionId()); 01150 01151 // Build the list of backend that need to rollback this transaction 01152 for (int i = 0; i < nbOfThreads; i++) 01153 { 01154 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01155 .get(i); 01156 if (thread.getBackend().isStartedTransaction(iTid)) 01157 rollbackList.add(thread); 01158 } 01159 01160 nbOfThreads = rollbackList.size(); 01161 if (nbOfThreads == 0) 01162 { 01163 backendNonBlockingThreadsRWLock.releaseRead(); 01164 return; 01165 } 01166 01167 // Create the task 01168 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01169 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01170 01171 synchronized (task) 01172 { 01173 // Post the task in each backendThread tasklist and wakeup the threads 01174 for (int i = 0; i < nbOfThreads; i++) 01175 { 01176 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01177 synchronized (thread) 01178 { 01179 thread.addTask(task); 01180 thread.notify(); 01181 } 01182 } 01183 01184 backendNonBlockingThreadsRWLock.releaseRead(); 01185 01186 // Wait for completion (notified by the task) 01187 try 01188 { 01189 // Wait on task 01190 long timeout = tm.getTimeout(); 01191 if (timeout > 0) 01192 { 01193 long start = System.currentTimeMillis(); 01194 task.wait(timeout); 01195 long end = System.currentTimeMillis(); 01196 long remaining = timeout - (end - start); 01197 if (remaining <= 0) 01198 { 01199 String msg = Translate.get("loadbalancer.rollback.timeout", 01200 new String[]{String.valueOf(tm.getTransactionId()), 01201 String.valueOf(task.getSuccess()), 01202 String.valueOf(task.getFailed())}); 01203 logger.warn(msg); 01204 throw new SQLException(msg); 01205 } 01206 } 01207 else 01208 task.wait(); 01209 } 01210 catch (InterruptedException e) 01211 { 01212 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01213 new String[]{String.valueOf(tm.getTransactionId()), 01214 String.valueOf(task.getSuccess()), 01215 String.valueOf(task.getFailed())})); 01216 } 01217 01218 if (task.getSuccess() > 0) 01219 return; 01220 else 01221 { // All tasks failed 01222 ArrayList exceptions = task.getExceptions(); 01223 if (exceptions == null) 01224 throw new SQLException(Translate.get( 01225 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01226 else 01227 { 01228 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01229 tm.getTransactionId()) 01230 + "\n"; 01231 for (int i = 0; i < exceptions.size(); i++) 01232 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01233 logger.error(errorMsg); 01234 throw new SQLException(errorMsg); 01235 } 01236 } 01237 } 01238 } |
|
Set the needed query parsing granularity.
AbstractLoadBalancer.java の 161 行で定義されています。
00162 {
00163 this.parsingGranularity = parsingGranularity;
00164 }
|
|
Sets the RAIDbLevel.
AbstractLoadBalancer.java の 141 行で定義されています。
00142 {
00143 this.raidbLevel = raidbLevel;
00144 }
|
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 RAIDb2_WRR.java の 130 行で定義されています。
00131 { 00132 throw new SQLException("Weight is not supported with this load balancer"); 00133 } |
|
Waits for all writes in the blocking thread queue of the given backend to complete.
RAIDb2.java の 1312 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01314 { 01315 try 01316 { 01317 backendBlockingThreadsRWLock.acquireRead(); 01318 } 01319 catch (InterruptedException e) 01320 { 01321 String msg = Translate.get( 01322 "loadbalancer.backendlist.acquire.readlock.failed", e); 01323 logger.error(msg); 01324 throw new SQLException(msg); 01325 } 01326 01327 int nbOfThreads = backendBlockingThreads.size(); 01328 01329 for (int i = 0; i < nbOfThreads; i++) 01330 { 01331 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01332 .get(i); 01333 if (thread.getBackend() == backend) 01334 thread.waitForAllTasksToComplete(); 01335 } 01336 01337 backendBlockingThreadsRWLock.releaseRead(); 01338 } |
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
RAIDb2.java の 1278 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01280 { 01281 try 01282 { 01283 backendBlockingThreadsRWLock.acquireRead(); 01284 } 01285 catch (InterruptedException e) 01286 { 01287 String msg = Translate.get( 01288 "loadbalancer.backendlist.acquire.readlock.failed", e); 01289 logger.error(msg); 01290 throw new SQLException(msg); 01291 } 01292 01293 int nbOfThreads = backendBlockingThreads.size(); 01294 01295 for (int i = 0; i < nbOfThreads; i++) 01296 { 01297 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01298 .get(i); 01299 if (thread.getBackend() == backend) 01300 thread.waitForAllTasksToComplete(transactionId); 01301 } 01302 01303 backendBlockingThreadsRWLock.releaseRead(); 01304 } |
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction. RAIDb2.java の 1244 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().
01246 { 01247 try 01248 { 01249 backendBlockingThreadsRWLock.acquireRead(); 01250 } 01251 catch (InterruptedException e) 01252 { 01253 String msg = Translate.get( 01254 "loadbalancer.backendlist.acquire.readlock.failed", e); 01255 logger.error(msg); 01256 throw new SQLException(msg); 01257 } 01258 01259 int nbOfThreads = backendBlockingThreads.size(); 01260 01261 for (int i = 0; i < nbOfThreads; i++) 01262 { 01263 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01264 .get(i); 01265 thread.waitForAllTasksToComplete(transactionId); 01266 } 01267 01268 backendBlockingThreadsRWLock.releaseRead(); 01269 } |
|
RAIDb2.java の 87 行で定義されています。 |
|
RAIDb2.java の 89 行で定義されています。 |
|
RAIDb2.java の 88 行で定義されています。 |
|
RAIDb2.java の 90 行で定義されています。 |
|
RAIDb2.java の 93 行で定義されています。 |
|
RAIDb2_WRR.java の 59 行で定義されています。 |
|
初期値: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 96 行で定義されています。 |
|
AbstractLoadBalancer.java の 74 行で定義されています。 |
|
|
AbstractLoadBalancer.java の 72 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(). |
|
RAIDb2.java の 92 行で定義されています。 |
|
RAIDb2_WRR.java の 58 行で定義されています。 |