This class is abstract because read requests coming from the request manager are NOT handled here but in the subclasses. Transaction management and write requests are broadcast to all backends.
Defined at line 50 of RAIDb1ec.java.
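The split described above (writes broadcast by the base class, read policy left to subclasses) can be sketched as follows. This is a minimal illustration, not the C-JDBC implementation: `SketchBackend`, `SketchLoadBalancer` and `SketchRoundRobin` are hypothetical names, and a backend is reduced to a list of executed statements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a database backend; not a C-JDBC class.
class SketchBackend {
    final String name;
    final List<String> executed = new ArrayList<>();

    SketchBackend(String name) { this.name = name; }

    void execute(String sql) { executed.add(sql); }
}

// Writes go to every backend; choosing a backend for reads is left to subclasses.
abstract class SketchLoadBalancer {
    protected final List<SketchBackend> backends = new ArrayList<>();

    void addBackend(SketchBackend b) { backends.add(b); }

    // Broadcast: each registered backend executes the same write.
    int write(String sql) {
        for (SketchBackend b : backends) {
            b.execute(sql);
        }
        return backends.size();
    }

    // The read policy (round robin, weighted, ...) is decided by the subclass.
    abstract SketchBackend chooseForRead();
}

// One possible read policy: plain round robin over the registered backends.
class SketchRoundRobin extends SketchLoadBalancer {
    private int next = 0;

    @Override
    SketchBackend chooseForRead() {
        if (backends.isEmpty()) {
            throw new IllegalStateException("no backend enabled");
        }
        SketchBackend b = backends.get(next % backends.size());
        next = (next + 1) % backends.size();
        return b;
    }
}
```

With two backends registered, a write reaches both while successive reads alternate between them.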
|
Creates a new RAIDb-1 Round Robin request load balancer. A new backend worker thread is created for each backend.
Defined at line 80 of RAIDb1ec.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.backendReadThreads, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.errorCheckingPolicy, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.nbOfConcurrentReads.
{
  super(vdb, waitForCompletionPolicy, timestampResolution);
  backendReadThreads = new ArrayList();
  this.errorCheckingPolicy = errorCheckingPolicy;
  this.nbOfConcurrentReads = nbOfConcurrentReads;
}
|
Begins a new transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 910 of RAIDb1.java.
{
}
|
Commits a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 920 of RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
{
  if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
    waitForAllWritesToComplete(tm.getTransactionId());

  try
  {
    backendNonBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendNonBlockingThreads.size();
  ArrayList commitList = new ArrayList();
  Long lTid = new Long(tm.getTransactionId());

  // Build the list of backends that need to commit this transaction
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().isStartedTransaction(lTid))
      commitList.add(thread);
  }

  nbOfThreads = commitList.size();
  if (nbOfThreads == 0)
  {
    backendNonBlockingThreadsRWLock.releaseRead();
    return;
  }

  // Create the task
  CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
      .getTimeout(), tm.getLogin(), tm.getTransactionId());

  synchronized (task)
  {
    // Post the task in each backendThread tasklist and wake up the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
      synchronized (thread)
      {
        thread.addTask(task);
        thread.notify();
      }
    }

    backendNonBlockingThreadsRWLock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = tm.getTimeout();
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.commit.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});
          logger.warn(msg);
          throw new SQLException(msg);
        }
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      throw new SQLException(Translate.get("loadbalancer.commit.timeout",
          new String[]{String.valueOf(tm.getTransactionId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.commit.all.failed", tm.getTransactionId()));
      else
      {
        String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
            tm.getTransactionId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
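The timed wait in the listing above follows the usual monitor pattern: the caller blocks on the task object until the worker threads notify it or the timeout elapses. The sketch below shows that pattern in isolation, under stated assumptions: `CompletionSketch` is a hypothetical class, not C-JDBC's `CommitTask`, and unlike the listing it re-checks the condition in a loop, because `Object.wait` is allowed to return before the condition holds.

```java
// Hypothetical completion tracker; names are illustrative, not C-JDBC's.
class CompletionSketch {
    private final int needed;   // number of results to wait for
    private int success;
    private int failed;

    CompletionSketch(int needed) { this.needed = needed; }

    // Called by a worker when it has finished its share of the task.
    synchronized void notifyResult(boolean ok) {
        if (ok) success++; else failed++;
        if (success + failed >= needed) {
            notifyAll();
        }
    }

    synchronized int getSuccess() { return success; }

    synchronized int getFailed() { return failed; }

    // Returns true if enough results arrived in time.
    // A timeout of 0 or less means "wait without limit", as in the listing.
    synchronized boolean awaitCompletion(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        try {
            while (success + failed < needed) {
                if (timeoutMs <= 0) {
                    wait();
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // timed out
                    }
                    wait(remaining);
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

A caller would treat a `false` return the way the listing treats `remaining <= 0`: report how many backends succeeded and failed, then raise an error.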
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Defined at line 170 of RAIDb1ec.java. References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireWrite(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.common.log.Trace.info(), and org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseWrite().
{
  int nbOfThreads = backendBlockingThreads.size();

  // Find the right blocking thread
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
        .get(i);
    if (thread.getBackend().equals(db))
    {
      logger.info(Translate.get(
          "loadbalancer.backend.workerthread.blocking.remove", db.getName()));

      // Remove it from the backendBlockingThreads list
      try
      {
        backendBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      backendBlockingThreads.remove(thread);
      backendBlockingThreadsRWLock.releaseWrite();

      synchronized (thread)
      {
        // Kill the thread
        thread.addPriorityTask(new KillThreadTask(1, 1));
        thread.notify();
      }
      break;
    }
  }

  // Find the right non-blocking thread
  nbOfThreads = backendNonBlockingThreads.size();
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().equals(db))
    {
      logger.info(Translate.get(
          "loadbalancer.backend.workerthread.non.blocking.remove", db
              .getName()));

      // Remove it from the backendNonBlockingThreads list
      try
      {
        backendNonBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      backendNonBlockingThreads.remove(thread);
      backendNonBlockingThreadsRWLock.releaseWrite();

      synchronized (thread)
      {
        // Kill the thread
        thread.addPriorityTask(new KillThreadTask(1, 1));
        thread.notify();
      }
      break;
    }
  }

  db.disable();
  if (db.isInitialized())
    db.finalizeConnections();
}
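The listing above mutates each worker-thread list only while holding that list's write lock. A minimal sketch of the same discipline follows. It swaps in the JDK's `ReentrantReadWriteLock` for C-JDBC's `ReadPrioritaryFIFOWriteLock` (whose exact semantics are not shown on this page), and `WorkerRegistrySketch` with its string entries is a hypothetical stand-in for the thread lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical registry of per-backend workers, identified here by backend name.
class WorkerRegistrySketch {
    private final List<String> workers = new ArrayList<>();
    // JDK lock used as a stand-in; not the lock class used by C-JDBC.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Registering a worker changes the list, so it takes the write lock.
    void add(String backendName) {
        lock.writeLock().lock();
        try {
            workers.add(backendName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Removal is also a mutation; returns true if the worker was registered.
    boolean remove(String backendName) {
        lock.writeLock().lock();
        try {
            return workers.remove(backendName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers only need the shared read lock.
    int size() {
        lock.readLock().lock();
        try {
            return workers.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Releasing the lock in a `finally` block is a choice made in this sketch; it guarantees the lock is freed even if the list operation throws.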
|
Enables a backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Defined at line 107 of RAIDb1ec.java. References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireWrite(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableRead(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.common.log.Trace.info(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.initializeConnections(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isInitialized(), and org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseWrite().
{
  // Create 2 worker threads for writes
  BackendWorkerThread blockingThread = new BackendWorkerThread(
      (DatabaseBackend) db, this);
  BackendWorkerThread nonBlockingThread = new BackendWorkerThread(
      (DatabaseBackend) db, this);

  // Add first to the blocking thread list
  try
  {
    backendBlockingThreadsRWLock.acquireWrite();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.writelock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }
  backendBlockingThreads.add(blockingThread);
  backendBlockingThreadsRWLock.releaseWrite();
  blockingThread.start();
  logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
      db.getName()));

  // Then add to the non-blocking thread list
  try
  {
    backendNonBlockingThreadsRWLock.acquireWrite();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.writelock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }
  backendNonBlockingThreads.add(nonBlockingThread);
  backendNonBlockingThreadsRWLock.releaseWrite();
  nonBlockingThread.start();
  logger.info(Translate.get(
      "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));

  if (!db.isInitialized())
    db.initializeConnections();
  db.enableRead();
  if (writeEnabled)
    db.enableWrite();
}
|
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR.
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 734 of RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
{
  ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
      proc, true);
  return task.getResult();
}
|
Execute a read request on the selected backend.
Defined at line 162 of RAIDb1.java. References org.objectweb.cjdbc.common.log.Trace.error().
{
  // Handle macros
  request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend.getConnectionManager(request
      .getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{request.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (request.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grained here by waiting
      // only for writes that depend on the tables we need,
      // but is that really worth the overhead?
      waitForAllWritesToComplete(backend);

    ResultSet rs = null;
    boolean badConnection;
    do
    {
      badConnection = false;
      // Use a connection just for this request
      Connection c = null;
      try
      {
        c = cm.getConnection();
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new UnreachableBackendException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.backend.no.connection", backend.getName()));

      // Execute Query
      try
      {
        rs = executeStatementOnBackend(request, backend, c);
        cm.releaseConnection(c);
      }
      catch (SQLException e)
      {
        cm.releaseConnection(c);
        throw new SQLException(Translate.get(
            "loadbalancer.request.failed.on.backend", new String[]{
                request.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      catch (BadConnectionException e)
      { // Get rid of the bad connection
        cm.deleteConnection(c);
        badConnection = true;
      }
    }
    while (badConnection);
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
          String.valueOf(request.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = request.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, request.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new SQLException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs = null;
    try
    {
      rs = executeStatementOnBackend(request, backend, c);
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.request.failed.on.backend", new String[]{
              request.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    catch (BadConnectionException e)
    { // Connection failed, so did the transaction
      // Disable the backend.
      cm.deleteConnection(tid);
      String msg = Translate.get(
          "loadbalancer.backend.disabling.connection.failure", backend
              .getName());
      logger.error(msg);
      backend.disable();
      throw new SQLException(msg);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(request.getId()),
              backend.getName()}));
    return rs;
  }
}
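In auto-commit mode the listing above retries with a fresh connection whenever the previous one turns out to be broken, and deletes the broken connection instead of returning it to the pool. The loop can be sketched on its own as follows. Everything here is hypothetical: connections are plain strings, a name starting with "bad" simulates a broken connection, and `RetryPoolSketch` is not a C-JDBC class.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pool whose connections are plain strings; "bad..." ones fail on use.
class RetryPoolSketch {
    // Stand-in for a "this connection is broken" signal.
    static class BadConnection extends RuntimeException {}

    private final Deque<String> idle = new ArrayDeque<>();
    int deleted = 0; // how many broken connections were discarded

    RetryPoolSketch(String... connections) {
        for (String c : connections) {
            idle.add(c);
        }
    }

    // Simulated execution: fails if the connection is marked bad.
    private String run(String connection, String sql) {
        if (connection.startsWith("bad")) {
            throw new BadConnection();
        }
        return sql + " on " + connection;
    }

    // Same shape as the auto-commit branch: discard a bad connection and retry.
    String executeWithRetry(String sql) {
        boolean badConnection;
        String result = null;
        do {
            badConnection = false;
            String c = idle.poll();
            if (c == null) {
                throw new IllegalStateException("no connection available");
            }
            try {
                result = run(c, sql);
                idle.add(c);   // release the healthy connection back to the pool
            } catch (BadConnection e) {
                deleted++;     // drop the broken connection instead of releasing it
                badConnection = true;
            }
        } while (badConnection);
        return result;
    }
}
```

The loop ends either with a result or with an exception once the pool is exhausted; in the listing the corresponding exit is the error raised when no connection can be obtained.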
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
Defined at line 251 of AbstractLoadBalancer.java. References java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, and org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. Referenced by org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
{
  ResultSet rs = null;
  try
  {
    backend.addPendingReadRequest(request);
    String sql = request.getSQL();
    // Rewrite the query if needed
    sql = backend.rewriteQuery(sql);
    // Execute the query
    Statement s = c.createStatement();
    DriverCompliance driverCompliance = backend.getDriverCompliance();
    if (driverCompliance.supportSetQueryTimeout())
      s.setQueryTimeout(request.getTimeout());
    if ((request.getCursorName() != null)
        && (driverCompliance.supportSetCursorName()))
      s.setCursorName(request.getCursorName());
    if ((request.getFetchSize() != 0)
        && driverCompliance.supportSetFetchSize())
      s.setFetchSize(request.getFetchSize());
    if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
      s.setMaxRows(request.getMaxRows());
    rs = s.executeQuery(sql);
  }
  catch (SQLException e)
  { // Something bad happened
    if (backend.isValidConnection(c))
      throw e; // Connection is valid, throw the exception
    else
      throw new BadConnectionException();
  }
  finally
  {
    backend.removePendingRequest(request);
  }
  return rs;
}
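In the listing above this method does not retry by itself: on failure it checks the connection and either rethrows the original `SQLException` or signals a bad connection, leaving the retry to the caller. A minimal sketch of that decision, assuming a caller-supplied validity check, is shown below; `FailureClassifierSketch` and `BadConnectionSketchException` are hypothetical names.

```java
import java.sql.SQLException;
import java.util.function.BooleanSupplier;

class FailureClassifierSketch {
    // Hypothetical marker for "the connection itself is broken".
    static class BadConnectionSketchException extends Exception {}

    // Decide what to propagate after a statement failed:
    // the original error if the connection still works, a bad-connection marker otherwise.
    static Exception classify(SQLException cause, BooleanSupplier connectionIsValid) {
        if (connectionIsValid.getAsBoolean()) {
            return cause; // the query itself was at fault
        }
        return new BadConnectionSketchException(); // the caller may retry elsewhere
    }
}
```

Keeping the two outcomes as distinct exception types lets the caller release a healthy connection but delete a broken one.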
|
Execute a stored procedure on the selected backend.
Defined at line 576 of RAIDb1.java.
{
  // Handle macros
  proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend
      .getConnectionManager(proc.getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{proc.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (proc.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grained here by waiting
      // only for writes that depend on the tables we need,
      // but is that really worth the overhead?
      waitForAllWritesToComplete(backend);

    // Use a connection just for this request
    Connection c = null;
    try
    {
      c = cm.getConnection();
    }
    catch (UnreachableBackendException e1)
    {
      logger.error(Translate.get(
          "loadbalancer.backend.disabling.unreachable", backend.getName()));
      backend.disable();
      throw new UnreachableBackendException(Translate.get(
          "loadbalancer.backend.unreacheable", backend.getName()));
    }

    // Sanity check
    if (c == null)
      throw new UnreachableBackendException(Translate.get(
          "loadbalancer.backend.no.connection", backend.getName()));

    // Execute Query
    ResultSet rs = null;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    finally
    {
      cm.releaseConnection(c);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.storedprocedure.on",
          new String[]{String.valueOf(proc.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = proc.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, proc.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new SQLException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
              backend.getName()}));
    return rs;
  }
}
|
Performs a write request. This request is broadcast to all nodes.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 329 of RAIDb1.java.
{
  if (!request.isParsed() && (request.isCreate() || request.isDrop()))
    request.parse(null, ParsingGranularities.TABLE, true);
  return ((WriteRequestTask) execWriteRequest(request, false)).getResult();
}
|
Perform a write request and return the auto generated keys.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 346 of RAIDb1.java.
{
  if (!request.isParsed() && (request.isCreate() || request.isDrop()))
    request.parse(null, ParsingGranularities.TABLE, true);
  return ((WriteRequestWithKeysTask) execWriteRequest(request, false))
      .getResult();
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 745 of RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
{
  WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
      proc, false);
  return task.getResult();
}
|
Gets information about the request load balancer.
Implemented in org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_LPRF, org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.
|
Get the needed query parsing granularity.
Defined at line 151 of AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
{
  return parsingGranularity;
}
|
The surrounding raidb1 tags can be handled by getXmlImpl above, but the more detailed content has to be returned by the method getRaidb1Xml below.
Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR.
|
Returns the RAIDbLevel.
Defined at line 131 of AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
{
  return raidbLevel;
}
|
Defined at line 385 of AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  info.append(getXmlImpl());
  info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  return info.toString();
}
|
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Defined at line 255 of RAIDb1ec.java. References org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getXml().
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_RAIDb_1ec + " "
      + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
      + this.nbOfConcurrentReads + "\">");
  this.getRaidb1Xml();
  if (waitForCompletionPolicy != null)
    info.append(waitForCompletionPolicy.getXml());
  info.append("</" + DatabasesXmlTags.ELT_RAIDb_1ec + ">");
  return info.toString();
}
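In the listing above the value returned by getRaidb1Xml() is not appended to info, so whatever that method produces does not reach the output; whether this is intended is not clear from this page. The sketch below assumes the nested content is meant to be included. The element and attribute names are placeholders, not the values of the `DatabasesXmlTags` constants, and `XmlSketch` is a hypothetical class.

```java
class XmlSketch {
    // Hypothetical nested content; stands in for what a subclass would return.
    static String nested() {
        return "<ErrorChecking/>";
    }

    // Builds the element and appends the nested content between the tags.
    static String describe(int nbOfConcurrentReads, String policyXml) {
        StringBuilder info = new StringBuilder();
        info.append("<Balancer nbOfConcurrentReads=\"")
            .append(nbOfConcurrentReads)
            .append("\">");
        info.append(nested());          // return value is appended, not discarded
        if (policyXml != null) {
            info.append(policyXml);     // optional policy element
        }
        info.append("</Balancer>");
        return info.toString();
    }
}
```

`StringBuilder` is used here instead of the listing's `StringBuffer` because the buffer is confined to one method and needs no synchronization.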
|
Rolls back a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 1036 of RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
{
  if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
    waitForAllWritesToComplete(tm.getTransactionId());

  try
  {
    backendNonBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }
  int nbOfThreads = backendNonBlockingThreads.size();
  ArrayList rollbackList = new ArrayList();
  Long lTid = new Long(tm.getTransactionId());

  // Build the list of backends that need to roll back this transaction
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().isStartedTransaction(lTid))
      rollbackList.add(thread);
  }

  nbOfThreads = rollbackList.size();
  if (nbOfThreads == 0)
  {
    backendNonBlockingThreadsRWLock.releaseRead();
    return;
  }

  // Create the task
  RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
      tm.getTimeout(), tm.getLogin(), tm.getTransactionId());

  synchronized (task)
  {
    // Post the task in each backendThread tasklist and wake up the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
      synchronized (thread)
      {
        thread.addTask(task);
        thread.notify();
      }
    }

    backendNonBlockingThreadsRWLock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = tm.getTimeout();
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.rollback.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});
          logger.warn(msg);
          throw new SQLException(msg);
        }
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      throw new SQLException(Translate.get("loadbalancer.rollback.timeout",
          new String[]{String.valueOf(tm.getTransactionId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.rollback.all.failed", tm.getTransactionId()));
      else
      {
        String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
            tm.getTransactionId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
|
Set the needed query parsing granularity.
Defined at line 161 of AbstractLoadBalancer.java.
{
  this.parsingGranularity = parsingGranularity;
}
|
|
Sets the RAIDbLevel.
Defined at line 141 of AbstractLoadBalancer.java.
{
  this.raidbLevel = raidbLevel;
}
|
|
Associate a weight to a backend identified by its logical name.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB. Defined at line 359 of AbstractLoadBalancer.java.
{
  throw new SQLException("Weight is not supported by this load balancer");
}
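The default shown above rejects weights, and only the weighted balancers reimplement the method. That arrangement can be sketched as follows. Both classes are hypothetical, the parameter list is an assumption (the real signature is not shown on this page), and how the C-JDBC subclasses store weights is not documented here.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

// Base behaviour: weights are rejected, as in the listing above.
class WeightlessBalancerSketch {
    void setWeight(String backendName, int weight) throws SQLException {
        throw new SQLException("Weight is not supported by this load balancer");
    }
}

// A weighted policy overrides the default and records the weight (assumed storage).
class WeightedBalancerSketch extends WeightlessBalancerSketch {
    final Map<String, Integer> weights = new HashMap<>();

    @Override
    void setWeight(String backendName, int weight) throws SQLException {
        if (weight < 0) {
            throw new SQLException("Weight must be non-negative: " + weight);
        }
        weights.put(backendName, weight);
    }
}
```

Callers that configure weights generically can therefore catch `SQLException` to detect a balancer that has no notion of weight.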

Waits for all writes in the blocking thread queue of the given backend to complete.
Defined at line 1217 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete();
    }

    backendBlockingThreadsRWLock.releaseRead();
  }
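The pattern in this listing (take a read lock on the worker list, drain the workers attached to one backend, release the lock) can be sketched with the standard `java.util.concurrent` locks. This is an illustration under stated assumptions, not the original code: C-JDBC uses its own lock class with `acquireRead()`/`releaseRead()`, while `SketchWorker` and `SketchWorkerList` below are hypothetical. The `try`/`finally` is an addition of the sketch so that the lock is released even if a worker throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical minimal view of a backend worker thread. */
interface SketchWorker {
  Object getBackend();

  void waitForAllTasksToComplete();
}

/** Hypothetical holder for the worker list and the lock protecting it. */
final class SketchWorkerList {
  private final List<SketchWorker> workers = new ArrayList<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  void add(SketchWorker worker) {
    lock.writeLock().lock();
    try {
      workers.add(worker);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Drains every worker attached to the given backend; returns how many matched. */
  int waitForAllWritesToComplete(Object backend) {
    int matched = 0;
    lock.readLock().lock();
    try {
      for (SketchWorker worker : workers) {
        if (worker.getBackend() == backend) { // identity comparison, as in the listing
          worker.waitForAllTasksToComplete();
          matched++;
        }
      }
    } finally {
      lock.readLock().unlock(); // always released, even on an exception
    }
    return matched;
  }
}
```

Holding only the read lock lets several callers wait concurrently while still preventing the worker list from being modified during the scan.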

Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete, so that the transaction can then be completed.
Defined at line 1183 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseRead();
  }

Waits for all writes of the given transaction in the blocking thread queue to complete, so that the transaction can then be completed.
Defined at line 1149 of file RAIDb1.java.
References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseRead();
  }
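This page does not show how `BackendWorkerThread.waitForAllTasksToComplete(transactionId)` is implemented. As an illustration only, one way a per-transaction wait could work is a counter of pending writes per transaction ID that is guarded by the object's monitor. The class `SketchPendingWrites` and all of its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker of pending writes per transaction for one worker. */
final class SketchPendingWrites {
  private final Map<Long, Integer> pending = new HashMap<>();

  /** Records that a write of the given transaction was queued. */
  synchronized void taskQueued(long transactionId) {
    pending.merge(transactionId, 1, Integer::sum);
  }

  /** Records that a write of the given transaction finished. */
  synchronized void taskDone(long transactionId) {
    Integer count = pending.get(transactionId);
    if (count == null)
      return; // nothing was queued for this transaction
    if (count <= 1)
      pending.remove(transactionId);
    else
      pending.put(transactionId, count - 1);
    notifyAll(); // wake up any caller waiting to commit or roll back
  }

  synchronized int pendingFor(long transactionId) {
    return pending.getOrDefault(transactionId, 0);
  }

  /** Blocks until no write of the given transaction is pending. */
  synchronized void waitForAllTasksToComplete(long transactionId)
      throws InterruptedException {
    while (pending.containsKey(transactionId))
      wait();
  }
}
```

Writes that belong to other transactions do not delay the caller, which matches the intent stated in the description above: only the writes of the transaction being completed have to drain first.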

Defined at line 79 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

Defined at line 81 of file RAIDb1.java.

Defined at line 80 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

Defined at line 82 of file RAIDb1.java.

Defined at line 57 of file RAIDb1ec.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.RAIDb1ec().

Defined at line 59 of file RAIDb1ec.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.RAIDb1ec().

Initial value: Trace.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec")
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Defined at line 61 of file RAIDb1ec.java.

Defined at line 58 of file RAIDb1ec.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.RAIDb1ec().

Defined at line 74 of file AbstractLoadBalancer.java.

Defined at line 73 of file AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer() and org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel().

Defined at line 72 of file AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer().

Defined at line 84 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.getNbToWait() and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().