This load balancer tolerates byzantine failures of databases. The read requests coming from the request manager are sent to multiple backend nodes and the results are compared. Write requests are broadcasted to all backends.
RAIDb2ec_WRR.java の 53 行で定義されています。
|
Creates a new RAIDb-2 weighted round robin with error checking request load balancer.
RAIDb2ec_WRR.java の 82 行で定義されています。
00090 { 00091 super( 00092 vdb, 00093 waitForCompletionPolicy, 00094 createTablePolicy, 00095 errorCheckingPolicy, 00096 nbOfConcurrentReads, 00097 timestampResolution); 00098 index = -1; 00099 } |
|
Begins a new transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1005 行で定義されています。
01006 { 01007 } |
|
Commits a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1015 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01016 { 01017 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01018 waitForAllWritesToComplete(tm.getTransactionId()); 01019 01020 try 01021 { 01022 backendNonBlockingThreadsRWLock.acquireRead(); 01023 } 01024 catch (InterruptedException e) 01025 { 01026 String msg = Translate.get( 01027 "loadbalancer.backendlist.acquire.readlock.failed", e); 01028 logger.error(msg); 01029 throw new SQLException(msg); 01030 } 01031 01032 int nbOfThreads = backendNonBlockingThreads.size(); 01033 ArrayList commitList = new ArrayList(); 01034 Long iTid = new Long(tm.getTransactionId()); 01035 01036 // Build the list of backend that need to commit this transaction 01037 for (int i = 0; i < nbOfThreads; i++) 01038 { 01039 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01040 .get(i); 01041 if (thread.getBackend().isStartedTransaction(iTid)) 01042 commitList.add(thread); 01043 } 01044 01045 nbOfThreads = commitList.size(); 01046 if (nbOfThreads == 0) 01047 { 01048 backendNonBlockingThreadsRWLock.releaseRead(); 01049 return; 01050 } 01051 01052 // Create the task 01053 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 01054 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 01055 01056 synchronized (task) 01057 { 01058 // Post the task in each backendThread tasklist and wakeup the threads 01059 for (int i = 0; i < nbOfThreads; i++) 01060 { 01061 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 01062 synchronized (thread) 01063 { 01064 thread.addTask(task); 01065 thread.notify(); 01066 } 01067 } 01068 01069 backendNonBlockingThreadsRWLock.releaseRead(); 01070 01071 // Wait for completion (notified by the task) 01072 try 01073 { 01074 // Wait on task 01075 long timeout = tm.getTimeout(); 01076 if (timeout > 0) 01077 { 01078 long start = System.currentTimeMillis(); 01079 task.wait(timeout); 01080 long end = System.currentTimeMillis(); 01081 long remaining = timeout - (end - start); 01082 if (remaining <= 0) 01083 { 01084 String msg = Translate.get("loadbalancer.commit.timeout", 01085 new String[]{String.valueOf(tm.getTransactionId()), 01086 String.valueOf(task.getSuccess()), 01087 String.valueOf(task.getFailed())}); 01088 logger.warn(msg); 01089 throw new SQLException(msg); 01090 } 01091 } 01092 else 01093 task.wait(); 01094 } 01095 catch (InterruptedException e) 01096 { 01097 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 01098 new String[]{String.valueOf(tm.getTransactionId()), 01099 String.valueOf(task.getSuccess()), 01100 String.valueOf(task.getFailed())})); 01101 } 01102 01103 if (task.getSuccess() > 0) 01104 return; 01105 else 01106 { // All tasks failed 01107 ArrayList exceptions = task.getExceptions(); 01108 if (exceptions == null) 01109 throw new SQLException(Translate.get( 01110 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01111 else 01112 { 01113 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01114 tm.getTransactionId()) 01115 + "\n"; 01116 for (int i = 0; i < exceptions.size(); i++) 01117 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01118 logger.error(errorMsg); 01119 throw new SQLException(errorMsg); 01120 } 01121 } 01122 } 01123 } |
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2を再定義しています。 RAIDb2ec.java の 174 行で定義されています。 参照先 org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireWrite(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.common.log.Trace.info(), と org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseWrite().
00176 { 00177 int nbOfThreads = backendBlockingThreads.size(); 00178 00179 // Find the right blocking thread 00180 for (int i = 0; i < nbOfThreads; i++) 00181 { 00182 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 00183 .get(i); 00184 if (thread.getBackend().equals(db)) 00185 { 00186 logger.info(Translate.get( 00187 "loadbalancer.backend.workerthread.blocking.remove", db.getName())); 00188 00189 // Remove it from the backendBlockingThread list 00190 try 00191 { 00192 backendBlockingThreadsRWLock.acquireWrite(); 00193 } 00194 catch (InterruptedException e) 00195 { 00196 String msg = Translate.get( 00197 "loadbalancer.backendlist.acquire.writelock.failed", e); 00198 logger.error(msg); 00199 throw new SQLException(msg); 00200 } 00201 backendBlockingThreads.remove(thread); 00202 backendBlockingThreadsRWLock.releaseWrite(); 00203 00204 synchronized (thread) 00205 { 00206 // Kill the thread 00207 thread.addPriorityTask(new KillThreadTask(1, 1)); 00208 thread.notify(); 00209 } 00210 break; 00211 } 00212 } 00213 00214 // Find the right non-blocking thread 00215 nbOfThreads = backendNonBlockingThreads.size(); 00216 for (int i = 0; i < nbOfThreads; i++) 00217 { 00218 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 00219 .get(i); 00220 if (thread.getBackend().equals(db)) 00221 { 00222 logger.info(Translate.get( 00223 "loadbalancer.backend.workerthread.non.blocking.remove", db 00224 .getName())); 00225 00226 // Remove it from the backendNonBlockingThreads list 00227 try 00228 { 00229 backendNonBlockingThreadsRWLock.acquireWrite(); 00230 } 00231 catch (InterruptedException e) 00232 { 00233 String msg = Translate.get( 00234 "loadbalancer.backendlist.acquire.writelock.failed", e); 00235 logger.error(msg); 00236 throw new SQLException(msg); 00237 } 00238 backendNonBlockingThreads.remove(thread); 00239 backendNonBlockingThreadsRWLock.releaseWrite(); 00240 00241 synchronized (thread) 00242 { 00243 // Kill the thread 00244 thread.addPriorityTask(new KillThreadTask(1, 1)); 00245 thread.notify(); 00246 } 00247 break; 00248 } 00249 } 00250 00251 db.disable(); 00252 if (db.isInitialized()) 00253 db.finalizeConnections(); 00254 } |
|
Enables a backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2を再定義しています。 RAIDb2ec.java の 111 行で定義されています。 参照先 org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireWrite(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableRead(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.common.log.Trace.info(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.initializeConnections(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isInitialized(), と org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseWrite().
00113 { 00114 // Create 2 worker threads for writes 00115 BackendWorkerThread blockingThread = new BackendWorkerThread( 00116 (DatabaseBackend) db, this); 00117 BackendWorkerThread nonBlockingThread = new BackendWorkerThread( 00118 (DatabaseBackend) db, this); 00119 00120 // Add first to the blocking thread list 00121 try 00122 { 00123 backendBlockingThreadsRWLock.acquireWrite(); 00124 } 00125 catch (InterruptedException e) 00126 { 00127 String msg = Translate.get( 00128 "loadbalancer.backendlist.acquire.writelock.failed", e); 00129 logger.error(msg); 00130 throw new SQLException(msg); 00131 } 00132 backendBlockingThreads.add(blockingThread); 00133 backendBlockingThreadsRWLock.releaseWrite(); 00134 blockingThread.start(); 00135 logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add", 00136 db.getName())); 00137 00138 // Then add to the non-blocking thread list 00139 try 00140 { 00141 backendNonBlockingThreadsRWLock.acquireWrite(); 00142 } 00143 catch (InterruptedException e) 00144 { 00145 String msg = Translate.get( 00146 "loadbalancer.backendlist.acquire.writelock.failed", e); 00147 logger.error(msg); 00148 throw new SQLException(msg); 00149 } 00150 backendNonBlockingThreads.add(nonBlockingThread); 00151 backendNonBlockingThreadsRWLock.releaseWrite(); 00152 nonBlockingThread.start(); 00153 logger.info(Translate.get( 00154 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 00155 00156 if (!db.isInitialized()) 00157 db.initializeConnections(); 00158 db.enableRead(); 00159 if (writeEnabled) 00160 db.enableWrite(); 00161 } |
|
Not implemented.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2ec_WRR.java の 127 行で定義されています。
00129 { 00130 throw new NotImplementedException( 00131 this.getClass().getName() + ":execReadStoredProcedure"); 00132 } |
|
Performs a read request. It is up to the implementation to choose to which backend node(s) this request should be sent.
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2ec_WRR.java の 115 行で定義されています。
00117 { 00118 throw new NotImplementedException( 00119 this.getClass().getName() + ":execReadRequest"); 00120 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 816 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
00818 { 00819 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00820 proc, true); 00821 return task.getResult(); 00822 } |
|
Execute a read request on the selected backend.
RAIDb2.java の 492 行で定義されています。
00494 { 00495 // Handle macros 00496 request.setSQL(RequestManager.handleSQLMacros(request.getSQL(), 00497 timestampResolution, false)); 00498 00499 // Ok, we have a backend, let's execute the request 00500 AbstractConnectionManager cm = backend.getConnectionManager(request 00501 .getLogin()); 00502 00503 // Sanity check 00504 if (cm == null) 00505 { 00506 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00507 new String[]{request.getLogin(), backend.getName()}); 00508 logger.error(msg); 00509 throw new SQLException(msg); 00510 } 00511 00512 // Execute the query 00513 if (request.isAutoCommit()) 00514 { 00515 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00516 // We could do something finer grain here by waiting 00517 // only for writes that depend on the tables we need 00518 // but is that really worth the overhead ? 00519 waitForAllWritesToComplete(backend); 00520 00521 ResultSet rs = null; 00522 boolean badConnection; 00523 do 00524 { 00525 badConnection = false; 00526 // Use a connection just for this request 00527 Connection c = null; 00528 try 00529 { 00530 c = cm.getConnection(); 00531 } 00532 catch (UnreachableBackendException e1) 00533 { 00534 logger.error(Translate.get( 00535 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00536 backend.disable(); 00537 throw new UnreachableBackendException(Translate.get( 00538 "loadbalancer.backend.unreacheable", backend.getName())); 00539 } 00540 00541 // Sanity check 00542 if (c == null) 00543 throw new UnreachableBackendException( 00544 "No more connections on backend " + backend.getName()); 00545 00546 // Execute Query 00547 try 00548 { 00549 rs = executeStatementOnBackend(request, backend, c); 00550 cm.releaseConnection(c); 00551 } 00552 catch (SQLException e) 00553 { 00554 cm.releaseConnection(c); 00555 throw new SQLException(Translate.get( 00556 "loadbalancer.request.failed.on.backend", new String[]{ 00557 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00558 backend.getName(), e.getMessage()})); 00559 } 00560 catch (BadConnectionException e) 00561 { // Get rid of the bad connection 00562 cm.deleteConnection(c); 00563 badConnection = true; 00564 } 00565 } 00566 while (badConnection); 00567 if (logger.isDebugEnabled()) 00568 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00569 String.valueOf(request.getId()), backend.getName()})); 00570 return rs; 00571 } 00572 else 00573 { // Inside a transaction 00574 Connection c; 00575 long tid = request.getTransactionId(); 00576 Long lTid = new Long(tid); 00577 00578 // Wait for previous writes to complete 00579 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00580 waitForAllWritesToComplete(backend, request.getTransactionId()); 00581 00582 if (!backend.isStartedTransaction(lTid)) 00583 { // transaction has not been started yet on this backend 00584 try 00585 { 00586 c = cm.getConnection(tid); 00587 } 00588 catch (UnreachableBackendException e1) 00589 { 00590 logger.error(Translate.get( 00591 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00592 backend.disable(); 00593 throw new UnreachableBackendException(Translate.get( 00594 "loadbalancer.backend.unreacheable", backend.getName())); 00595 } 00596 00597 // Sanity check 00598 if (c == null) 00599 throw new SQLException(Translate.get( 00600 "loadbalancer.unable.get.connection", new String[]{ 00601 String.valueOf(tid), backend.getName()})); 00602 00603 // begin transaction 00604 backend.startTransaction(lTid); 00605 c.setAutoCommit(false); 00606 } 00607 else 00608 { // Re-use the connection used by this transaction 00609 c = cm.retrieveConnection(tid); 00610 00611 // Sanity check 00612 if (c == null) 00613 throw new SQLException(Translate.get( 00614 "loadbalancer.unable.retrieve.connection", new String[]{ 00615 String.valueOf(tid), backend.getName()})); 00616 } 00617 00618 // Execute Query 00619 ResultSet rs = null; 00620 try 00621 { 00622 rs = executeStatementOnBackend(request, backend, c); 00623 } 00624 catch (SQLException e) 00625 { 00626 throw new SQLException(Translate.get( 00627 "loadbalancer.request.failed.on.backend", new String[]{ 00628 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00629 backend.getName(), e.getMessage()})); 00630 } 00631 catch (BadConnectionException e) 00632 { // Connection failed, so did the transaction 00633 // Disable the backend. 00634 cm.deleteConnection(tid); 00635 String msg = Translate.get( 00636 "loadbalancer.backend.disabling.connection.failure", backend 00637 .getName()); 00638 logger.error(msg); 00639 backend.disable(); 00640 throw new SQLException(msg); 00641 } 00642 if (logger.isDebugEnabled()) 00643 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00644 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00645 backend.getName()})); 00646 return rs; 00647 } 00648 } |
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
AbstractLoadBalancer.java の 251 行で定義されています。 参照先 java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. 参照元 org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
00254 { 00255 ResultSet rs = null; 00256 try 00257 { 00258 backend.addPendingReadRequest(request); 00259 String sql = request.getSQL(); 00260 // Rewrite the query if needed 00261 sql = backend.rewriteQuery(sql); 00262 // Execute the query 00263 Statement s = c.createStatement(); 00264 DriverCompliance driverCompliance = backend.getDriverCompliance(); 00265 if (driverCompliance.supportSetQueryTimeout()) 00266 s.setQueryTimeout(request.getTimeout()); 00267 if ((request.getCursorName() != null) 00268 && (driverCompliance.supportSetCursorName())) 00269 s.setCursorName(request.getCursorName()); 00270 if ((request.getFetchSize() != 0) 00271 && driverCompliance.supportSetFetchSize()) 00272 s.setFetchSize(request.getFetchSize()); 00273 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 00274 s.setMaxRows(request.getMaxRows()); 00275 rs = s.executeQuery(sql); 00276 } 00277 catch (SQLException e) 00278 { // Something bad happened 00279 if (backend.isValidConnection(c)) 00280 throw e; // Connection is valid, throw the exception 00281 else 00282 throw new BadConnectionException(); 00283 } 00284 finally 00285 { 00286 backend.removePendingRequest(request); 00287 } 00288 return rs; 00289 } |
|
Execute a stored procedure on the selected backend.
RAIDb2.java の 658 行で定義されています。
00661 { 00662 // Handle macros 00663 proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(), 00664 timestampResolution, false)); 00665 00666 // Ok, we have a backend, let's execute the request 00667 AbstractConnectionManager cm = backend 00668 .getConnectionManager(proc.getLogin()); 00669 00670 // Sanity check 00671 if (cm == null) 00672 { 00673 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00674 new String[]{proc.getLogin(), backend.getName()}); 00675 logger.error(msg); 00676 throw new SQLException(msg); 00677 } 00678 00679 // Execute the query 00680 if (proc.isAutoCommit()) 00681 { 00682 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00683 // We could do something finer grain here by waiting 00684 // only for writes that depend on the tables we need 00685 // but is that really worth the overhead ? 00686 waitForAllWritesToComplete(backend); 00687 00688 // Use a connection just for this request 00689 Connection c = null; 00690 try 00691 { 00692 c = cm.getConnection(); 00693 } 00694 catch (UnreachableBackendException e1) 00695 { 00696 logger.error(Translate.get( 00697 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00698 backend.disable(); 00699 throw new UnreachableBackendException(Translate.get( 00700 "loadbalancer.backend.unreacheable", backend.getName())); 00701 } 00702 00703 // Sanity check 00704 if (c == null) 00705 throw new SQLException(Translate.get( 00706 "loadbalancer.backend.no.connection", backend.getName())); 00707 00708 // Execute Query 00709 ResultSet rs = null; 00710 try 00711 { 00712 // We suppose here that the request does not modify the schema since 00713 // it is a read-only query. 00714 CallableStatement cs = c.prepareCall(proc.getSQL()); 00715 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00716 cs.setQueryTimeout(proc.getTimeout()); 00717 if ((proc.getMaxRows() > 0) 00718 && backend.getDriverCompliance().supportSetMaxRows()) 00719 cs.setMaxRows(proc.getMaxRows()); 00720 rs = cs.executeQuery(); 00721 } 00722 catch (SQLException e) 00723 { 00724 throw new SQLException(Translate.get( 00725 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00726 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00727 backend.getName(), e.getMessage()})); 00728 } 00729 finally 00730 { 00731 cm.releaseConnection(c); 00732 } 00733 if (logger.isDebugEnabled()) 00734 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00735 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00736 return rs; 00737 } 00738 else 00739 { // Inside a transaction 00740 Connection c; 00741 long tid = proc.getTransactionId(); 00742 Long lTid = new Long(tid); 00743 00744 // Wait for previous writes to complete 00745 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00746 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00747 00748 if (!backend.isStartedTransaction(lTid)) 00749 { // transaction has not been started yet on this backend 00750 try 00751 { 00752 c = cm.getConnection(tid); 00753 } 00754 catch (UnreachableBackendException e1) 00755 { 00756 logger.error(Translate.get( 00757 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00758 backend.disable(); 00759 throw new SQLException(Translate.get( 00760 "loadbalancer.backend.unreacheable", backend.getName())); 00761 } 00762 00763 // Sanity check 00764 if (c == null) 00765 throw new SQLException(Translate.get( 00766 "loadbalancer.unable.get.connection", new String[]{ 00767 String.valueOf(tid), backend.getName()})); 00768 00769 // begin transaction 00770 backend.startTransaction(lTid); 00771 c.setAutoCommit(false); 00772 } 00773 else 00774 { // Re-use the connection used by this transaction 00775 c = cm.retrieveConnection(tid); 00776 00777 // Sanity check 00778 if (c == null) 00779 throw new SQLException(Translate.get( 00780 "loadbalancer.unable.retrieve.connection", new String[]{ 00781 String.valueOf(tid), backend.getName()})); 00782 } 00783 00784 // Execute Query 00785 ResultSet rs; 00786 try 00787 { 00788 // We suppose here that the request does not modify the schema since 00789 // it is a read-only query. 00790 CallableStatement cs = c.prepareCall(proc.getSQL()); 00791 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00792 cs.setQueryTimeout(proc.getTimeout()); 00793 if ((proc.getMaxRows() > 0) 00794 && backend.getDriverCompliance().supportSetMaxRows()) 00795 cs.setMaxRows(proc.getMaxRows()); 00796 rs = cs.executeQuery(); 00797 } 00798 catch (SQLException e) 00799 { 00800 throw new SQLException(Translate.get( 00801 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00802 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00803 backend.getName(), e.getMessage()})); 00804 } 00805 if (logger.isDebugEnabled()) 00806 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00807 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00808 backend.getName()})); 00809 return rs; 00810 } 00811 } |
|
Performs a write request. This request is broadcasted to all nodes that owns the table to be written.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 174 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult. 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().
00176 { 00177 execWriteRequest(request, false); 00178 return execWriteRequestResult; 00179 } |
|
Perform a write request and return the auto generated keys.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 190 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest(), と org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult.
00192 { 00193 execWriteRequest(request, true); 00194 return execWriteRequestWithKeysResult; 00195 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 827 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
00828 { 00829 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 00830 proc, false); 00831 return task.getResult(); 00832 } |
|
Gets information about the request load balancer.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2ec_WRR.java の 156 行で定義されています。
00157 { 00158 if (backends == null) 00159 return "RAIDb-2 Error Checking with Weighted Round Robin Request load balancer: " 00160 + "!!!Warning!!! No backend nodes found\n"; 00161 else 00162 return "RAIDb-2 Error Checking with Weighted Round Robin Request load balancer balancing over " 00163 + backends.size() 00164 + " nodes\n"; 00165 } |
|
Get the needed query parsing granularity.
AbstractLoadBalancer.java の 151 行で定義されています。 参照元 org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
00152 { 00153 return parsingGranularity; 00154 } |
|
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2ec_WRR.java の 169 行で定義されています。
00170 {
00171 return WeightedBalancer.getRaidbXml(
00172 backends,
00173 DatabasesXmlTags.ELT_RAIDb_2ec_WeightedRoundRobin);
00174 }
|
|
Returns the RAIDbLevel.
AbstractLoadBalancer.java の 131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
00132 { 00133 return raidbLevel; 00134 } |
|
AbstractLoadBalancer.java の 385 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
00386 { 00387 StringBuffer info = new StringBuffer(); 00388 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00389 info.append(getXmlImpl()); 00390 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00391 return info.toString(); 00392 } |
|
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2を再定義しています。 RAIDb2ec.java の 259 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getXml().
00260 { 00261 StringBuffer info = new StringBuffer(); 00262 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2ec + " " 00263 + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\"" 00264 + this.nbOfConcurrentReads + "\">"); 00265 this.getRaidb2Xml(); 00266 if (waitForCompletionPolicy != null) 00267 info.append(waitForCompletionPolicy.getXml()); 00268 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2ec + ">"); 00269 return info.toString(); 00270 } |
|
Rollbacks a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01132 { 01133 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01134 waitForAllWritesToComplete(tm.getTransactionId()); 01135 01136 try 01137 { 01138 backendNonBlockingThreadsRWLock.acquireRead(); 01139 } 01140 catch (InterruptedException e) 01141 { 01142 String msg = Translate.get( 01143 "loadbalancer.backendlist.acquire.readlock.failed", e); 01144 logger.error(msg); 01145 throw new SQLException(msg); 01146 } 01147 int nbOfThreads = backendNonBlockingThreads.size(); 01148 ArrayList rollbackList = new ArrayList(); 01149 Long iTid = new Long(tm.getTransactionId()); 01150 01151 // Build the list of backend that need to rollback this transaction 01152 for (int i = 0; i < nbOfThreads; i++) 01153 { 01154 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01155 .get(i); 01156 if (thread.getBackend().isStartedTransaction(iTid)) 01157 rollbackList.add(thread); 01158 } 01159 01160 nbOfThreads = rollbackList.size(); 01161 if (nbOfThreads == 0) 01162 { 01163 backendNonBlockingThreadsRWLock.releaseRead(); 01164 return; 01165 } 01166 01167 // Create the task 01168 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01169 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01170 01171 synchronized (task) 01172 { 01173 // Post the task in each backendThread tasklist and wakeup the threads 01174 for (int i = 0; i < nbOfThreads; i++) 01175 { 01176 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01177 synchronized (thread) 01178 { 01179 thread.addTask(task); 01180 thread.notify(); 01181 } 01182 } 01183 01184 backendNonBlockingThreadsRWLock.releaseRead(); 01185 01186 // Wait for completion (notified by the task) 01187 try 01188 { 01189 // Wait on task 01190 long timeout = tm.getTimeout(); 01191 if (timeout > 0) 01192 { 01193 long start = System.currentTimeMillis(); 01194 task.wait(timeout); 01195 long end = System.currentTimeMillis(); 01196 long remaining = timeout - (end - start); 01197 if (remaining <= 0) 01198 { 01199 String msg = Translate.get("loadbalancer.rollback.timeout", 01200 new String[]{String.valueOf(tm.getTransactionId()), 01201 String.valueOf(task.getSuccess()), 01202 String.valueOf(task.getFailed())}); 01203 logger.warn(msg); 01204 throw new SQLException(msg); 01205 } 01206 } 01207 else 01208 task.wait(); 01209 } 01210 catch (InterruptedException e) 01211 { 01212 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01213 new String[]{String.valueOf(tm.getTransactionId()), 01214 String.valueOf(task.getSuccess()), 01215 String.valueOf(task.getFailed())})); 01216 } 01217 01218 if (task.getSuccess() > 0) 01219 return; 01220 else 01221 { // All tasks failed 01222 ArrayList exceptions = task.getExceptions(); 01223 if (exceptions == null) 01224 throw new SQLException(Translate.get( 01225 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01226 else 01227 { 01228 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01229 tm.getTransactionId()) 01230 + "\n"; 01231 for (int i = 0; i < exceptions.size(); i++) 01232 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01233 logger.error(errorMsg); 01234 throw new SQLException(errorMsg); 01235 } 01236 } 01237 } 01238 } |
|
Set the needed query parsing granularity.
AbstractLoadBalancer.java の 161 行で定義されています。
00162 {
00163 this.parsingGranularity = parsingGranularity;
00164 }
|
|
Sets the RAIDbLevel.
AbstractLoadBalancer.java の 141 行で定義されています。
00142 {
00143 this.raidbLevel = raidbLevel;
00144 }
|
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 RAIDb2ec_WRR.java の 142 行で定義されています。
00143 { 00144 throw new SQLException("Weight is not supported with this load balancer"); 00145 } |
|
Waits for all writes in the blocking thread queue of the given backend to complete.
RAIDb2.java の 1312 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01314 { 01315 try 01316 { 01317 backendBlockingThreadsRWLock.acquireRead(); 01318 } 01319 catch (InterruptedException e) 01320 { 01321 String msg = Translate.get( 01322 "loadbalancer.backendlist.acquire.readlock.failed", e); 01323 logger.error(msg); 01324 throw new SQLException(msg); 01325 } 01326 01327 int nbOfThreads = backendBlockingThreads.size(); 01328 01329 for (int i = 0; i < nbOfThreads; i++) 01330 { 01331 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01332 .get(i); 01333 if (thread.getBackend() == backend) 01334 thread.waitForAllTasksToComplete(); 01335 } 01336 01337 backendBlockingThreadsRWLock.releaseRead(); 01338 } |
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
RAIDb2.java の 1278 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01280 { 01281 try 01282 { 01283 backendBlockingThreadsRWLock.acquireRead(); 01284 } 01285 catch (InterruptedException e) 01286 { 01287 String msg = Translate.get( 01288 "loadbalancer.backendlist.acquire.readlock.failed", e); 01289 logger.error(msg); 01290 throw new SQLException(msg); 01291 } 01292 01293 int nbOfThreads = backendBlockingThreads.size(); 01294 01295 for (int i = 0; i < nbOfThreads; i++) 01296 { 01297 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01298 .get(i); 01299 if (thread.getBackend() == backend) 01300 thread.waitForAllTasksToComplete(transactionId); 01301 } 01302 01303 backendBlockingThreadsRWLock.releaseRead(); 01304 } |
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction. RAIDb2.java の 1244 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().
01246 { 01247 try 01248 { 01249 backendBlockingThreadsRWLock.acquireRead(); 01250 } 01251 catch (InterruptedException e) 01252 { 01253 String msg = Translate.get( 01254 "loadbalancer.backendlist.acquire.readlock.failed", e); 01255 logger.error(msg); 01256 throw new SQLException(msg); 01257 } 01258 01259 int nbOfThreads = backendBlockingThreads.size(); 01260 01261 for (int i = 0; i < nbOfThreads; i++) 01262 { 01263 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01264 .get(i); 01265 thread.waitForAllTasksToComplete(transactionId); 01266 } 01267 01268 backendBlockingThreadsRWLock.releaseRead(); 01269 } |
|
RAIDb2.java の 87 行で定義されています。 |
|
RAIDb2.java の 89 行で定義されています。 |
|
RAIDb2.java の 88 行で定義されています。 |
|
RAIDb2.java の 90 行で定義されています。 |
|
RAIDb2ec.java の 58 行で定義されています。 |
|
RAIDb2ec_WRR.java の 61 行で定義されています。 |
|
RAIDb2.java の 93 行で定義されています。 |
|
RAIDb2ec.java の 60 行で定義されています。 |
|
RAIDb2ec_WRR.java の 62 行で定義されています。 |
|
初期値: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec")
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2を再定義しています。 RAIDb2ec.java の 62 行で定義されています。 |
|
RAIDb2ec.java の 59 行で定義されています。 |
|
AbstractLoadBalancer.java の 74 行で定義されています。 |
|
|
AbstractLoadBalancer.java の 72 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(). |
|
RAIDb2.java の 92 行で定義されています。 |