The read requests coming from the request manager are sent to the node that has the Least pending read requests among the nodes that can execute the request.
RAIDb2_LPRF.java の 52 行で定義されています。
|
Creates a new RAIDb-2 Round Robin request load balancer.
RAIDb2_LPRF.java の 75 行で定義されています。
00079 { 00080 super(vdb, waitForCompletionPolicy, createTablePolicy, timestampResolution); 00081 } |
|
Begins a new transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1005 行で定義されています。
01006 { 01007 } |
|
Commits a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1015 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01016 { 01017 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01018 waitForAllWritesToComplete(tm.getTransactionId()); 01019 01020 try 01021 { 01022 backendNonBlockingThreadsRWLock.acquireRead(); 01023 } 01024 catch (InterruptedException e) 01025 { 01026 String msg = Translate.get( 01027 "loadbalancer.backendlist.acquire.readlock.failed", e); 01028 logger.error(msg); 01029 throw new SQLException(msg); 01030 } 01031 01032 int nbOfThreads = backendNonBlockingThreads.size(); 01033 ArrayList commitList = new ArrayList(); 01034 Long iTid = new Long(tm.getTransactionId()); 01035 01036 // Build the list of backend that need to commit this transaction 01037 for (int i = 0; i < nbOfThreads; i++) 01038 { 01039 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01040 .get(i); 01041 if (thread.getBackend().isStartedTransaction(iTid)) 01042 commitList.add(thread); 01043 } 01044 01045 nbOfThreads = commitList.size(); 01046 if (nbOfThreads == 0) 01047 { 01048 backendNonBlockingThreadsRWLock.releaseRead(); 01049 return; 01050 } 01051 01052 // Create the task 01053 CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm 01054 .getTimeout(), tm.getLogin(), tm.getTransactionId()); 01055 01056 synchronized (task) 01057 { 01058 // Post the task in each backendThread tasklist and wakeup the threads 01059 for (int i = 0; i < nbOfThreads; i++) 01060 { 01061 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 01062 synchronized (thread) 01063 { 01064 thread.addTask(task); 01065 thread.notify(); 01066 } 01067 } 01068 01069 backendNonBlockingThreadsRWLock.releaseRead(); 01070 01071 // Wait for completion (notified by the task) 01072 try 01073 { 01074 // Wait on task 01075 long timeout = tm.getTimeout(); 01076 if (timeout > 0) 01077 { 01078 long start = System.currentTimeMillis(); 01079 task.wait(timeout); 01080 long end = System.currentTimeMillis(); 01081 long remaining = timeout - (end - start); 01082 if (remaining <= 0) 01083 { 01084 String msg = Translate.get("loadbalancer.commit.timeout", 01085 new String[]{String.valueOf(tm.getTransactionId()), 01086 String.valueOf(task.getSuccess()), 01087 String.valueOf(task.getFailed())}); 01088 logger.warn(msg); 01089 throw new SQLException(msg); 01090 } 01091 } 01092 else 01093 task.wait(); 01094 } 01095 catch (InterruptedException e) 01096 { 01097 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 01098 new String[]{String.valueOf(tm.getTransactionId()), 01099 String.valueOf(task.getSuccess()), 01100 String.valueOf(task.getFailed())})); 01101 } 01102 01103 if (task.getSuccess() > 0) 01104 return; 01105 else 01106 { // All tasks failed 01107 ArrayList exceptions = task.getExceptions(); 01108 if (exceptions == null) 01109 throw new SQLException(Translate.get( 01110 "loadbalancer.commit.all.failed", tm.getTransactionId())); 01111 else 01112 { 01113 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 01114 tm.getTransactionId()) 01115 + "\n"; 01116 for (int i = 0; i < exceptions.size(); i++) 01117 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01118 logger.error(errorMsg); 01119 throw new SQLException(errorMsg); 01120 } 01121 } 01122 } 01123 } |
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1421 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
01423 { 01424 if (db.isWriteEnabled()) 01425 { 01426 // Start with the backendBlockingThread list 01427 try 01428 { 01429 backendBlockingThreadsRWLock.acquireWrite(); 01430 } 01431 catch (InterruptedException e) 01432 { 01433 String msg = Translate.get( 01434 "loadbalancer.backendlist.acquire.writelock.failed", e); 01435 logger.error(msg); 01436 throw new SQLException(msg); 01437 } 01438 01439 int nbOfThreads = backendBlockingThreads.size(); 01440 01441 // Find the right blocking thread 01442 for (int i = 0; i < nbOfThreads; i++) 01443 { 01444 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01445 .get(i); 01446 if (thread.getBackend().equals(db)) 01447 { 01448 logger.info(Translate 01449 .get("loadbalancer.backend.workerthread.blocking.remove", db 01450 .getName())); 01451 01452 backendBlockingThreads.remove(thread); 01453 01454 synchronized (thread) 01455 { 01456 // Kill the thread 01457 thread.addPriorityTask(new KillThreadTask(1, 1)); 01458 thread.notify(); 01459 } 01460 break; 01461 } 01462 } 01463 01464 backendBlockingThreadsRWLock.releaseWrite(); 01465 01466 // Continue with the backendNonBlockingThread list 01467 01468 try 01469 { 01470 backendNonBlockingThreadsRWLock.acquireWrite(); 01471 } 01472 catch (InterruptedException e) 01473 { 01474 String msg = Translate.get( 01475 "loadbalancer.backendlist.acquire.writelock.failed", e); 01476 logger.error(msg); 01477 throw new SQLException(msg); 01478 } 01479 01480 // Find the right non-blocking thread 01481 nbOfThreads = backendNonBlockingThreads.size(); 01482 for (int i = 0; i < nbOfThreads; i++) 01483 { 01484 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01485 .get(i); 01486 if (thread.getBackend().equals(db)) 01487 { 01488 logger.info(Translate.get( 01489 "loadbalancer.backend.workerthread.non.blocking.remove", db 01490 .getName())); 01491 01492 backendNonBlockingThreads.remove(thread); 01493 01494 synchronized (thread) 01495 { 01496 // Kill the thread 01497 thread.addPriorityTask(new KillThreadTask(1, 1)); 01498 thread.notify(); 01499 } 01500 break; 01501 } 01502 } 01503 01504 backendNonBlockingThreadsRWLock.releaseWrite(); 01505 } 01506 01507 db.disable(); 01508 if (db.isInitialized()) 01509 db.finalizeConnections(); 01510 } |
|
Enables a Backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1356 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
01358 { 01359 if (writeEnabled) 01360 { 01361 // Create 2 worker threads 01362 BackendWorkerThread blockingThread = new BackendWorkerThread( 01363 (DatabaseBackend) db, this); 01364 BackendWorkerThread nonBlockingThread = new BackendWorkerThread( 01365 (DatabaseBackend) db, this); 01366 01367 // Add first to the blocking thread list 01368 try 01369 { 01370 backendBlockingThreadsRWLock.acquireWrite(); 01371 } 01372 catch (InterruptedException e) 01373 { 01374 String msg = Translate.get( 01375 "loadbalancer.backendlist.acquire.writelock.failed", e); 01376 logger.error(msg); 01377 throw new SQLException(msg); 01378 } 01379 backendBlockingThreads.add(blockingThread); 01380 backendBlockingThreadsRWLock.releaseWrite(); 01381 blockingThread.start(); 01382 logger.info(Translate.get( 01383 "loadbalancer.backend.workerthread.blocking.add", db.getName())); 01384 01385 // Then add to the non-blocking thread list 01386 try 01387 { 01388 backendNonBlockingThreadsRWLock.acquireWrite(); 01389 } 01390 catch (InterruptedException e) 01391 { 01392 String msg = Translate.get( 01393 "loadbalancer.backendlist.acquire.writelock.failed", e); 01394 logger.error(msg); 01395 throw new SQLException(msg); 01396 } 01397 backendNonBlockingThreads.add(nonBlockingThread); 01398 backendNonBlockingThreadsRWLock.releaseWrite(); 01399 nonBlockingThread.start(); 01400 logger.info(Translate.get( 01401 "loadbalancer.backend.workerthread.non.blocking.add", db.getName())); 01402 db.enableWrite(); 01403 } 01404 01405 if (!db.isInitialized()) 01406 db.initializeConnections(); 01407 db.enableRead(); 01408 } |
|
Chooses the node to execute the stored procedure using a LPRF algorithm. If the next node has not the needed stored procedure, we try the next one and so on until a suitable backend is found.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2_LPRF.java の 216 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getPendingRequests(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasStoredProcedure(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isReadEnabled.
00218 { 00219 // Choose a backend 00220 try 00221 { 00222 vdb.acquireReadLockBackendLists(); 00223 } 00224 catch (InterruptedException e) 00225 { 00226 String msg = Translate.get( 00227 "loadbalancer.backendlist.acquire.readlock.failed", e); 00228 logger.error(msg); 00229 throw new SQLException(msg); 00230 } 00231 00232 DatabaseBackend backend = null; // The backend that will execute the query 00233 00234 // Note that vdb lock is released in the finally clause of this try/catch 00235 // block 00236 try 00237 { 00238 DatabaseBackend failedBackend = null; 00239 SQLException failedException = null; 00240 ResultSet rs = null; 00241 do 00242 { 00243 ArrayList backends = vdb.getBackends(); 00244 int size = backends.size(); 00245 00246 if (size == 0) 00247 throw new SQLException(Translate.get( 00248 "loadbalancer.execute.no.backend.available", proc.getId())); 00249 00250 // Choose the backend that has the least pending requests 00251 int leastRequests = 0; 00252 int enabledBackends = 0; 00253 00254 for (int i = 0; i < size; i++) 00255 { 00256 DatabaseBackend b = (DatabaseBackend) backends.get(i); 00257 if (b.isReadEnabled()) 00258 { 00259 enabledBackends++; 00260 if (b.hasStoredProcedure(proc.getProcedureName())) 00261 { 00262 int pending = b.getPendingRequests().size(); 00263 if ((b != failedBackend) 00264 && ((backend == null) || (pending < leastRequests))) 00265 { 00266 backend = b; 00267 if (pending == 0) 00268 break; // Stop here we will never find a less loaded node 00269 else 00270 leastRequests = pending; 00271 } 00272 } 00273 } 00274 } 00275 00276 if (backend == null) 00277 { 00278 if (enabledBackends == 0) 00279 throw new SQLException(Translate 00280 .get("loadbalancer.storedprocedure.backend.no.enabled", proc 00281 .getId())); 00282 else if (failedBackend == null) 00283 throw new SQLException(Translate.get( 00284 "loadbalancer.backend.no.required.storedprocedure", proc 00285 .getProcedureName())); 00286 else 00287 // Bad query, the only backend that could execute it has failed 00288 throw failedException; 00289 } 00290 00291 // Execute the request on the chosen backend 00292 boolean toDisable = false; 00293 try 00294 { 00295 rs = executeStoredProcedureOnBackend(proc, backend); 00296 if (failedBackend != null) 00297 { // Previous backend failed 00298 if (logger.isWarnEnabled()) 00299 logger.warn(Translate.get("loadbalancer.storedprocedure.status", 00300 new String[]{String.valueOf(proc.getId()), backend.getName(), 00301 failedBackend.getName()})); 00302 toDisable = true; 00303 } 00304 } 00305 catch (UnreachableBackendException se) 00306 { 00307 // Retry on an other backend. 00308 continue; 00309 } 00310 catch (SQLException se) 00311 { 00312 if (failedBackend != null) 00313 { // Bad query, no backend can execute it 00314 String msg = Translate.get( 00315 "loadbalancer.storedprocedure.failed.twice", new String[]{ 00316 String.valueOf(proc.getId()), se.getMessage()}); 00317 if (logger.isInfoEnabled()) 00318 logger.info(msg); 00319 throw new SQLException(msg); 00320 } 00321 else 00322 { // We are the first to fail on this query 00323 failedBackend = backend; 00324 failedException = se; 00325 if (logger.isInfoEnabled()) 00326 logger.info(Translate.get( 00327 "loadbalancer.storedprocedure.failed.on.backend", 00328 new String[]{ 00329 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00330 backend.getName(), se.getMessage()})); 00331 continue; 00332 } 00333 } 00334 00335 if (toDisable) 00336 { // retry has succeeded and we need to disable the first node that 00337 // failed 00338 try 00339 { 00340 if (logger.isWarnEnabled()) 00341 logger.warn(Translate.get("loadbalancer.backend.disabling", 00342 failedBackend.getName())); 00343 failedBackend.disable(); 00344 failedBackend.finalizeConnections(); 00345 } 00346 catch (SQLException ignore) 00347 { 00348 } 00349 finally 00350 { 00351 failedBackend = null; // to exit the do{}while 00352 } 00353 } 00354 } 00355 while (failedBackend != null); 00356 return rs; 00357 } 00358 catch (RuntimeException e) 00359 { 00360 String msg = Translate.get( 00361 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00362 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00363 backend.getName(), e.getMessage()}); 00364 logger.fatal(msg, e); 00365 throw new SQLException(msg); 00366 } 00367 finally 00368 { 00369 vdb.releaseReadLockBackendLists(); 00370 } 00371 } |
|
Chooses the node to execute the request using a round-robin algorithm. If the next node has not the tables needed to execute the requests, we try the next one and so on until a suitable backend is found.
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2_LPRF.java の 98 行で定義されています。 参照先 org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.acquireReadLockBackendLists(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBackends(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getPendingRequests(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTables(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isReadEnabled.
00100 { 00101 // Choose a backend 00102 try 00103 { 00104 vdb.acquireReadLockBackendLists(); 00105 } 00106 catch (InterruptedException e) 00107 { 00108 String msg = Translate.get( 00109 "loadbalancer.backendlist.acquire.readlock.failed", e); 00110 logger.error(msg); 00111 throw new SQLException(msg); 00112 } 00113 00114 DatabaseBackend backend = null; // The backend that will execute the query 00115 00116 // Note that vdb lock is released in the finally clause of this try/catch 00117 // block 00118 try 00119 { 00120 ArrayList backends = vdb.getBackends(); 00121 int size = backends.size(); 00122 00123 if (size == 0) 00124 throw new SQLException(Translate.get( 00125 "loadbalancer.execute.no.backend.available", request.getId())); 00126 00127 // Choose the backend that has the least pending requests 00128 int leastRequests = 0; 00129 int enabledBackends = 0; 00130 ArrayList tables = request.getFrom(); 00131 00132 for (int i = 0; i < size; i++) 00133 { 00134 DatabaseBackend b = (DatabaseBackend) backends.get(i); 00135 if (b.isReadEnabled()) 00136 { 00137 enabledBackends++; 00138 if (b.hasTables(tables)) 00139 { 00140 int pending = b.getPendingRequests().size(); 00141 if (((backend == null) || (pending < leastRequests))) 00142 { 00143 backend = b; 00144 if (pending == 0) 00145 break; // Stop here we will never find a less loaded node 00146 else 00147 leastRequests = pending; 00148 } 00149 } 00150 } 00151 } 00152 00153 if (backend == null) 00154 { 00155 if (enabledBackends == 0) 00156 throw new SQLException(Translate.get( 00157 "loadbalancer.execute.no.backend.enabled", request.getId())); 00158 else 00159 throw new SQLException(Translate.get( 00160 "loadbalancer.backend.no.required.tables", tables.toString())); 00161 } 00162 00163 } 00164 catch (RuntimeException e) 00165 { 00166 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00167 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00168 backend.getName(), e.getMessage()}); 00169 logger.error(msg, e); 00170 throw new SQLException(msg); 00171 } 00172 finally 00173 { 00174 vdb.releaseReadLockBackendLists(); 00175 } 00176 00177 // Execute the request on the chosen backend 00178 ResultSet rs = null; 00179 boolean toDisable = false; 00180 try 00181 { 00182 rs = executeRequestOnBackend(request, backend); 00183 } 00184 catch (UnreachableBackendException se) 00185 { 00186 // Try on another backend 00187 return execReadRequest(request); 00188 } 00189 catch (SQLException se) 00190 { 00191 String msg = Translate.get("loadbalancer.request.failed", new String[]{ 00192 String.valueOf(request.getId()), se.getMessage()}); 00193 if (logger.isInfoEnabled()) 00194 logger.info(msg); 00195 throw new SQLException(msg); 00196 } 00197 catch (RuntimeException e) 00198 { 00199 String msg = Translate.get("loadbalancer.request.failed.on.backend", 00200 new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()), 00201 backend.getName(), e.getMessage()}); 00202 logger.error(msg, e); 00203 throw new SQLException(msg); 00204 } 00205 00206 return rs; 00207 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 816 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
00818 { 00819 ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure( 00820 proc, true); 00821 return task.getResult(); 00822 } |
|
Execute a read request on the selected backend.
RAIDb2.java の 492 行で定義されています。
00494 { 00495 // Handle macros 00496 request.setSQL(RequestManager.handleSQLMacros(request.getSQL(), 00497 timestampResolution, false)); 00498 00499 // Ok, we have a backend, let's execute the request 00500 AbstractConnectionManager cm = backend.getConnectionManager(request 00501 .getLogin()); 00502 00503 // Sanity check 00504 if (cm == null) 00505 { 00506 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00507 new String[]{request.getLogin(), backend.getName()}); 00508 logger.error(msg); 00509 throw new SQLException(msg); 00510 } 00511 00512 // Execute the query 00513 if (request.isAutoCommit()) 00514 { 00515 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00516 // We could do something finer grain here by waiting 00517 // only for writes that depend on the tables we need 00518 // but is that really worth the overhead ? 00519 waitForAllWritesToComplete(backend); 00520 00521 ResultSet rs = null; 00522 boolean badConnection; 00523 do 00524 { 00525 badConnection = false; 00526 // Use a connection just for this request 00527 Connection c = null; 00528 try 00529 { 00530 c = cm.getConnection(); 00531 } 00532 catch (UnreachableBackendException e1) 00533 { 00534 logger.error(Translate.get( 00535 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00536 backend.disable(); 00537 throw new UnreachableBackendException(Translate.get( 00538 "loadbalancer.backend.unreacheable", backend.getName())); 00539 } 00540 00541 // Sanity check 00542 if (c == null) 00543 throw new UnreachableBackendException( 00544 "No more connections on backend " + backend.getName()); 00545 00546 // Execute Query 00547 try 00548 { 00549 rs = executeStatementOnBackend(request, backend, c); 00550 cm.releaseConnection(c); 00551 } 00552 catch (SQLException e) 00553 { 00554 cm.releaseConnection(c); 00555 throw new SQLException(Translate.get( 00556 "loadbalancer.request.failed.on.backend", new String[]{ 00557 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00558 backend.getName(), e.getMessage()})); 00559 } 00560 catch (BadConnectionException e) 00561 { // Get rid of the bad connection 00562 cm.deleteConnection(c); 00563 badConnection = true; 00564 } 00565 } 00566 while (badConnection); 00567 if (logger.isDebugEnabled()) 00568 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00569 String.valueOf(request.getId()), backend.getName()})); 00570 return rs; 00571 } 00572 else 00573 { // Inside a transaction 00574 Connection c; 00575 long tid = request.getTransactionId(); 00576 Long lTid = new Long(tid); 00577 00578 // Wait for previous writes to complete 00579 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00580 waitForAllWritesToComplete(backend, request.getTransactionId()); 00581 00582 if (!backend.isStartedTransaction(lTid)) 00583 { // transaction has not been started yet on this backend 00584 try 00585 { 00586 c = cm.getConnection(tid); 00587 } 00588 catch (UnreachableBackendException e1) 00589 { 00590 logger.error(Translate.get( 00591 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00592 backend.disable(); 00593 throw new UnreachableBackendException(Translate.get( 00594 "loadbalancer.backend.unreacheable", backend.getName())); 00595 } 00596 00597 // Sanity check 00598 if (c == null) 00599 throw new SQLException(Translate.get( 00600 "loadbalancer.unable.get.connection", new String[]{ 00601 String.valueOf(tid), backend.getName()})); 00602 00603 // begin transaction 00604 backend.startTransaction(lTid); 00605 c.setAutoCommit(false); 00606 } 00607 else 00608 { // Re-use the connection used by this transaction 00609 c = cm.retrieveConnection(tid); 00610 00611 // Sanity check 00612 if (c == null) 00613 throw new SQLException(Translate.get( 00614 "loadbalancer.unable.retrieve.connection", new String[]{ 00615 String.valueOf(tid), backend.getName()})); 00616 } 00617 00618 // Execute Query 00619 ResultSet rs = null; 00620 try 00621 { 00622 rs = executeStatementOnBackend(request, backend, c); 00623 } 00624 catch (SQLException e) 00625 { 00626 throw new SQLException(Translate.get( 00627 "loadbalancer.request.failed.on.backend", new String[]{ 00628 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00629 backend.getName(), e.getMessage()})); 00630 } 00631 catch (BadConnectionException e) 00632 { // Connection failed, so did the transaction 00633 // Disable the backend. 00634 cm.deleteConnection(tid); 00635 String msg = Translate.get( 00636 "loadbalancer.backend.disabling.connection.failure", backend 00637 .getName()); 00638 logger.error(msg); 00639 backend.disable(); 00640 throw new SQLException(msg); 00641 } 00642 if (logger.isDebugEnabled()) 00643 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00644 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00645 backend.getName()})); 00646 return rs; 00647 } 00648 } |
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
AbstractLoadBalancer.java の 251 行で定義されています。 参照先 java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. 参照元 org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
00254 { 00255 ResultSet rs = null; 00256 try 00257 { 00258 backend.addPendingReadRequest(request); 00259 String sql = request.getSQL(); 00260 // Rewrite the query if needed 00261 sql = backend.rewriteQuery(sql); 00262 // Execute the query 00263 Statement s = c.createStatement(); 00264 DriverCompliance driverCompliance = backend.getDriverCompliance(); 00265 if (driverCompliance.supportSetQueryTimeout()) 00266 s.setQueryTimeout(request.getTimeout()); 00267 if ((request.getCursorName() != null) 00268 && (driverCompliance.supportSetCursorName())) 00269 s.setCursorName(request.getCursorName()); 00270 if ((request.getFetchSize() != 0) 00271 && driverCompliance.supportSetFetchSize()) 00272 s.setFetchSize(request.getFetchSize()); 00273 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 00274 s.setMaxRows(request.getMaxRows()); 00275 rs = s.executeQuery(sql); 00276 } 00277 catch (SQLException e) 00278 { // Something bad happened 00279 if (backend.isValidConnection(c)) 00280 throw e; // Connection is valid, throw the exception 00281 else 00282 throw new BadConnectionException(); 00283 } 00284 finally 00285 { 00286 backend.removePendingRequest(request); 00287 } 00288 return rs; 00289 } |
|
Execute a stored procedure on the selected backend.
RAIDb2.java の 658 行で定義されています。
00661 { 00662 // Handle macros 00663 proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(), 00664 timestampResolution, false)); 00665 00666 // Ok, we have a backend, let's execute the request 00667 AbstractConnectionManager cm = backend 00668 .getConnectionManager(proc.getLogin()); 00669 00670 // Sanity check 00671 if (cm == null) 00672 { 00673 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00674 new String[]{proc.getLogin(), backend.getName()}); 00675 logger.error(msg); 00676 throw new SQLException(msg); 00677 } 00678 00679 // Execute the query 00680 if (proc.isAutoCommit()) 00681 { 00682 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00683 // We could do something finer grain here by waiting 00684 // only for writes that depend on the tables we need 00685 // but is that really worth the overhead ? 00686 waitForAllWritesToComplete(backend); 00687 00688 // Use a connection just for this request 00689 Connection c = null; 00690 try 00691 { 00692 c = cm.getConnection(); 00693 } 00694 catch (UnreachableBackendException e1) 00695 { 00696 logger.error(Translate.get( 00697 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00698 backend.disable(); 00699 throw new UnreachableBackendException(Translate.get( 00700 "loadbalancer.backend.unreacheable", backend.getName())); 00701 } 00702 00703 // Sanity check 00704 if (c == null) 00705 throw new SQLException(Translate.get( 00706 "loadbalancer.backend.no.connection", backend.getName())); 00707 00708 // Execute Query 00709 ResultSet rs = null; 00710 try 00711 { 00712 // We suppose here that the request does not modify the schema since 00713 // it is a read-only query. 00714 CallableStatement cs = c.prepareCall(proc.getSQL()); 00715 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00716 cs.setQueryTimeout(proc.getTimeout()); 00717 if ((proc.getMaxRows() > 0) 00718 && backend.getDriverCompliance().supportSetMaxRows()) 00719 cs.setMaxRows(proc.getMaxRows()); 00720 rs = cs.executeQuery(); 00721 } 00722 catch (SQLException e) 00723 { 00724 throw new SQLException(Translate.get( 00725 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00726 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00727 backend.getName(), e.getMessage()})); 00728 } 00729 finally 00730 { 00731 cm.releaseConnection(c); 00732 } 00733 if (logger.isDebugEnabled()) 00734 logger.debug(Translate.get("loadbalancer.storedprocedure.on", 00735 new String[]{String.valueOf(proc.getId()), backend.getName()})); 00736 return rs; 00737 } 00738 else 00739 { // Inside a transaction 00740 Connection c; 00741 long tid = proc.getTransactionId(); 00742 Long lTid = new Long(tid); 00743 00744 // Wait for previous writes to complete 00745 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 00746 waitForAllWritesToComplete(backend, proc.getTransactionId()); 00747 00748 if (!backend.isStartedTransaction(lTid)) 00749 { // transaction has not been started yet on this backend 00750 try 00751 { 00752 c = cm.getConnection(tid); 00753 } 00754 catch (UnreachableBackendException e1) 00755 { 00756 logger.error(Translate.get( 00757 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00758 backend.disable(); 00759 throw new SQLException(Translate.get( 00760 "loadbalancer.backend.unreacheable", backend.getName())); 00761 } 00762 00763 // Sanity check 00764 if (c == null) 00765 throw new SQLException(Translate.get( 00766 "loadbalancer.unable.get.connection", new String[]{ 00767 String.valueOf(tid), backend.getName()})); 00768 00769 // begin transaction 00770 backend.startTransaction(lTid); 00771 c.setAutoCommit(false); 00772 } 00773 else 00774 { // Re-use the connection used by this transaction 00775 c = cm.retrieveConnection(tid); 00776 00777 // Sanity check 00778 if (c == null) 00779 throw new SQLException(Translate.get( 00780 "loadbalancer.unable.retrieve.connection", new String[]{ 00781 String.valueOf(tid), backend.getName()})); 00782 } 00783 00784 // Execute Query 00785 ResultSet rs; 00786 try 00787 { 00788 // We suppose here that the request does not modify the schema since 00789 // it is a read-only query. 00790 CallableStatement cs = c.prepareCall(proc.getSQL()); 00791 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00792 cs.setQueryTimeout(proc.getTimeout()); 00793 if ((proc.getMaxRows() > 0) 00794 && backend.getDriverCompliance().supportSetMaxRows()) 00795 cs.setMaxRows(proc.getMaxRows()); 00796 rs = cs.executeQuery(); 00797 } 00798 catch (SQLException e) 00799 { 00800 throw new SQLException(Translate.get( 00801 "loadbalancer.storedprocedure.failed.on.backend", new String[]{ 00802 proc.getSQLShortForm(vdb.getSQLShortFormLength()), 00803 backend.getName(), e.getMessage()})); 00804 } 00805 if (logger.isDebugEnabled()) 00806 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00807 new String[]{String.valueOf(tid), String.valueOf(proc.getId()), 00808 backend.getName()})); 00809 return rs; 00810 } 00811 } |
|
Performs a write request. This request is broadcasted to all nodes that owns the table to be written.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 174 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult. 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().
00176 { 00177 execWriteRequest(request, false); 00178 return execWriteRequestResult; 00179 } |
|
Perform a write request and return the auto generated keys.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 190 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest(), と org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult.
00192 { 00193 execWriteRequest(request, true); 00194 return execWriteRequestWithKeysResult; 00195 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 827 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
00828 { 00829 WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure( 00830 proc, false); 00831 return task.getResult(); 00832 } |
|
Gets information about the request load balancer.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2_LPRF.java の 382 行で定義されています。
00383 { 00384 // We don't lock since we don't need a completely accurate value 00385 int size = vdb.getBackends().size(); 00386 00387 if (size == 0) 00388 return "RAIDb-2 Least Pending Requests First load balancer: !!!Warning!!! No backend nodes found\n"; 00389 else 00390 return "RAIDb-2 Least Pending Requests First load balancer (" + size 00391 + " backends)\n"; 00392 } |
|
Get the needed query parsing granularity.
AbstractLoadBalancer.java の 151 行で定義されています。 参照元 org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
00152 { 00153 return parsingGranularity; 00154 } |
|
org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2に実装されています. RAIDb2_LPRF.java の 397 行で定義されています。
00398 { 00399 return "<" + DatabasesXmlTags.ELT_RAIDb_2_LeastPendingRequestsFirst + "/>"; 00400 } |
|
Returns the RAIDbLevel.
AbstractLoadBalancer.java の 131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
00132 { 00133 return raidbLevel; 00134 } |
|
AbstractLoadBalancer.java の 385 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
00386 { 00387 StringBuffer info = new StringBuffer(); 00388 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00389 info.append(getXmlImpl()); 00390 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00391 return info.toString(); 00392 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 1515 行で定義されています。
01516 { 01517 StringBuffer info = new StringBuffer(); 01518 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + " " 01519 + DatabasesXmlTags.ATT_timestampResolution + "=\"" 01520 + timestampResolution + "\" >"); 01521 if (createTablePolicy != null) 01522 info.append(createTablePolicy.getXml()); 01523 if (waitForCompletionPolicy != null) 01524 info.append(waitForCompletionPolicy.getXml()); 01525 this.getRaidb2Xml(); 01526 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">"); 01527 return info.toString(); 01528 } |
|
Rollbacks a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb2.java の 1131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
01132 { 01133 if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL) 01134 waitForAllWritesToComplete(tm.getTransactionId()); 01135 01136 try 01137 { 01138 backendNonBlockingThreadsRWLock.acquireRead(); 01139 } 01140 catch (InterruptedException e) 01141 { 01142 String msg = Translate.get( 01143 "loadbalancer.backendlist.acquire.readlock.failed", e); 01144 logger.error(msg); 01145 throw new SQLException(msg); 01146 } 01147 int nbOfThreads = backendNonBlockingThreads.size(); 01148 ArrayList rollbackList = new ArrayList(); 01149 Long iTid = new Long(tm.getTransactionId()); 01150 01151 // Build the list of backend that need to rollback this transaction 01152 for (int i = 0; i < nbOfThreads; i++) 01153 { 01154 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads 01155 .get(i); 01156 if (thread.getBackend().isStartedTransaction(iTid)) 01157 rollbackList.add(thread); 01158 } 01159 01160 nbOfThreads = rollbackList.size(); 01161 if (nbOfThreads == 0) 01162 { 01163 backendNonBlockingThreadsRWLock.releaseRead(); 01164 return; 01165 } 01166 01167 // Create the task 01168 RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, 01169 tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01170 01171 synchronized (task) 01172 { 01173 // Post the task in each backendThread tasklist and wakeup the threads 01174 for (int i = 0; i < nbOfThreads; i++) 01175 { 01176 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01177 synchronized (thread) 01178 { 01179 thread.addTask(task); 01180 thread.notify(); 01181 } 01182 } 01183 01184 backendNonBlockingThreadsRWLock.releaseRead(); 01185 01186 // Wait for completion (notified by the task) 01187 try 01188 { 01189 // Wait on task 01190 long timeout = tm.getTimeout(); 01191 if (timeout > 0) 01192 { 01193 long start = System.currentTimeMillis(); 01194 task.wait(timeout); 01195 long end = System.currentTimeMillis(); 01196 long remaining = timeout - (end - start); 01197 if (remaining <= 0) 01198 { 01199 String msg = Translate.get("loadbalancer.rollback.timeout", 01200 new String[]{String.valueOf(tm.getTransactionId()), 01201 String.valueOf(task.getSuccess()), 01202 String.valueOf(task.getFailed())}); 01203 logger.warn(msg); 01204 throw new SQLException(msg); 01205 } 01206 } 01207 else 01208 task.wait(); 01209 } 01210 catch (InterruptedException e) 01211 { 01212 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01213 new String[]{String.valueOf(tm.getTransactionId()), 01214 String.valueOf(task.getSuccess()), 01215 String.valueOf(task.getFailed())})); 01216 } 01217 01218 if (task.getSuccess() > 0) 01219 return; 01220 else 01221 { // All tasks failed 01222 ArrayList exceptions = task.getExceptions(); 01223 if (exceptions == null) 01224 throw new SQLException(Translate.get( 01225 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01226 else 01227 { 01228 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01229 tm.getTransactionId()) 01230 + "\n"; 01231 for (int i = 0; i < exceptions.size(); i++) 01232 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01233 logger.error(errorMsg); 01234 throw new SQLException(errorMsg); 01235 } 01236 } 01237 } 01238 } |
|
Set the needed query parsing granularity.
AbstractLoadBalancer.java の 161 行で定義されています。
00162 {
00163 this.parsingGranularity = parsingGranularity;
00164 }
|
|
Sets the RAIDbLevel.
AbstractLoadBalancer.java の 141 行で定義されています。
00142 {
00143 this.raidbLevel = raidbLevel;
00144 }
|
|
Associate a weight to a backend identified by its logical name.
org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, と org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDBで再定義されています。 AbstractLoadBalancer.java の 359 行で定義されています。
00360 { 00361 throw new SQLException("Weight is not supported by this load balancer"); 00362 } |
|
Waits for all writes in the blocking thread queue of the given backend to complete.
RAIDb2.java の 1312 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01314 { 01315 try 01316 { 01317 backendBlockingThreadsRWLock.acquireRead(); 01318 } 01319 catch (InterruptedException e) 01320 { 01321 String msg = Translate.get( 01322 "loadbalancer.backendlist.acquire.readlock.failed", e); 01323 logger.error(msg); 01324 throw new SQLException(msg); 01325 } 01326 01327 int nbOfThreads = backendBlockingThreads.size(); 01328 01329 for (int i = 0; i < nbOfThreads; i++) 01330 { 01331 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01332 .get(i); 01333 if (thread.getBackend() == backend) 01334 thread.waitForAllTasksToComplete(); 01335 } 01336 01337 backendBlockingThreadsRWLock.releaseRead(); 01338 } |
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
RAIDb2.java の 1278 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
01280 { 01281 try 01282 { 01283 backendBlockingThreadsRWLock.acquireRead(); 01284 } 01285 catch (InterruptedException e) 01286 { 01287 String msg = Translate.get( 01288 "loadbalancer.backendlist.acquire.readlock.failed", e); 01289 logger.error(msg); 01290 throw new SQLException(msg); 01291 } 01292 01293 int nbOfThreads = backendBlockingThreads.size(); 01294 01295 for (int i = 0; i < nbOfThreads; i++) 01296 { 01297 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01298 .get(i); 01299 if (thread.getBackend() == backend) 01300 thread.waitForAllTasksToComplete(transactionId); 01301 } 01302 01303 backendBlockingThreadsRWLock.releaseRead(); 01304 } |
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction. RAIDb2.java の 1244 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().
01246 { 01247 try 01248 { 01249 backendBlockingThreadsRWLock.acquireRead(); 01250 } 01251 catch (InterruptedException e) 01252 { 01253 String msg = Translate.get( 01254 "loadbalancer.backendlist.acquire.readlock.failed", e); 01255 logger.error(msg); 01256 throw new SQLException(msg); 01257 } 01258 01259 int nbOfThreads = backendBlockingThreads.size(); 01260 01261 for (int i = 0; i < nbOfThreads; i++) 01262 { 01263 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads 01264 .get(i); 01265 thread.waitForAllTasksToComplete(transactionId); 01266 } 01267 01268 backendBlockingThreadsRWLock.releaseRead(); 01269 } |
|
RAIDb2.java の 87 行で定義されています。 |
|
RAIDb2.java の 89 行で定義されています。 |
|
RAIDb2.java の 88 行で定義されています。 |
|
RAIDb2.java の 90 行で定義されています。 |
|
RAIDb2.java の 93 行で定義されています。 |
|
初期値: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ecで再定義されています。 RAIDb2.java の 96 行で定義されています。 |
|
AbstractLoadBalancer.java の 74 行で定義されています。 |
|
|
AbstractLoadBalancer.java の 72 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(). |
|
RAIDb2.java の 92 行で定義されています。 |