This class is an abstract call because the read requests coming from the request controller are NOT treated here but in the subclasses. Transaction management and write requests are broadcasted to all backends.
RAIDb1.java の 72 行で定義されています。
Public メソッド | |
RAIDb1 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, long timestampResolution) throws SQLException | |
abstract java.sql.ResultSet | execReadRequest (SelectRequest request) throws SQLException |
int | execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ResultSet | execWriteRequestWithKeys (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ResultSet | execReadStoredProcedure (StoredProcedure proc) throws SQLException |
int | execWriteStoredProcedure (StoredProcedure proc) throws SQLException |
final void | begin (TransactionMarkerMetaData tm) throws SQLException |
void | commit (TransactionMarkerMetaData tm) throws SQLException |
void | rollback (TransactionMarkerMetaData tm) throws SQLException |
void | enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException |
synchronized void | disableBackend (DatabaseBackend db) throws SQLException |
String | getXmlImpl () |
abstract String | getRaidb1Xml () |
int | getRAIDbLevel () |
void | setRAIDbLevel (int raidbLevel) |
int | getParsingGranularity () |
void | setParsingGranularity (int parsingGranularity) |
abstract ResultSet | execReadOnlyReadStoredProcedure (StoredProcedure proc) throws SQLException |
void | setWeight (String name, int w) throws SQLException |
abstract String | getInformation () |
String | getXml () |
Protected メソッド | |
java.sql.ResultSet | executeRequestOnBackend (SelectRequest request, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
java.sql.ResultSet | executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
void | waitForAllWritesToComplete (long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException |
ResultSet | executeStatementOnBackend (SelectRequest request, DatabaseBackend backend, Connection c) throws SQLException, BadConnectionException |
Protected 変数 | |
ArrayList | backendBlockingThreads |
ArrayList | backendNonBlockingThreads |
ReadPrioritaryFIFOWriteLock | backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
ReadPrioritaryFIFOWriteLock | backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
WaitForCompletionPolicy | waitForCompletionPolicy |
VirtualDatabase | vdb |
int | raidbLevel |
int | parsingGranularity |
Static Protected 変数 | |
Trace | logger |
Private メソッド | |
int | getNbToWait (int nbOfThreads) |
AbstractTask | execWriteRequest (AbstractWriteRequest request, boolean useKeys) throws AllBackendsFailedException, SQLException |
AbstractTask | callStoredProcedure (StoredProcedure proc, boolean isRead) throws SQLException |
Private 変数 | |
long | timestampResolution |
|
Creates a new RAIDb-1 Round Robin request load balancer. A new backend worker thread is created for each backend.
RAIDb1.java の 104 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendBlockingThreads, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendNonBlockingThreads, と org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForCompletionPolicy.
00107 { 00108 super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING); 00109 00110 this.waitForCompletionPolicy = waitForCompletionPolicy; 00111 this.timestampResolution = timestampResolution; 00112 backendBlockingThreads = new ArrayList(); 00113 backendNonBlockingThreads = new ArrayList(); 00114 } |
|
Begins a new transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 910 行で定義されています。
00911 { 00912 } |
|
Post the stored procedure call in the threads task list.
RAIDb1.java の 761 行で定義されています。 参照先 org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseRead(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.removeTask().
00763 { 00764 ArrayList backendThreads = backendBlockingThreads; 00765 ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock; 00766 00767 try 00768 { 00769 lock.acquireRead(); 00770 } 00771 catch (InterruptedException e) 00772 { 00773 String msg = Translate.get( 00774 "loadbalancer.backendlist.acquire.readlock.failed", e); 00775 logger.error(msg); 00776 throw new SQLException(msg); 00777 } 00778 00779 int nbOfThreads = backendThreads.size(); 00780 00781 // Create the task 00782 AbstractTask task; 00783 if (isRead) 00784 task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads, 00785 proc); 00786 else 00787 task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads), 00788 nbOfThreads, proc); 00789 00790 synchronized (task) 00791 { 00792 // Post the task in each backendThread tasklist and wakeup the threads 00793 for (int i = 0; i < nbOfThreads; i++) 00794 { 00795 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00796 .get(i); 00797 synchronized (thread) 00798 { 00799 if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)) 00800 thread.addTask(task); 00801 else 00802 thread.addTask(task, proc.getTransactionId()); 00803 thread.notify(); 00804 } 00805 } 00806 00807 lock.releaseRead(); 00808 00809 // Wait for completion (notified by the task) 00810 try 00811 { 00812 // Wait on task 00813 long timeout = proc.getTimeout() * 1000; 00814 if (timeout > 0) 00815 { 00816 long start = System.currentTimeMillis(); 00817 task.wait(timeout); 00818 long end = System.currentTimeMillis(); 00819 long remaining = timeout - (end - start); 00820 if (remaining <= 0) 00821 { 00822 String msg = Translate.get("loadbalancer.storedprocedure.timeout", 00823 new String[]{String.valueOf(proc.getId()), 00824 String.valueOf(task.getSuccess()), 00825 String.valueOf(task.getFailed())}); 00826 // Try to remove the request from the task list 00827 lock.acquireRead(); 00828 nbOfThreads = backendThreads.size(); 00829 for (int i = 0; i < nbOfThreads; i++) 00830 { 00831 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00832 .get(i); 00833 synchronized (thread) 00834 { 00835 thread.removeTask(task); 00836 } 00837 } 00838 lock.releaseRead(); 00839 00840 logger.warn(msg); 00841 throw new SQLException(msg); 00842 } 00843 // No need to update request timeout since the execution is finished 00844 } 00845 else 00846 task.wait(); 00847 } 00848 catch (InterruptedException e) 00849 { 00850 // Try to remove the request from the task list 00851 try 00852 { 00853 lock.acquireRead(); 00854 nbOfThreads = backendThreads.size(); 00855 } 00856 catch (InterruptedException ignore) 00857 { 00858 nbOfThreads = 0; // Give up 00859 } 00860 for (int i = 0; i < nbOfThreads; i++) 00861 { 00862 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00863 .get(i); 00864 synchronized (thread) 00865 { 00866 thread.removeTask(task); 00867 } 00868 } 00869 lock.releaseRead(); 00870 00871 throw new SQLException(Translate.get( 00872 "loadbalancer.storedprocedure.timeout", new String[]{ 00873 String.valueOf(proc.getId()), 00874 String.valueOf(task.getSuccess()), 00875 String.valueOf(task.getFailed())})); 00876 } 00877 00878 if (task.getSuccess() > 0) 00879 return task; 00880 else 00881 { // All tasks failed 00882 ArrayList exceptions = task.getExceptions(); 00883 if (exceptions == null) 00884 throw new SQLException(Translate.get( 00885 "loadbalancer.storedprocedure.all.failed", proc.getId())); 00886 else 00887 { 00888 String errorMsg = Translate.get( 00889 "loadbalancer.storedprocedure.failed.stack", proc.getId()) 00890 + "\n"; 00891 for (int i = 0; i < exceptions.size(); i++) 00892 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00893 logger.error(errorMsg); 00894 throw new SQLException(errorMsg); 00895 } 00896 } 00897 } 00898 } |
|
Commits a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 920 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
00921 { 00922 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00923 waitForAllWritesToComplete(tm.getTransactionId()); 00924 00925 try 00926 { 00927 backendNonBlockingThreadsRWLock.acquireRead(); 00928 } 00929 catch (InterruptedException e) 00930 { 00931 String msg = Translate.get( 00932 "loadbalancer.backendlist.acquire.readlock.failed", e); 00933 logger.error(msg); 00934 throw new SQLException(msg); 00935 } 00936 00937 int nbOfThreads = backendNonBlockingThreads.size(); 00938 ArrayList commitList = new ArrayList(); 00939 Long lTid = new Long(tm.getTransactionId()); 00940 00941 // Build the list of backend that need to commit this transaction 00942 for (int i = 0; i < nbOfThreads; i++) 00943 { 00944 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 00945 .get(i); 00946 if (thread.getBackend().isStartedTransaction(lTid)) 00947 commitList.add(thread); 00948 } 00949 00950 nbOfThreads = commitList.size(); 00951 if (nbOfThreads == 0) 00952 { 00953 backendNonBlockingThreadsRWLock.releaseRead(); 00954 return; 00955 } 00956 00957 // Create the task 00958 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 00959 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 00960 00961 synchronized (task) 00962 { 00963 // Post the task in each backendThread tasklist and wakeup the threads 00964 for (int i = 0; i < nbOfThreads; i++) 00965 { 00966 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 00967 synchronized (thread) 00968 { 00969 thread.addTask(task); 00970 thread.notify(); 00971 } 00972 } 00973 00974 backendNonBlockingThreadsRWLock.releaseRead(); 00975 00976 // Wait for completion (notified by the task) 00977 try 00978 { 00979 // Wait on task 00980 long timeout = tm.getTimeout(); 00981 if (timeout > 0) 00982 { 00983 long start = System.currentTimeMillis(); 00984 task.wait(timeout); 00985 long end = System.currentTimeMillis(); 00986 long remaining = timeout - (end - start); 00987 if (remaining <= 0) 00988 { 00989 String msg = Translate.get("loadbalancer.commit.timeout", 00990 new String[]{String.valueOf(tm.getTransactionId()), 00991 String.valueOf(task.getSuccess()), 00992 String.valueOf(task.getFailed())}); 00993 logger.warn(msg); 00994 throw new SQLException(msg); 00995 } 00996 } 00997 else 00998 task.wait(); 00999 } 01000 catch (InterruptedException e) 01001 { 01002 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 01003 new String[]{String.valueOf(tm.getTransactionId()), 01004 String.valueOf(task.getSuccess()), 01005 String.valueOf(task.getFailed())})); 01006 } 01007 01008 if (task.getSuccess() > 0) 01009 return; 01010 else 01011 { // All tasks failed 01012 ArrayList exceptions = task.getExceptions(); 01013 if (exceptions == null) 01014 throw new SQLException(Translate.get( 01015 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01016 else 01017 { 01018 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01019 tm.getTransactionId()) 01020 + "\n"; 01021 for (int i = 0; i < exceptions.size(); i++) 01022 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01023 logger.error(errorMsg); 01024 throw new SQLException(errorMsg); 01025 } 01026 } 01027 } 01028 } |
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ecで再定義されています。 RAIDb1.java の 1326 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
01328 { 01329 if (db.isWriteEnabled()) 01330 { 01331 // Starts with backendBlockingThreads 01332 try 01333 { 01334 backendBlockingThreadsRWLock.acquireWrite(); 01335 } 01336 catch (InterruptedException e) 01337 { 01338 String msg = Translate.get( 01339 "loadbalancer.backendlist.acquire.writelock.failed", e); 01340 logger.error(msg); 01341 throw new SQLException(msg); 01342 } 01343 01344 int nbOfThreads = backendBlockingThreads.size(); 01345 01346 // Find the right blocking thread 01347 for (int i = 0; i < nbOfThreads; i++) 01348 { 01349 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01350 .get(i); 01351 if (thread.getBackend().equals(db)) 01352 { 01353 logger.info(Translate 01354 .get("loadbalancer.backend.workerthread.blocking.remove", db 01355 .getName())); 01356 01357 // Remove it from the backendBlockingThread list 01358 backendBlockingThreads.remove(thread); 01359 01360 synchronized (thread) 01361 { 01362 // Kill the thread 01363 thread.addPriorityTask(new KillThreadTask(1, 1)); 01364 thread.notify(); 01365 } 01366 break; 01367 } 01368 } 01369 01370 backendBlockingThreadsRWLock.releaseWrite(); 01371 01372 // Continue with backendNonBlockingThreads 01373 01374 try 01375 { 01376 backendNonBlockingThreadsRWLock.acquireWrite(); 01377 } 01378 catch (InterruptedException e) 01379 { 01380 String msg = Translate.get( 01381 "loadbalancer.backendlist.acquire.writelock.failed", e); 01382 logger.error(msg); 01383 throw new SQLException(msg); 01384 } 01385 01386 // Find the right non-blocking thread 01387 nbOfThreads = backendNonBlockingThreads.size(); 01388 for (int i = 0; i < nbOfThreads; i++) 01389 { 01390 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01391 .get(i); 01392 if (thread.getBackend().equals(db)) 01393 { 01394 logger.info(Translate.get( 01395 "loadbalancer.backend.workerthread.non.blocking.remove", db 01396 .getName())); 01397 01398 // Remove it from the backendNonBlockingThreads list 01399 backendNonBlockingThreads.remove(thread); 01400 01401 synchronized (thread) 01402 { 01403 // Kill the thread 01404 thread.addPriorityTask(new KillThreadTask(1, 1)); 01405 thread.notify(); 01406 } 01407 break; 01408 } 01409 } 01410 01411 backendNonBlockingThreadsRWLock.releaseWrite(); 01412 } 01413 01414 db.disable(); 01415 if (db.isInitialized()) 01416 db.finalizeConnections(); 01417 } |
|
Enables a Backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ecで再定義されています。 RAIDb1.java の 1261 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
01263 { 01264 if (writeEnabled) 01265 { 01266 // Create 2 worker threads 01267 BackendWorkerThread blockingThread = new BackendWorkerThread( 01268 (DatabaseBackend) db, this); 01269 BackendWorkerThread nonBlockingThread = new BackendWorkerThread( 01270 (DatabaseBackend) db, this); 01271 01272 // Add first to the blocking thread list 01273 try 01274 { 01275 backendBlockingThreadsRWLock.acquireWrite(); 01276 } 01277 catch (InterruptedException e) 01278 { 01279 String msg = Translate.get( 01280 "loadbalancer.backendlist.acquire.writelock.failed", e); 01281 logger.error(msg); 01282 throw new SQLException(msg); 01283 } 01284 backendBlockingThreads.add(blockingThread); 01285 backendBlockingThreadsRWLock.releaseWrite(); 01286 blockingThread.start(); 01287 logger.info(Translate.get( 01288 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01289 01290 // Then add to the non-blocking thread list 01291 try 01292 { 01293 backendNonBlockingThreadsRWLock.acquireWrite(); 01294 } 01295 catch (InterruptedException e) 01296 { 01297 String msg = Translate.get( 01298 "loadbalancer.backendlist.acquire.writelock.failed", e); 01299 logger.error(msg); 01300 throw new SQLException(msg); 01301 } 01302 backendNonBlockingThreads.add(nonBlockingThread); 01303 backendNonBlockingThreadsRWLock.releaseWrite(); 01304 nonBlockingThread.start(); 01305 logger.info(Translate.get( 01306 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01307 db.enableWrite(); 01308 } 01309 01310 if (!db.isInitialized()) 01311 db.initializeConnections(); 01312 db.enableRead(); 01313 } |
|
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, と org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRRを実装しています. |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 734 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
00736 { 00737 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00738 proc, true); 00739 return task.getResult(); 00740 } |
|
Execute a read request on the selected backend.
RAIDb1.java の 162 行で定義されています。 参照先 org.objectweb.cjdbc.common.log.Trace.error().
00164 { 00165 // Handle macros 00166 request.setSQL(RequestManager.handleSQLMacros(request.getSQL(), 00167 timestampResolution, false)); 00168 00169 // Ok, we have a backend, let's execute the request 00170 AbstractConnectionManager cm = backend.getConnectionManager(request 00171 .getLogin()); 00172 00173 // Sanity check 00174 if (cm == null) 00175 { 00176 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00177 new String[]{request.getLogin(), backend.getName()}); 00178 logger.error(msg); 00179 throw new SQLException(msg); 00180 } 00181 00182 // Execute the query 00183 if (request.isAutoCommit()) 00184 { 00185 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00186 // We could do something finer grain here by waiting 00187 // only for writes that depend on the tables we need 00188 // but is that really worth the overhead ? 00189 waitForAllWritesToComplete(backend); 00190 00191 ResultSet rs = null; 00192 boolean badConnection; 00193 do 00194 { 00195 badConnection = false; 00196 // Use a connection just for this request 00197 Connection c = null; 00198 try 00199 { 00200 c = cm.getConnection(); 00201 } 00202 catch (UnreachableBackendException e1) 00203 { 00204 logger.error(Translate.get( 00205 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00206 backend.disable(); 00207 throw new UnreachableBackendException(Translate.get( 00208 "loadbalancer.backend.unreacheable", backend.getName())); 00209 } 00210 00211 // Sanity check 00212 if (c == null) 00213 throw new SQLException(Translate.get( 00214 "loadbalancer.backend.no.connection", backend.getName())); 00215 00216 // Execute Query 00217 try 00218 { 00219 rs = executeStatementOnBackend(request, backend, c); 00220 cm.releaseConnection(c); 00221 } 00222 catch (SQLException e) 00223 { 00224 cm.releaseConnection(c); 00225 throw new SQLException(Translate.get( 00226 "loadbalancer.request.failed.on.backend", new String[]{ 00227 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00228 backend.getName(), e.getMessage()})); 00229 } 00230 catch (BadConnectionException e) 00231 { // Get rid of the bad connection 00232 cm.deleteConnection(c); 00233 badConnection = true; 00234 } 00235 } 00236 while (badConnection); 00237 if (logger.isDebugEnabled()) 00238 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00239 String.valueOf(request.getId()), backend.getName()})); 00240 return rs; 00241 } 00242 else 00243 { // Inside a transaction 00244 Connection c; 00245 long tid = request.getTransactionId(); 00246 Long lTid = new Long(tid); 00247 00248 // Wait for previous writes to complete 00249 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00250 waitForAllWritesToComplete(backend, request.getTransactionId()); 00251 00252 if (!backend.isStartedTransaction(lTid)) 00253 { // transaction has not been started yet on this backend 00254 try 00255 { 00256 c = cm.getConnection(tid); 00257 } 00258 catch (UnreachableBackendException e1) 00259 { 00260 logger.error(Translate.get( 00261 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00262 backend.disable(); 00263 throw new SQLException(Translate.get( 00264 "loadbalancer.backend.unreacheable", backend.getName())); 00265 } 00266 00267 // Sanity check 00268 if (c == null) 00269 throw new SQLException(Translate.get( 00270 "loadbalancer.unable.get.connection", new String[]{ 00271 String.valueOf(tid), backend.getName()})); 00272 00273 // begin transaction 00274 backend.startTransaction(lTid); 00275 c.setAutoCommit(false); 00276 } 00277 else 00278 { // Re-use the connection used by this transaction 00279 c = cm.retrieveConnection(tid); 00280 00281 // Sanity check 00282 if (c == null) 00283 throw new SQLException(Translate.get( 00284 "loadbalancer.unable.retrieve.connection", new String[]{ 00285 String.valueOf(tid), backend.getName()})); 00286 } 00287 00288 // Execute Query 00289 ResultSet rs = null; 00290 try 00291 { 00292 rs = executeStatementOnBackend(request, backend, c); 00293 } 00294 catch (SQLException e) 00295 { 00296 throw new SQLException(Translate.get( 00297 "loadbalancer.request.failed.on.backend", new String[]{ 00298 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00299 backend.getName(), e.getMessage()})); 00300 } 00301 catch (BadConnectionException e) 00302 { // Connection failed, so did the transaction 00303 // Disable the backend. 00304 cm.deleteConnection(tid); 00305 String msg = Translate.get( 00306 "loadbalancer.backend.disabling.connection.failure", backend 00307 .getName()); 00308 logger.error(msg); 00309 backend.disable(); 00310 throw new SQLException(msg); 00311 } 00312 if (logger.isDebugEnabled()) 00313 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00314 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00315 backend.getName()})); 00316 return rs; 00317 } 00318 } |
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
AbstractLoadBalancer.java の 251 行で定義されています。 参照先 java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. 参照元 org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
00254 { 00255 ResultSet rs = null; 00256 try 00257 { 00258 backend.addPendingReadRequest(request); 00259 String sql = request.getSQL(); 00260 // Rewrite the query if needed 00261 sql = backend.rewriteQuery(sql); 00262 // Execute the query 00263 Statement s = c.createStatement(); 00264 DriverCompliance driverCompliance = backend.getDriverCompliance(); 00265 if (driverCompliance.supportSetQueryTimeout()) 00266 s.setQueryTimeout(request.getTimeout()); 00267 if ((request.getCursorName() != null) 00268 && (driverCompliance.supportSetCursorName())) 00269 s.setCursorName(request.getCursorName()); 00270 if ((request.getFetchSize() != 0) 00271 && driverCompliance.supportSetFetchSize()) 00272 s.setFetchSize(request.getFetchSize()); 00273 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 00274 s.setMaxRows(request.getMaxRows()); 00275 rs = s.executeQuery(sql); 00276 } 00277 catch (SQLException e) 00278 { // Something bad happened 00279 if (backend.isValidConnection(c)) 00280 throw e; // Connection is valid, throw the exception 00281 else 00282 throw new BadConnectionException(); 00283 } 00284 finally 00285 { 00286 backend.removePendingRequest(request); 00287 } 00288 return rs; 00289 } |
|
Execute a stored procedure on the selected backend.
RAIDb1.java の 576 行で定義されています。
00579 { 00580 // Handle macros 00581 proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(), 00582 timestampResolution, false)); 00583 00584 // Ok, we have a backend, let's execute the request 00585 AbstractConnectionManager cm = backend 00586 .getConnectionManager(proc.getLogin()); 00587 00588 // Sanity check 00589 if (cm == null) 00590 { 00591 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00592 new String[]{proc.getLogin(), backend.getName()}); 00593 logger.error(msg); 00594 throw new SQLException(msg); 00595 } 00596 00597 // Execute the query 00598 if (proc.isAutoCommit()) 00599 { 00600 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00601 // We could do something finer grain here by waiting 00602 // only for writes that depend on the tables we need 00603 // but is that really worth the overhead ? 00604 waitForAllWritesToComplete(backend); 00605 00606 // Use a connection just for this request 00607 Connection c = null; 00608 try 00609 { 00610 c = cm.getConnection(); 00611 } 00612 catch (UnreachableBackendException e1) 00613 { 00614 logger.error(Translate.get( 00615 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00616 backend.disable(); 00617 throw new UnreachableBackendException(Translate.get( 00618 "loadbalancer.backend.unreacheable", backend.getName())); 00619 } 00620 00621 // Sanity check 00622 if (c == null) 00623 throw new UnreachableBackendException(Translate.get( 00624 "loadbalancer.backend.no.connection", backend.getName())); 00625 00626 // Execute Query 00627 ResultSet rs = null; 00628 try 00629 { 00630 // We suppose here that the request does not modify the schema since 00631 // it is a read-only query. 00632 CallableStatement cs = c.prepareCall(proc.getSQL()); 00633 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00634 cs.setQueryTimeout(proc.getTimeout()); 00635 if ((proc.getMaxRows() > 0) 00636 && backend.getDriverCompliance().supportSetMaxRows()) 00637 cs.setMaxRows(proc.getMaxRows()); 00638 rs = cs.executeQuery(); 00639 } 00640 catch (SQLException e) 00641 { 00642 throw new SQLException(Translate.get( 00643 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00644 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00645 backend.getName(), e.getMessage()})); 00646 } 00647 finally 00648 { 00649 cm.releaseConnection(c); 00650 } 00651 if (logger.isDebugEnabled()) 00652 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00653 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00654 return rs; 00655 } 00656 else 00657 { // Inside a transaction 00658 Connection c; 00659 long tid = proc.getTransactionId(); 00660 Long lTid = new Long(tid); 00661 00662 // Wait for previous writes to complete 00663 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00664 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00665 00666 if (!backend.isStartedTransaction(lTid)) 00667 { // transaction has not been started yet on this backend 00668 try 00669 { 00670 c = cm.getConnection(tid); 00671 } 00672 catch (UnreachableBackendException e1) 00673 { 00674 logger.error(Translate.get( 00675 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00676 backend.disable(); 00677 throw new SQLException(Translate.get( 00678 "loadbalancer.backend.unreacheable", backend.getName())); 00679 } 00680 00681 // Sanity check 00682 if (c == null) 00683 throw new SQLException(Translate.get( 00684 "loadbalancer.unable.get.connection", new String[]{ 00685 String.valueOf(tid), backend.getName()})); 00686 00687 // begin transaction 00688 backend.startTransaction(lTid); 00689 c.setAutoCommit(false); 00690 } 00691 else 00692 { // Re-use the connection used by this transaction 00693 c = cm.retrieveConnection(tid); 00694 00695 // Sanity check 00696 if (c == null) 00697 throw new SQLException(Translate.get( 00698 "loadbalancer.unable.retrieve.connection", new String[]{ 00699 String.valueOf(tid), backend.getName()})); 00700 } 00701 00702 // Execute Query 00703 ResultSet rs; 00704 try 00705 { 00706 // We suppose here that the request does not modify the schema since 00707 // it is a read-only query. 00708 CallableStatement cs = c.prepareCall(proc.getSQL()); 00709 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00710 cs.setQueryTimeout(proc.getTimeout()); 00711 if ((proc.getMaxRows() > 0) 00712 && backend.getDriverCompliance().supportSetMaxRows()) 00713 cs.setMaxRows(proc.getMaxRows()); 00714 rs = cs.executeQuery(); 00715 } 00716 catch (SQLException e) 00717 { 00718 throw new SQLException(Translate.get( 00719 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00720 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00721 backend.getName(), e.getMessage()})); 00722 } 00723 if (logger.isDebugEnabled()) 00724 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00725 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00726 backend.getName()})); 00727 return rs; 00728 } 00729 } |
|
Common code for execWriteRequest(AbstractWriteRequest) and execWriteRequestWithKeys(AbstractWriteRequest). The result is given back using member variables execWriteRequestResult and execWriteRequestWithKeysResult defined above.
RAIDb1.java の 367 行で定義されています。 参照先 org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseRead(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.removeTask().
00369 { 00370 ArrayList backendThreads; 00371 ReadPrioritaryFIFOWriteLock lock; 00372 00373 // Handle macros 00374 request.setSQL(RequestManager.handleSQLMacros(request.getSQL(), 00375 timestampResolution, false)); 00376 00377 // Determine which list (blocking or not) to use 00378 if (request.mightBlock()) 00379 { // Blocking 00380 backendThreads = backendBlockingThreads; 00381 lock = backendBlockingThreadsRWLock; 00382 } 00383 else 00384 { // Non-blocking 00385 backendThreads = backendNonBlockingThreads; 00386 lock = backendNonBlockingThreadsRWLock; 00387 if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00388 && (request.getTransactionId() != 0)) 00389 waitForAllWritesToComplete(request.getTransactionId()); 00390 } 00391 00392 try 00393 { 00394 lock.acquireRead(); 00395 } 00396 catch (InterruptedException e) 00397 { 00398 String msg = Translate.get( 00399 "loadbalancer.backendlist.acquire.readlock.failed", e); 00400 logger.error(msg); 00401 throw new SQLException(msg); 00402 } 00403 00404 int nbOfThreads = backendThreads.size(); 00405 00406 // Create the task 00407 AbstractTask task; 00408 if (useKeys) 00409 task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads), 00410 nbOfThreads, request); 00411 else 00412 task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads, 00413 request); 00414 00415 synchronized (task) 00416 { 00417 if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL) 00418 { // Post the task in each backendThread tasklist and wakeup the threads 00419 for (int i = 0; i < nbOfThreads; i++) 00420 { 00421 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00422 .get(i); 00423 synchronized (thread) 00424 { 00425 thread.addTask(task); 00426 thread.notify(); 00427 } 00428 } 00429 } 00430 else 00431 { 00432 // We have to first post the request on each backend before letting the 00433 // first backend to execute the request. Therefore we have 2 phases: 00434 // 1. post the task in each thread queue 00435 // 2. notify each thread to execute the query 00436 00437 // 1. Post the task 00438 if (request.mightBlock()) 00439 { 00440 for (int i = 0; i < nbOfThreads; i++) 00441 { 00442 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00443 .get(i); 00444 synchronized (thread) 00445 { 00446 thread.addTask(task, request.getTransactionId()); 00447 } 00448 } 00449 } 00450 else 00451 { 00452 for (int i = 0; i < nbOfThreads; i++) 00453 { 00454 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00455 .get(i); 00456 synchronized (thread) 00457 { 00458 thread.addTask(task); 00459 } 00460 } 00461 } 00462 00463 // 2. Start the task execution on each backend 00464 for (int i = 0; i < nbOfThreads; i++) 00465 { 00466 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00467 .get(i); 00468 synchronized (thread) 00469 { 00470 thread.notify(); 00471 } 00472 } 00473 } 00474 00475 lock.releaseRead(); 00476 00477 // Wait for completion (notified by the task) 00478 try 00479 { 00480 // Wait on task 00481 long timeout = request.getTimeout() * 1000; 00482 if (timeout > 0) 00483 { 00484 long start = System.currentTimeMillis(); 00485 task.wait(timeout); 00486 long end = System.currentTimeMillis(); 00487 long remaining = timeout - (end - start); 00488 if (remaining <= 0) 00489 { 00490 String msg = Translate.get("loadbalancer.request.timeout", 00491 new String[]{String.valueOf(request.getId()), 00492 String.valueOf(task.getSuccess()), 00493 String.valueOf(task.getFailed())}); 00494 00495 // Try to remove the request from the task list 00496 lock.acquireRead(); 00497 nbOfThreads = backendThreads.size(); 00498 for (int i = 0; i < nbOfThreads; i++) 00499 { 00500 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00501 .get(i); 00502 synchronized (thread) 00503 { 00504 thread.removeTask(task); 00505 } 00506 } 00507 lock.releaseRead(); 00508 00509 logger.warn(msg); 00510 throw new SQLException(msg); 00511 } 00512 // No need to update request timeout since the execution is finished 00513 } 00514 else 00515 task.wait(); 00516 } 00517 catch (InterruptedException e) 00518 { 00519 // Try to remove the request from the task list 00520 try 00521 { 00522 lock.acquireRead(); 00523 nbOfThreads = backendThreads.size(); 00524 } 00525 catch (InterruptedException ignore) 00526 { 00527 nbOfThreads = 0; // Give up 00528 } 00529 for (int i = 0; i < nbOfThreads; i++) 00530 { 00531 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00532 .get(i); 00533 synchronized (thread) 00534 { 00535 thread.removeTask(task); 00536 } 00537 } 00538 lock.releaseRead(); 00539 00540 throw new SQLException(Translate.get("loadbalancer.request.timeout", 00541 new String[]{String.valueOf(request.getId()), 00542 String.valueOf(task.getSuccess()), 00543 String.valueOf(task.getFailed())})); 00544 } 00545 00546 if (task.getSuccess() > 0) 00547 return task; 00548 else 00549 { // All tasks failed 00550 ArrayList exceptions = task.getExceptions(); 00551 if (exceptions == null) 00552 throw new AllBackendsFailedException(Translate.get( 00553 "loadbalancer.request.failed.all", request.getId())); 00554 else 00555 { 00556 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 00557 request.getId()) 00558 + "\n"; 00559 for (int i = 0; i < exceptions.size(); i++) 00560 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00561 logger.error(errorMsg); 00562 throw new SQLException(errorMsg); 00563 } 00564 } 00565 } 00566 } |
|
Performs a write request. This request is broadcasted to all nodes.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 329 行で定義されています。
00331 { 00332 if (!request.isParsed() && (request.isCreate() || request.isDrop())) 00333 request.parse(null, ParsingGranularities.TABLE, true); 00334 return ((WriteRequestTask) execWriteRequest(request, false)).getResult(); 00335 } |
|
Perform a write request and return the auto generated keys.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 346 行で定義されています。
00348 { 00349 if (!request.isParsed() && (request.isCreate() || request.isDrop())) 00350 request.parse(null, ParsingGranularities.TABLE, true); 00351 return ((WriteRequestWithKeysTask) execWriteRequest(request, false)) 00352 .getResult(); 00353 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 745 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
00746 { 00747 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 00748 proc, false); 00749 return task.getResult(); 00750 } |
|
Get information about the Request Load Balancer
org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_LPRF, org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, と org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDBを実装しています. |
|
Returns the number of nodes to wait for according to the defined
RAIDb1.java の 127 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getPolicy(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForCompletionPolicy, と org.objectweb.cjdbc.common.log.Trace.warn().
00128 { 00129 int nbToWait; 00130 switch (waitForCompletionPolicy.getPolicy()) 00131 { 00132 case WaitForCompletionPolicy.FIRST : 00133 nbToWait = 1; 00134 break; 00135 case WaitForCompletionPolicy.MAJORITY : 00136 nbToWait = nbOfThreads / 2 + 1; 00137 break; 00138 default : 00139 logger 00140 .warn(Translate.get("loadbalancer.waitforcompletion.unsupported")); 00141 case WaitForCompletionPolicy.ALL : 00142 nbToWait = nbOfThreads; 00143 break; 00144 } 00145 return nbToWait; 00146 } |
|
Get the needed query parsing granularity.
AbstractLoadBalancer.java の 151 行で定義されています。 参照元 org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
00152 { 00153 return parsingGranularity; 00154 } |
|
Surrounding raidb1 tags can be treated by getXmlImpl above, but more detailed content have to be returned by the method getRaidb1Xml below.
org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, と org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRRを実装しています. |
|
Returns the RAIDbLevel.
AbstractLoadBalancer.java の 131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
00132 { 00133 return raidbLevel; 00134 } |
|
AbstractLoadBalancer.java の 385 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
00386 { 00387 StringBuffer info = new StringBuffer(); 00388 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00389 info.append(getXmlImpl()); 00390 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00391 return info.toString(); 00392 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ecで再定義されています。 RAIDb1.java の 1422 行で定義されています。
01423 { 01424 StringBuffer info = new StringBuffer(); 01425 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + " " 01426 + DatabasesXmlTags.ATT_timestampResolution + "=\"" 01427 + timestampResolution + "\" >"); 01428 if (waitForCompletionPolicy != null) 01429 info.append(waitForCompletionPolicy.getXml()); 01430 info.append(getRaidb1Xml()); 01431 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">"); 01432 return info.toString(); 01433 } |
|
Rollbacks a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb1.java の 1036 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01037 { 01038 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01039 waitForAllWritesToComplete(tm.getTransactionId()); 01040 01041 try 01042 { 01043 backendNonBlockingThreadsRWLock.acquireRead(); 01044 } 01045 catch (InterruptedException e) 01046 { 01047 String msg = Translate.get( 01048 "loadbalancer.backendlist.acquire.readlock.failed", e); 01049 logger.error(msg); 01050 throw new SQLException(msg); 01051 } 01052 int nbOfThreads = backendNonBlockingThreads.size(); 01053 ArrayList rollbackList = new ArrayList(); 01054 Long lTid = new Long(tm.getTransactionId()); 01055 01056 // Build the list of backend that need to rollback this transaction 01057 for (int i = 0; i < nbOfThreads; i++) 01058 { 01059 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01060 .get(i); 01061 if (thread.getBackend().isStartedTransaction(lTid)) 01062 rollbackList.add(thread); 01063 } 01064 01065 nbOfThreads = rollbackList.size(); 01066 if (nbOfThreads == 0) 01067 { 01068 backendNonBlockingThreadsRWLock.releaseRead(); 01069 return; 01070 } 01071 01072 // Create the task 01073 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01074 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01075 01076 synchronized (task) 01077 { 01078 // Post the task in each backendThread tasklist and wakeup the threads 01079 for (int i = 0; i < nbOfThreads; i++) 01080 { 01081 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01082 synchronized (thread) 01083 { 01084 thread.addTask(task); 01085 thread.notify(); 01086 } 01087 } 01088 01089 backendNonBlockingThreadsRWLock.releaseRead(); 01090 01091 // Wait for completion (notified by the task) 01092 try 01093 { 01094 // Wait on task 01095 long timeout = tm.getTimeout(); 01096 if (timeout > 0) 01097 { 01098 long start = System.currentTimeMillis(); 01099 task.wait(timeout); 01100 long end = System.currentTimeMillis(); 01101 long remaining = timeout - (end - start); 01102 if (remaining <= 0) 01103 { 01104 String msg = Translate.get("loadbalancer.rollback.timeout", 01105 new String[]{String.valueOf(tm.getTransactionId()), 01106 String.valueOf(task.getSuccess()), 01107 String.valueOf(task.getFailed())}); 01108 logger.warn(msg); 01109 throw new SQLException(msg); 01110 } 01111 } 01112 else 01113 task.wait(); 01114 } 01115 catch (InterruptedException e) 01116 { 01117 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01118 new String[]{String.valueOf(tm.getTransactionId()), 01119 String.valueOf(task.getSuccess()), 01120 String.valueOf(task.getFailed())})); 01121 } 01122 01123 if (task.getSuccess() > 0) 01124 return; 01125 else 01126 { // All tasks failed 01127 ArrayList exceptions = task.getExceptions(); 01128 if (exceptions == null) 01129 throw new SQLException(Translate.get( 01130 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01131 else 01132 { 01133 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01134 tm.getTransactionId()) 01135 + "\n"; 01136 for (int i = 0; i < exceptions.size(); i++) 01137 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01138 logger.error(errorMsg); 01139 throw new SQLException(errorMsg); 01140 } 01141 } 01142 } 01143 } |
|
Set the needed query parsing granularity.
AbstractLoadBalancer.java の 161 行で定義されています。
00162 {
00163 this.parsingGranularity = parsingGranularity;
00164 }
|
|
Sets the RAIDbLevel.
AbstractLoadBalancer.java の 141 行で定義されています。
00142 {
00143 this.raidbLevel = raidbLevel;
00144 }
|
|
Associate a weight to a backend identified by its logical name.
org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, と org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDBで再定義されています。 AbstractLoadBalancer.java の 359 行で定義されています。
00360 { 00361 throw new SQLException("Weight is not supported by this load balancer"); 00362 } |
|
Waits for all writes in the blocking thread queue of the given backend to complete.
RAIDb1.java の 1217 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01219 { 01220 try 01221 { 01222 backendBlockingThreadsRWLock.acquireRead(); 01223 } 01224 catch (InterruptedException e) 01225 { 01226 String msg = Translate.get( 01227 "loadbalancer.backendlist.acquire.readlock.failed", e); 01228 logger.error(msg); 01229 throw new SQLException(msg); 01230 } 01231 01232 int nbOfThreads = backendBlockingThreads.size(); 01233 01234 for (int i = 0; i < nbOfThreads; i++) 01235 { 01236 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01237 .get(i); 01238 if (thread.getBackend() == backend) 01239 thread.waitForAllTasksToComplete(); 01240 } 01241 01242 backendBlockingThreadsRWLock.releaseRead(); 01243 } |
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
RAIDb1.java の 1183 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01185 { 01186 try 01187 { 01188 backendBlockingThreadsRWLock.acquireRead(); 01189 } 01190 catch (InterruptedException e) 01191 { 01192 String msg = Translate.get( 01193 "loadbalancer.backendlist.acquire.readlock.failed", e); 01194 logger.error(msg); 01195 throw new SQLException(msg); 01196 } 01197 01198 int nbOfThreads = backendBlockingThreads.size(); 01199 01200 for (int i = 0; i < nbOfThreads; i++) 01201 { 01202 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01203 .get(i); 01204 if (thread.getBackend() == backend) 01205 thread.waitForAllTasksToComplete(transactionId); 01206 } 01207 01208 backendBlockingThreadsRWLock.releaseRead(); 01209 } |
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction. RAIDb1.java の 1149 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01151 { 01152 try 01153 { 01154 backendBlockingThreadsRWLock.acquireRead(); 01155 } 01156 catch (InterruptedException e) 01157 { 01158 String msg = Translate.get( 01159 "loadbalancer.backendlist.acquire.readlock.failed", e); 01160 logger.error(msg); 01161 throw new SQLException(msg); 01162 } 01163 01164 int nbOfThreads = backendBlockingThreads.size(); 01165 01166 for (int i = 0; i < nbOfThreads; i++) 01167 { 01168 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01169 .get(i); 01170 thread.waitForAllTasksToComplete(transactionId); 01171 } 01172 01173 backendBlockingThreadsRWLock.releaseRead(); 01174 } |
|
RAIDb1.java の 79 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1(). |
|
RAIDb1.java の 81 行で定義されています。 |
|
RAIDb1.java の 80 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1(). |
|
RAIDb1.java の 82 行で定義されています。 |
|
初期値: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1")
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ecで再定義されています。 RAIDb1.java の 87 行で定義されています。 |
|
AbstractLoadBalancer.java の 74 行で定義されています。 |
|
AbstractLoadBalancer.java の 73 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(), と org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel(). |
|
RAIDb1.java の 85 行で定義されています。 |
|
AbstractLoadBalancer.java の 72 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(). |
|
RAIDb1.java の 84 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.getNbToWait(), と org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1(). |