This class is abstract because read requests coming from the Request Manager are not handled here but in the subclasses. Transaction management and write requests are broadcast to all backends that own the written table.
Definition at line 76 of file RAIDb2.java.
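The broadcast rule described above (a write goes only to the backends that own the written table) can be illustrated with a minimal sketch. This is not the C-JDBC implementation: `TableOwnerSketch`, `Backend`, and `backendsOwning` are hypothetical names, and a backend's schema is modelled as a plain set of table names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class TableOwnerSketch
{
  /** Hypothetical stand-in for a backend: a name plus the tables it hosts. */
  static class Backend
  {
    final String name;
    final Set<String> tables;

    Backend(String name, Set<String> tables)
    {
      this.name = name;
      this.tables = tables;
    }

    boolean hasTable(String table)
    {
      return tables.contains(table);
    }
  }

  /** Returns the backends that host the given table, in their original order. */
  static List<Backend> backendsOwning(List<Backend> backends, String table)
  {
    List<Backend> owners = new ArrayList<>();
    for (Backend b : backends)
      if (b.hasTable(table))
        owners.add(b);
    return owners;
  }
}
```

An empty result corresponds to the case where no backend can execute the write, which the load balancer reports as an error.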
Public Methods | |
RAIDb2 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, CreateTablePolicy createTablePolicy, long timestampResolution) throws SQLException | |
int | execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ResultSet | execWriteRequestWithKeys (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
abstract java.sql.ResultSet | execReadRequest (SelectRequest request) throws SQLException |
ResultSet | execReadStoredProcedure (StoredProcedure proc) throws SQLException |
int | execWriteStoredProcedure (StoredProcedure proc) throws SQLException |
final void | begin (TransactionMarkerMetaData tm) throws SQLException |
void | commit (TransactionMarkerMetaData tm) throws SQLException |
void | rollback (TransactionMarkerMetaData tm) throws SQLException |
void | enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException |
synchronized void | disableBackend (DatabaseBackend db) throws SQLException |
String | getXmlImpl () |
abstract String | getRaidb2Xml () |
int | getRAIDbLevel () |
void | setRAIDbLevel (int raidbLevel) |
int | getParsingGranularity () |
void | setParsingGranularity (int parsingGranularity) |
abstract ResultSet | execReadOnlyReadStoredProcedure (StoredProcedure proc) throws SQLException |
void | setWeight (String name, int w) throws SQLException |
abstract String | getInformation () |
String | getXml () |
Protected Methods | |
java.sql.ResultSet | executeRequestOnBackend (SelectRequest request, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
java.sql.ResultSet | executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
void | waitForAllWritesToComplete (long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException |
ResultSet | executeStatementOnBackend (SelectRequest request, DatabaseBackend backend, Connection c) throws SQLException, BadConnectionException |
Protected Attributes | |
ArrayList | backendBlockingThreads |
ArrayList | backendNonBlockingThreads |
ReadPrioritaryFIFOWriteLock | backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
ReadPrioritaryFIFOWriteLock | backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
WaitForCompletionPolicy | waitForCompletionPolicy |
CreateTablePolicy | createTablePolicy |
VirtualDatabase | vdb |
int | raidbLevel |
int | parsingGranularity |
Static Protected Attributes | |
Trace | logger |
Private Methods | |
int | getNbToWait (int nbOfThreads) |
void | execWriteRequest (AbstractWriteRequest request, boolean useKeys) throws AllBackendsFailedException, SQLException |
AbstractTask | callStoredProcedure (StoredProcedure proc, boolean isRead) throws SQLException |
Private Attributes | |
long | timestampResolution |
int | execWriteRequestResult = 0 |
ResultSet | execWriteRequestWithKeysResult = null |
|
Creates a new RAIDb-2 request load balancer. The two worker thread lists start out empty; worker threads are created when a backend is enabled.
Definition at line 115 of file RAIDb2.java.
{
  super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

  this.waitForCompletionPolicy = waitForCompletionPolicy;
  backendBlockingThreads = new ArrayList();
  backendNonBlockingThreads = new ArrayList();
  this.createTablePolicy = createTablePolicy;
  this.timestampResolution = timestampResolution;
}
|
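Begins a new transaction. The method body is empty: each backend starts the transaction lazily, the first time a request carrying that transaction ID is executed on it.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 1005 of file RAIDb2.java.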
{
}
|
Posts the stored procedure call in the task list of each worker thread whose backend has the stored procedure, then waits for completion.
Definition at line 843 of file RAIDb2.java. References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasStoredProcedure(), org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.removeTask(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setTotalNb().
{
  ArrayList backendThreads = backendBlockingThreads;
  ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;

  try
  {
    lock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendThreads.size();

  // Create the task
  AbstractTask task;
  if (isRead)
    task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
        proc);
  else
    task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
        nbOfThreads, proc);

  synchronized (task)
  {
    int nbOfBackends = 0;

    // Post the task in each backendThread tasklist and wakeup the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendThreads
          .get(i);
      if (thread.getBackend().hasStoredProcedure(proc.getProcedureName()))
      {
        nbOfBackends++;
        synchronized (thread)
        {
          if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL))
            thread.addTask(task);
          else
            thread.addTask(task, proc.getTransactionId());
          thread.notify();
        }
      }
      if (nbOfBackends == 0)
        throw new SQLException(Translate.get(
            "loadbalancer.backend.no.required.storedprocedure", proc
                .getProcedureName()));
      else
        task.setTotalNb(nbOfBackends);
    }

    lock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = proc.getTimeout() * 1000;
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.storedprocedure.timeout",
              new String[]{String.valueOf(proc.getId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});

          // Try to remove the request from the task list
          lock.acquireRead();
          nbOfThreads = backendThreads.size();
          for (int i = 0; i < nbOfThreads; i++)
          {
            BackendWorkerThread thread = (BackendWorkerThread) backendThreads
                .get(i);
            synchronized (thread)
            {
              thread.removeTask(task);
            }
          }
          lock.releaseRead();

          logger.warn(msg);
          throw new SQLException(msg);
        }
        // No need to update request timeout since the execution is finished
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      // Try to remove the request from the task list
      try
      {
        lock.acquireRead();
        nbOfThreads = backendThreads.size();
      }
      catch (InterruptedException ignore)
      {
        nbOfThreads = 0; // Give up
      }
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        synchronized (thread)
        {
          thread.removeTask(task);
        }
      }
      lock.releaseRead();

      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.timeout", new String[]{
              String.valueOf(proc.getId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return task;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.storedprocedure.all.failed", proc.getId()));
      else
      {
        String errorMsg = Translate.get(
            "loadbalancer.storedprocedure.failed.stack", proc.getId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
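The wait-with-timeout step in the listing above measures elapsed time around a single `task.wait(timeout)` call. A minimal standalone sketch of the same idea follows; it is an illustration under stated assumptions, not the C-JDBC code. `TimedWaitSketch`, `complete`, and `await` are hypothetical names, and unlike the listing the sketch guards the wait with a completion flag and loops on the remaining time, so a spurious wakeup is not mistaken for completion.

```java
class TimedWaitSketch
{
  // Set by the worker once the task has finished
  private boolean done = false;

  /** Called by the worker when the task has finished; wakes up any waiter. */
  synchronized void complete()
  {
    done = true;
    notifyAll();
  }

  /**
   * Waits until complete() is called or the timeout elapses.
   * A timeout of 0 or less waits indefinitely.
   * Returns true if the task completed, false on timeout.
   */
  synchronized boolean await(long timeoutMillis)
  {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    try
    {
      while (!done)
      {
        if (timeoutMillis <= 0)
          wait();
        else
        {
          long remaining = deadline - System.currentTimeMillis();
          if (remaining <= 0)
            return false;
          wait(remaining);
        }
      }
      return true;
    }
    catch (InterruptedException e)
    {
      // Preserve the interrupt status and report the current state
      Thread.currentThread().interrupt();
      return done;
    }
  }
}
```

A caller would treat a `false` return the way the listing treats `remaining <= 0`: remove the task from the worker queues and report a timeout.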
|
Commits a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 1015 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
{
  if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
    waitForAllWritesToComplete(tm.getTransactionId());

  try
  {
    backendNonBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendNonBlockingThreads.size();
  ArrayList commitList = new ArrayList();
  Long iTid = new Long(tm.getTransactionId());

  // Build the list of backend that need to commit this transaction
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().isStartedTransaction(iTid))
      commitList.add(thread);
  }

  nbOfThreads = commitList.size();
  if (nbOfThreads == 0)
  {
    backendNonBlockingThreadsRWLock.releaseRead();
    return;
  }

  // Create the task
  CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
      .getTimeout(), tm.getLogin(), tm.getTransactionId());

  synchronized (task)
  {
    // Post the task in each backendThread tasklist and wakeup the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
      synchronized (thread)
      {
        thread.addTask(task);
        thread.notify();
      }
    }

    backendNonBlockingThreadsRWLock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = tm.getTimeout();
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.commit.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});
          logger.warn(msg);
          throw new SQLException(msg);
        }
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      throw new SQLException(Translate.get("loadbalancer.commit.timeout",
          new String[]{String.valueOf(tm.getTransactionId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.commit.all.failed", tm.getTransactionId()));
      else
      {
        String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
            tm.getTransactionId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
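The commit task above is created with `getNbToWait(nbOfThreads)`, the number of backends that must acknowledge before the caller is released. The body of `getNbToWait` is not shown in this section, so the following is only a guess at how such a mapping could look. The `FIRST` and `MAJORITY` policies and the arithmetic are assumptions made for illustration; only `ALL` is named in the listing.

```java
class WaitCountSketch
{
  /** Hypothetical policies; only ALL appears in the listing above. */
  enum Policy { FIRST, MAJORITY, ALL }

  /** Returns how many of nbOfThreads workers must finish before the caller resumes. */
  static int nbToWait(Policy policy, int nbOfThreads)
  {
    if (nbOfThreads <= 0)
      return 0;
    switch (policy)
    {
      case FIRST:
        return 1; // resume as soon as one backend has answered
      case MAJORITY:
        return nbOfThreads / 2 + 1; // strictly more than half
      default:
        return nbOfThreads; // ALL: wait for every backend
    }
  }
}
```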
|
Disables a backend that was previously enabled. Asks the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Definition at line 1421 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
{
  if (db.isWriteEnabled())
  {
    // Start with the backendBlockingThread list
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    // Find the right blocking thread
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend().equals(db))
      {
        logger.info(Translate
            .get("loadbalancer.backend.workerthread.blocking.remove", db
                .getName()));

        backendBlockingThreads.remove(thread);

        synchronized (thread)
        {
          // Kill the thread
          thread.addPriorityTask(new KillThreadTask(1, 1));
          thread.notify();
        }
        break;
      }
    }

    backendBlockingThreadsRWLock.releaseWrite();

    // Continue with the backendNonBlockingThread list

    try
    {
      backendNonBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Find the right non-blocking thread
    nbOfThreads = backendNonBlockingThreads.size();
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
          .get(i);
      if (thread.getBackend().equals(db))
      {
        logger.info(Translate.get(
            "loadbalancer.backend.workerthread.non.blocking.remove", db
                .getName()));

        backendNonBlockingThreads.remove(thread);

        synchronized (thread)
        {
          // Kill the thread
          thread.addPriorityTask(new KillThreadTask(1, 1));
          thread.notify();
        }
        break;
      }
    }

    backendNonBlockingThreadsRWLock.releaseWrite();
  }

  db.disable();
  if (db.isInitialized())
    db.finalizeConnections();
}
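Disabling a backend stops its worker threads by posting a `KillThreadTask` with `addPriorityTask`, so the kill request is handled ahead of any queued work. The sketch below illustrates that queue discipline in isolation. `WorkerSketch` and its `KILL` sentinel are hypothetical and much simpler than `BackendWorkerThread`; the sketch also synchronizes on the queue rather than on the thread object, which is a deliberate deviation from the listing.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class WorkerSketch extends Thread
{
  /** Sentinel that tells the worker to stop (plays the role of KillThreadTask). */
  static final Runnable KILL = () -> { };

  private final Deque<Runnable> tasks = new ArrayDeque<>();

  /** Queues a task behind all pending tasks and wakes the worker. */
  void addTask(Runnable task)
  {
    synchronized (tasks)
    {
      tasks.addLast(task);
      tasks.notify();
    }
  }

  /** Queues a task ahead of all pending tasks and wakes the worker. */
  void addPriorityTask(Runnable task)
  {
    synchronized (tasks)
    {
      tasks.addFirst(task);
      tasks.notify();
    }
  }

  @Override
  public void run()
  {
    while (true)
    {
      Runnable next;
      synchronized (tasks)
      {
        while (tasks.isEmpty())
        {
          try
          {
            tasks.wait();
          }
          catch (InterruptedException e)
          {
            return; // treat interruption as a stop request
          }
        }
        next = tasks.removeFirst();
      }
      if (next == KILL)
        return; // pending tasks behind the sentinel are dropped
      next.run();
    }
  }
}
```

Because the sentinel is inserted at the head of the queue, tasks that were already waiting are never executed once the kill request arrives.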
|
Enables a backend that was previously disabled. Asks the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Definition at line 1356 of file RAIDb2.java. References org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
{
  if (writeEnabled)
  {
    // Create 2 worker threads
    BackendWorkerThread blockingThread = new BackendWorkerThread(
        (DatabaseBackend) db, this);
    BackendWorkerThread nonBlockingThread = new BackendWorkerThread(
        (DatabaseBackend) db, this);

    // Add first to the blocking thread list
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
    backendBlockingThreads.add(blockingThread);
    backendBlockingThreadsRWLock.releaseWrite();
    blockingThread.start();
    logger.info(Translate.get(
        "loadbalancer.backend.workerthread.blocking.add", db.getName()));

    // Then add to the non-blocking thread list
    try
    {
      backendNonBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
    backendNonBlockingThreads.add(nonBlockingThread);
    backendNonBlockingThreadsRWLock.releaseWrite();
    nonBlockingThread.start();
    logger.info(Translate.get(
        "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
    db.enableWrite();
  }

  if (!db.isInitialized())
    db.initializeConnections();
  db.enableRead();
}
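Enabling and disabling a backend modify the worker thread lists under a write lock, while request execution reads them under a read lock. A minimal sketch of that pattern is given below. It swaps in the JDK's `ReentrantReadWriteLock` for C-JDBC's `ReadPrioritaryFIFOWriteLock` (the two may differ in fairness and priority behaviour), and `GuardedListSketch` is a hypothetical name. Releasing in `finally` is a choice of the sketch, made so that an exception cannot leave the lock held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class GuardedListSketch<T>
{
  private final List<T> items = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /** Adds an item under the write lock (compare enableBackend). */
  void add(T item)
  {
    lock.writeLock().lock();
    try
    {
      items.add(item);
    }
    finally
    {
      lock.writeLock().unlock();
    }
  }

  /** Removes an item under the write lock (compare disableBackend). */
  boolean remove(T item)
  {
    lock.writeLock().lock();
    try
    {
      return items.remove(item);
    }
    finally
    {
      lock.writeLock().unlock();
    }
  }

  /** Copies the current content under the read lock; many readers may do this at once. */
  List<T> snapshot()
  {
    lock.readLock().lock();
    try
    {
      return new ArrayList<>(items);
    }
    finally
    {
      lock.readLock().unlock();
    }
  }
}
```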
|
|
Implementation-specific load-balanced read execution.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 816 of file RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
{
  ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
      proc, true);
  return task.getResult();
}
|
Executes a read request on the selected backend.
Definition at line 492 of file RAIDb2.java.
{
  // Handle macros
  request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend.getConnectionManager(request
      .getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{request.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (request.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grain here by waiting
      // only for writes that depend on the tables we need
      // but is that really worth the overhead ?
      waitForAllWritesToComplete(backend);

    ResultSet rs = null;
    boolean badConnection;
    do
    {
      badConnection = false;
      // Use a connection just for this request
      Connection c = null;
      try
      {
        c = cm.getConnection();
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new UnreachableBackendException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new UnreachableBackendException(
            "No more connections on backend " + backend.getName());

      // Execute Query
      try
      {
        rs = executeStatementOnBackend(request, backend, c);
        cm.releaseConnection(c);
      }
      catch (SQLException e)
      {
        cm.releaseConnection(c);
        throw new SQLException(Translate.get(
            "loadbalancer.request.failed.on.backend", new String[]{
                request.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      catch (BadConnectionException e)
      { // Get rid of the bad connection
        cm.deleteConnection(c);
        badConnection = true;
      }
    }
    while (badConnection);
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
          String.valueOf(request.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = request.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, request.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new UnreachableBackendException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs = null;
    try
    {
      rs = executeStatementOnBackend(request, backend, c);
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.request.failed.on.backend", new String[]{
              request.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    catch (BadConnectionException e)
    { // Connection failed, so did the transaction
      // Disable the backend.
      cm.deleteConnection(tid);
      String msg = Translate.get(
          "loadbalancer.backend.disabling.connection.failure", backend
              .getName());
      logger.error(msg);
      backend.disable();
      throw new SQLException(msg);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(request.getId()),
              backend.getName()}));
    return rs;
  }
}
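In auto-commit mode the listing above retries the query on a fresh connection whenever the current one turns out to be bad. The following sketch shows that loop in isolation, under stated assumptions: `RetrySketch`, `executeWithRetry`, and its nested `BadConnectionException` are hypothetical, connections are represented by an arbitrary type `C`, and the sketch adds an attempt limit that the listing does not have.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class RetrySketch
{
  /** Hypothetical marker for a connection that failed validation. */
  static class BadConnectionException extends RuntimeException
  {
  }

  /**
   * Runs the query on a connection taken from the pool. When the query
   * reports a bad connection, that connection is abandoned and the query
   * is retried on the next one, up to maxAttempts times.
   */
  static <C, R> R executeWithRetry(Supplier<C> pool, Function<C, R> query,
      int maxAttempts)
  {
    for (int attempt = 1; attempt <= maxAttempts; attempt++)
    {
      C connection = pool.get();
      try
      {
        return query.apply(connection);
      }
      catch (BadConnectionException e)
      {
        // Drop this connection and loop to try a new one
      }
    }
    throw new IllegalStateException("no usable connection after "
        + maxAttempts + " attempts");
  }
}
```

Any other exception thrown by the query propagates immediately, which mirrors how the listing rethrows an `SQLException` raised on a valid connection.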
|
Executes a statement on a backend. If the execution fails, the connection is checked for validity. If the connection is still valid, the original SQLException is rethrown; otherwise a BadConnectionException is thrown so that the caller can retry the query on a new connection.
Definition at line 251 of file AbstractLoadBalancer.java. References java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, and org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. Referenced by org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
{
  ResultSet rs = null;
  try
  {
    backend.addPendingReadRequest(request);
    String sql = request.getSQL();
    // Rewrite the query if needed
    sql = backend.rewriteQuery(sql);
    // Execute the query
    Statement s = c.createStatement();
    DriverCompliance driverCompliance = backend.getDriverCompliance();
    if (driverCompliance.supportSetQueryTimeout())
      s.setQueryTimeout(request.getTimeout());
    if ((request.getCursorName() != null)
        && (driverCompliance.supportSetCursorName()))
      s.setCursorName(request.getCursorName());
    if ((request.getFetchSize() != 0)
        && driverCompliance.supportSetFetchSize())
      s.setFetchSize(request.getFetchSize());
    if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
      s.setMaxRows(request.getMaxRows());
    rs = s.executeQuery(sql);
  }
  catch (SQLException e)
  { // Something bad happened
    if (backend.isValidConnection(c))
      throw e; // Connection is valid, throw the exception
    else
      throw new BadConnectionException();
  }
  finally
  {
    backend.removePendingRequest(request);
  }
  return rs;
}
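The listing above applies each statement option only when the driver is known to support it. The sketch below reproduces that guard pattern against the standard `java.sql.Statement` interface. `StatementOptionsSketch` and `Caps` are hypothetical (`Caps` stands in for `DriverCompliance`), and the recording stub built with `java.lang.reflect.Proxy` exists only so the example can run without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

class StatementOptionsSketch
{
  /** Hypothetical capability flags, standing in for DriverCompliance. */
  static class Caps
  {
    final boolean queryTimeout;
    final boolean fetchSize;
    final boolean maxRows;

    Caps(boolean queryTimeout, boolean fetchSize, boolean maxRows)
    {
      this.queryTimeout = queryTimeout;
      this.fetchSize = fetchSize;
      this.maxRows = maxRows;
    }
  }

  /** Applies each option only if the driver supports it and the value is set. */
  static void configure(Statement s, Caps caps, int timeoutSeconds,
      int fetchSize, int maxRows) throws SQLException
  {
    if (caps.queryTimeout)
      s.setQueryTimeout(timeoutSeconds);
    if (fetchSize != 0 && caps.fetchSize)
      s.setFetchSize(fetchSize);
    if (maxRows > 0 && caps.maxRows)
      s.setMaxRows(maxRows);
  }

  /** Builds a Statement stub that only records the names of the methods called. */
  static Statement recordingStatement(List<String> calls)
  {
    return (Statement) Proxy.newProxyInstance(
        Statement.class.getClassLoader(),
        new Class<?>[]{Statement.class},
        (proxy, method, args) -> {
          calls.add(method.getName());
          return null; // sufficient for the void setters used here
        });
  }

  /** Runs configure against the stub and returns the setters that were invoked. */
  static List<String> settersCalled(Caps caps, int timeoutSeconds,
      int fetchSize, int maxRows)
  {
    List<String> calls = new ArrayList<>();
    try
    {
      configure(recordingStatement(calls), caps, timeoutSeconds, fetchSize,
          maxRows);
    }
    catch (SQLException e)
    {
      throw new IllegalStateException(e);
    }
    return calls;
  }
}
```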
|
Executes a stored procedure on the selected backend.
Definition at line 658 of file RAIDb2.java.
{
  // Handle macros
  proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend
      .getConnectionManager(proc.getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{proc.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (proc.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grain here by waiting
      // only for writes that depend on the tables we need
      // but is that really worth the overhead ?
      waitForAllWritesToComplete(backend);

    // Use a connection just for this request
    Connection c = null;
    try
    {
      c = cm.getConnection();
    }
    catch (UnreachableBackendException e1)
    {
      logger.error(Translate.get(
          "loadbalancer.backend.disabling.unreachable", backend.getName()));
      backend.disable();
      throw new UnreachableBackendException(Translate.get(
          "loadbalancer.backend.unreacheable", backend.getName()));
    }

    // Sanity check
    if (c == null)
      throw new SQLException(Translate.get(
          "loadbalancer.backend.no.connection", backend.getName()));

    // Execute Query
    ResultSet rs = null;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    finally
    {
      cm.releaseConnection(c);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.storedprocedure.on",
          new String[]{String.valueOf(proc.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = proc.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, proc.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new SQLException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
              backend.getName()}));
    return rs;
  }
}
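Inside a transaction, the listing above opens a connection the first time a transaction ID reaches the backend and reuses that same connection for every later request of the transaction. A minimal sketch of this binding follows. `TxConnectionSketch`, `connectionFor`, and `release` are hypothetical names, and connections are represented by an arbitrary type `C` supplied by a factory, so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class TxConnectionSketch<C>
{
  // Connections currently bound to an open transaction, keyed by transaction ID
  private final Map<Long, C> byTransaction = new HashMap<>();
  private final Supplier<C> factory;

  TxConnectionSketch(Supplier<C> factory)
  {
    this.factory = factory;
  }

  /** Returns the connection bound to tid, opening and binding one on first use. */
  synchronized C connectionFor(long tid)
  {
    return byTransaction.computeIfAbsent(tid, k -> factory.get());
  }

  /** Unbinds and returns the connection when the transaction ends, or null if none. */
  synchronized C release(long tid)
  {
    return byTransaction.remove(tid);
  }
}
```

Keeping one connection per transaction is what lets the backend's own transaction span several requests; releasing the binding corresponds to commit or rollback.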
|
Common code for execWriteRequest(AbstractWriteRequest) and execWriteRequestWithKeys(AbstractWriteRequest). The result is given back using member variables execWriteRequestResult and execWriteRequestWithKeysResult defined above.
Definition at line 209 of file RAIDb2.java. References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.common.log.Trace.debug(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBackends(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule.getBackends(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy.getDefaultRule(), org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getPolicy(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy.getTableRule(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTable(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete(), and org.objectweb.cjdbc.common.log.Trace.warn().
{
  ArrayList backendThreads;
  ReadPrioritaryFIFOWriteLock lock;

  // Handle macros
  request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
      timestampResolution, false));

  // Determine which list (blocking or not) to use
  if (request.mightBlock())
  { // Blocking
    backendThreads = backendBlockingThreads;
    lock = backendBlockingThreadsRWLock;
  }
  else
  { // Non-blocking
    backendThreads = backendNonBlockingThreads;
    lock = backendNonBlockingThreadsRWLock;
    if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        && (request.getTransactionId() != 0))
      waitForAllWritesToComplete(request.getTransactionId());
  }

  try
  {
    lock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendThreads.size();
  ArrayList writeList = new ArrayList();
  String tableName = request.getTableName();

  if (request.isCreate())
  { // Choose the backend according to the defined policy
    CreateTableRule rule = createTablePolicy.getTableRule(request
        .getTableName());
    if (rule == null)
      rule = createTablePolicy.getDefaultRule();

    // Ask the rule to pickup the backends
    ArrayList chosen;
    try
    {
      chosen = rule.getBackends(vdb.getBackends());
    }
    catch (CreateTableException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.create.table.rule.failed", e.getMessage()));
    }

    // Build the thread list from the backend list
    if (chosen != null)
    {
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        if (chosen.contains(thread.getBackend()))
          writeList.add(thread);
      }
    }
  }
  else
  { // Build the list of backends that need to execute this request
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendThreads
          .get(i);
      if (thread.getBackend().hasTable(tableName))
        writeList.add(thread);
    }
  }

  nbOfThreads = writeList.size();
  if (nbOfThreads == 0)
  {
    String msg = Translate.get("loadbalancer.execute.no.backend.found",
        request.getSQLShortForm(vdb.getSQLShortFormLength()));
    logger.warn(msg);
    throw new SQLException(msg);
  }
  else
    logger.debug(Translate.get("loadbalancer.execute.on.several",
        new String[]{String.valueOf(request.getId()),
            String.valueOf(nbOfThreads)}));

  // Create the task
  AbstractTask task;
  if (useKeys)
    task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
        nbOfThreads, request);
  else
    task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
        request);

  synchronized (task)
  {
    if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)
    { // Post the task in each backendThread tasklist and wakeup the threads
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        synchronized (thread)
        {
          thread.addTask(task);
          thread.notify();
        }
      }
    }
    else
    {
      // We have to first post the request on each backend before letting the
      // first backend to execute the request. Therefore we have 2 phases:
      // 1. post the task in each thread queue
      // 2. notify each thread to execute the query

      // 1. Post the task
      if (request.mightBlock())
      {
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendThreads
              .get(i);
          synchronized (thread)
          {
            thread.addTask(task, request.getTransactionId());
          }
        }
      }
      else
      {
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendThreads
              .get(i);
          synchronized (thread)
          {
            thread.addTask(task);
          }
        }
      }

      // 2. Start the task execution on each backend
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        synchronized (thread)
        {
          thread.notify();
        }
      }
    }

    lock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = request.getTimeout() * 1000;
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.request.timeout",
              new String[]{String.valueOf(request.getId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});

          // Try to remove the request from the task list
          lock.acquireRead();
          nbOfThreads = backendThreads.size();
          for (int i = 0; i < nbOfThreads; i++)
          {
            BackendWorkerThread thread = (BackendWorkerThread) backendThreads
                .get(i);
            synchronized (thread)
            {
              thread.removeTask(task);
            }
          }
          lock.releaseRead();

          logger.warn(msg);
          throw new SQLException(msg);
        }
        // No need to update request timeout since the execution is finished
      }
      else
task.wait(); 00415 } 00416 catch (InterruptedException e) 00417 { 00418 // Try to remove the request from the task list 00419 try 00420 { 00421 lock.acquireRead(); 00422 nbOfThreads = backendThreads.size(); 00423 } 00424 catch (InterruptedException ignore) 00425 { 00426 nbOfThreads = 0; // Give up 00427 } 00428 for (int i = 0; i < nbOfThreads; i++) 00429 { 00430 BackendWorkerThread thread = (BackendWorkerThread) backendThreads 00431 .get(i); 00432 synchronized (thread) 00433 { 00434 thread.removeTask(task); 00435 } 00436 } 00437 lock.releaseRead(); 00438 00439 throw new SQLException(Translate.get("loadbalancer.request.timeout", 00440 new String[]{String.valueOf(request.getId()), 00441 String.valueOf(task.getSuccess()), 00442 String.valueOf(task.getFailed())})); 00443 } 00444 00445 if (task.getSuccess() > 0) 00446 { 00447 if (useKeys) 00448 execWriteRequestWithKeysResult = ((WriteRequestWithKeysTask) task) 00449 .getResult(); 00450 else 00451 execWriteRequestResult = ((WriteRequestTask) task).getResult(); 00452 00453 } 00454 else 00455 { // All tasks failed 00456 ArrayList exceptions = task.getExceptions(); 00457 if (exceptions == null) 00458 throw new AllBackendsFailedException(Translate.get( 00459 "loadbalancer.request.failed.all", request.getId())); 00460 else 00461 { 00462 String errorMsg = Translate.get("loadbalancer.request.failed.stack", 00463 request.getId()) 00464 + "\n"; 00465 for (int i = 0; i < exceptions.size(); i++) 00466 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00467 logger.error(errorMsg); 00468 throw new SQLException(errorMsg); 00469 } 00470 } 00471 } 00472 } |
|
Performs a write request. The request is broadcast to all nodes that own the table to be written.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 174 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().
{
  execWriteRequest(request, false);
  return execWriteRequestResult;
}
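The broadcast rule described for this method (only backends that own the written table receive the request) can be illustrated with a minimal, self-contained sketch. The `Backend` class, its `hasTable` method, `sampleBackends`, and `writeTargets` are hypothetical stand-ins written for this example; they are not the C-JDBC `DatabaseBackend` or `BackendWorkerThread` classes, and no threading is modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: selects the backends that own a table, as a partially
// replicated (RAIDb-2 style) write would need to do before broadcasting.
final class WriteTargetSketch {

    // Hypothetical backend: a name plus the set of tables it hosts.
    static final class Backend {
        final String name;
        final Set<String> tables;

        Backend(String name, String... tables) {
            this.name = name;
            this.tables = new HashSet<>(Arrays.asList(tables));
        }

        boolean hasTable(String table) {
            return tables.contains(table);
        }
    }

    // Returns the names of the backends that host the given table.
    static List<String> writeTargets(List<Backend> backends, String table) {
        List<String> targets = new ArrayList<>();
        for (Backend b : backends) {
            if (b.hasTable(table)) {
                targets.add(b.name);
            }
        }
        return targets;
    }

    // Sample layout: "orders" is on db1 and db3, "users" on db1 and db2.
    static List<Backend> sampleBackends() {
        List<Backend> backends = new ArrayList<>();
        backends.add(new Backend("db1", "orders", "users"));
        backends.add(new Backend("db2", "users"));
        backends.add(new Backend("db3", "orders"));
        return backends;
    }

    public static void main(String[] args) {
        System.out.println(writeTargets(sampleBackends(), "orders"));
    }
}
```

An empty result corresponds to the case in which the listing logs a warning and throws an `SQLException` because no backend was found for the request.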
|
Performs a write request and returns the auto-generated keys.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 190 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest() and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult.
{
  execWriteRequest(request, true);
  return execWriteRequestWithKeysResult;
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 827 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
{
  WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
      proc, false);
  return task.getResult();
}
|
Get information about the Request Load Balancer
Implemented in org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_LPRF, org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.
|
Returns the number of nodes to wait for according to the defined WaitForCompletionPolicy.
Defined at line 140 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getPolicy() and org.objectweb.cjdbc.common.log.Trace.warn().
{
  int nbToWait;
  switch (waitForCompletionPolicy.getPolicy())
  {
    case WaitForCompletionPolicy.FIRST :
      nbToWait = 1;
      break;
    case WaitForCompletionPolicy.MAJORITY :
      nbToWait = nbOfThreads / 2 + 1;
      break;
    default :
      logger
          .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
      // Falls through: an unsupported policy is handled like ALL
    case WaitForCompletionPolicy.ALL :
      nbToWait = nbOfThreads;
      break;
  }
  return nbToWait;
}
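The arithmetic of the three policies is easy to check in isolation. The following is a minimal sketch, assuming a hypothetical `Policy` enum in place of the integer constants of `WaitForCompletionPolicy`; the class and method names are invented for this example.

```java
// Illustrative only: how many backends must answer before the caller is
// released, for each completion policy.
final class WaitPolicySketch {

    // Hypothetical stand-in for the FIRST / MAJORITY / ALL constants.
    enum Policy { FIRST, MAJORITY, ALL }

    static int nbToWait(Policy policy, int nbOfThreads) {
        switch (policy) {
            case FIRST:
                return 1;                    // first answer is enough
            case MAJORITY:
                return nbOfThreads / 2 + 1;  // strict majority (integer division)
            default:
                return nbOfThreads;          // ALL: every backend must answer
        }
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            System.out.println(p + " with 5 backends: " + nbToWait(p, 5));
        }
    }
}
```

With integer division, `MAJORITY` yields 3 for both 4 and 5 backends, so an even number of backends still requires more than half of them.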
|
Get the needed query parsing granularity.
Defined at line 151 of AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
{
  return parsingGranularity;
}
|
Returns XML-formatted information about this RAIDb-2 load balancer.
Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.
|
Returns the RAIDbLevel.
Defined at line 131 of AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
{
  return raidbLevel;
}
|
Defined at line 385 of AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  info.append(getXmlImpl());
  info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  return info.toString();
}
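The listing follows a template-method shape: the base class emits the outer element and delegates the inner content to `getXmlImpl()`. The sketch below shows that shape in isolation. The tag name `LoadBalancer`, the inner `<RAIDb-2/>` string, and the class names are assumptions made for this example; the actual values of the `DatabasesXmlTags` constants are not shown on this page.

```java
// Illustrative only: the base class wraps subclass-provided XML in an outer element.
abstract class XmlWrapSketch {

    // Hypothetical tag name standing in for DatabasesXmlTags.ELT_LoadBalancer.
    static final String ELT_LOAD_BALANCER = "LoadBalancer";

    // Subclasses supply the inner XML fragment.
    abstract String getXmlImpl();

    // The wrapper is final so that every subclass gets the same outer element.
    final String getXml() {
        StringBuilder info = new StringBuilder();
        info.append("<").append(ELT_LOAD_BALANCER).append(">");
        info.append(getXmlImpl());
        info.append("</").append(ELT_LOAD_BALANCER).append(">");
        return info.toString();
    }

    // Example subclass returning a made-up inner fragment.
    static XmlWrapSketch raidb2Example() {
        return new XmlWrapSketch() {
            @Override
            String getXmlImpl() {
                return "<RAIDb-2/>";
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(raidb2Example().getXml());
    }
}
```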
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Defined at line 1515 of RAIDb2.java.
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + " "
      + DatabasesXmlTags.ATT_timestampResolution + "=\""
      + timestampResolution + "\" >");
  if (createTablePolicy != null)
    info.append(createTablePolicy.getXml());
  if (waitForCompletionPolicy != null)
    info.append(waitForCompletionPolicy.getXml());
  // Append the subclass-specific XML so that it appears in the output
  info.append(this.getRaidb2Xml());
  info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
  return info.toString();
}
|
Rolls back a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Defined at line 1131 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
{
  if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
    waitForAllWritesToComplete(tm.getTransactionId());

  try
  {
    backendNonBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }
  int nbOfThreads = backendNonBlockingThreads.size();
  ArrayList rollbackList = new ArrayList();
  Long iTid = new Long(tm.getTransactionId());

  // Build the list of backends that need to roll back this transaction
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().isStartedTransaction(iTid))
      rollbackList.add(thread);
  }

  nbOfThreads = rollbackList.size();
  if (nbOfThreads == 0)
  {
    backendNonBlockingThreadsRWLock.releaseRead();
    return;
  }

  // Create the task
  RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
      tm.getTimeout(), tm.getLogin(), tm.getTransactionId());

  synchronized (task)
  {
    // Post the task in each backendThread tasklist and wake up the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
      synchronized (thread)
      {
        thread.addTask(task);
        thread.notify();
      }
    }

    backendNonBlockingThreadsRWLock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = tm.getTimeout();
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.rollback.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});
          logger.warn(msg);
          throw new SQLException(msg);
        }
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      throw new SQLException(Translate.get("loadbalancer.rollback.timeout",
          new String[]{String.valueOf(tm.getTransactionId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.rollback.all.failed", tm.getTransactionId()));
      else
      {
        String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
            tm.getTransactionId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
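The rollback listing waits on the task monitor and decides afterwards, from the elapsed time, whether the wait timed out. A common variant of that pattern is sketched below; it is not the C-JDBC code. It keeps an explicit completion flag and loops, so that a spurious wakeup is not mistaken for completion. The `Task` class and its methods are hypothetical, and unlike the listing this sketch does not treat a timeout of 0 as "wait forever".

```java
// Illustrative only: bounded wait on a monitor with a completion flag.
final class TimedWaitSketch {

    // Hypothetical task whose completion is signalled by a worker.
    static final class Task {
        private boolean done;

        // Called by the worker when the work is finished.
        synchronized void complete() {
            done = true;
            notifyAll();
        }

        // Returns true if the task completed within timeoutMs milliseconds,
        // false on timeout or if the waiting thread was interrupted.
        synchronized boolean awaitCompletion(long timeoutMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            try {
                while (!done) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // timed out
                    }
                    wait(remaining);
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        System.out.println("completed before signal: " + task.awaitCompletion(50));
        task.complete();
        System.out.println("completed after signal: " + task.awaitCompletion(50));
    }
}
```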
|
Set the needed query parsing granularity.
Defined at line 161 of AbstractLoadBalancer.java.
{
  this.parsingGranularity = parsingGranularity;
}
|
|
Sets the RAIDbLevel.
Defined at line 141 of AbstractLoadBalancer.java.
{
  this.raidbLevel = raidbLevel;
}
|
|
Associate a weight to a backend identified by its logical name.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB. Defined at line 359 of AbstractLoadBalancer.java.
{
  throw new SQLException("Weight is not supported by this load balancer");
}
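The base implementation rejects weights, and the weighted load balancers listed above override it. A minimal sketch of that "unsupported by default, overridden where meaningful" arrangement follows. `WeightSketch`, `Weighted`, and `acceptsWeight` are names invented for this example, and the map-based storage is an assumption, not a description of how the C-JDBC weighted balancers store weights.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: default implementation refuses weights; a subclass accepts them.
class WeightSketch {

    // Default behaviour, mirroring the message in the listing above.
    void setWeight(String name, int w) throws SQLException {
        throw new SQLException("Weight is not supported by this load balancer");
    }

    // Hypothetical weighted variant that records one weight per backend name.
    static final class Weighted extends WeightSketch {
        final Map<String, Integer> weights = new HashMap<>();

        @Override
        void setWeight(String name, int w) {
            weights.put(name, w);
        }
    }

    // Convenience helper: reports whether the balancer accepted the weight.
    static boolean acceptsWeight(WeightSketch lb, String name, int w) {
        try {
            lb.setWeight(name, w);
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("base accepts weight: " + acceptsWeight(new WeightSketch(), "db1", 2));
        System.out.println("weighted accepts weight: " + acceptsWeight(new Weighted(), "db1", 2));
    }
}
```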
|
Waits for all writes in the blocking thread queue of the given backend to complete.
Defined at line 1312 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
{
  try
  {
    backendBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendBlockingThreads.size();

  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
        .get(i);
    if (thread.getBackend() == backend)
      thread.waitForAllTasksToComplete();
  }

  backendBlockingThreadsRWLock.releaseRead();
}
|
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
Defined at line 1278 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend() and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
{
  try
  {
    backendBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendBlockingThreads.size();

  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
        .get(i);
    if (thread.getBackend() == backend)
      thread.waitForAllTasksToComplete(transactionId);
  }

  backendBlockingThreadsRWLock.releaseRead();
}
|
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction.
Defined at line 1244 of RAIDb2.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete(). Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().
{
  try
  {
    backendBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendBlockingThreads.size();

  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
        .get(i);
    thread.waitForAllTasksToComplete(transactionId);
  }

  backendBlockingThreadsRWLock.releaseRead();
}
|
Defined at line 87 of RAIDb2.java.
|
Defined at line 89 of RAIDb2.java.
|
Defined at line 88 of RAIDb2.java.
|
Defined at line 90 of RAIDb2.java.
|
Defined at line 93 of RAIDb2.java.
|
Defined at line 161 of RAIDb2.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().
|
Defined at line 162 of RAIDb2.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().
|
Initial value: Trace.getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec. Defined at line 96 of RAIDb2.java.
|
Defined at line 74 of AbstractLoadBalancer.java.
|
Defined at line 73 of AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer() and org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel().
|
Defined at line 94 of RAIDb2.java.
|
Defined at line 72 of AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer().
|
Defined at line 92 of RAIDb2.java.