Read requests coming from the Request Manager are distributed round-robin across the backend nodes. Write requests are broadcast to all backends.
Definition at line 51 of file RAIDb1_RR.java.
Public Methods | |
 | RAIDb1_RR (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, long timestampResolution) throws SQLException |
java.sql.ResultSet | execReadRequest (SelectRequest request) throws SQLException |
ResultSet | execReadOnlyReadStoredProcedure (StoredProcedure proc) throws SQLException |
String | getInformation () |
String | getRaidb1Xml () |
int | execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ResultSet | execWriteRequestWithKeys (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException |
ResultSet | execReadStoredProcedure (StoredProcedure proc) throws SQLException |
int | execWriteStoredProcedure (StoredProcedure proc) throws SQLException |
final void | begin (TransactionMarkerMetaData tm) throws SQLException |
void | commit (TransactionMarkerMetaData tm) throws SQLException |
void | rollback (TransactionMarkerMetaData tm) throws SQLException |
void | enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException |
synchronized void | disableBackend (DatabaseBackend db) throws SQLException |
String | getXmlImpl () |
int | getRAIDbLevel () |
void | setRAIDbLevel (int raidbLevel) |
int | getParsingGranularity () |
void | setParsingGranularity (int parsingGranularity) |
void | setWeight (String name, int w) throws SQLException |
String | getXml () |
Protected Methods | |
java.sql.ResultSet | executeRequestOnBackend (SelectRequest request, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
java.sql.ResultSet | executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend) throws SQLException, UnreachableBackendException |
void | waitForAllWritesToComplete (long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException |
void | waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException |
ResultSet | executeStatementOnBackend (SelectRequest request, DatabaseBackend backend, Connection c) throws SQLException, BadConnectionException |
Protected Attributes | |
ArrayList | backendBlockingThreads |
ArrayList | backendNonBlockingThreads |
ReadPrioritaryFIFOWriteLock | backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
ReadPrioritaryFIFOWriteLock | backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() |
WaitForCompletionPolicy | waitForCompletionPolicy |
VirtualDatabase | vdb |
int | raidbLevel |
int | parsingGranularity |
Static Protected Attributes | |
Trace | logger |
Private Methods | |
ResultSet | executeRoundRobinRequest (AbstractRequest request, boolean isSelect, String errorMsgPrefix) throws SQLException |
Private Attributes | |
int | index |
|
Creates a new RAIDb-1 Round Robin request load balancer.
Definition at line 73 of file RAIDb1_RR.java.
{
  super(vdb, waitForCompletionPolicy, timestampResolution);
  index = -1;
}
|
Begins a new transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 910 of file RAIDb1.java.
{
}
|
Commits a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 920 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
{
  if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
    waitForAllWritesToComplete(tm.getTransactionId());

  try
  {
    backendNonBlockingThreadsRWLock.acquireRead();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  int nbOfThreads = backendNonBlockingThreads.size();
  ArrayList commitList = new ArrayList();
  Long lTid = new Long(tm.getTransactionId());

  // Build the list of backend that need to commit this transaction
  for (int i = 0; i < nbOfThreads; i++)
  {
    BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
        .get(i);
    if (thread.getBackend().isStartedTransaction(lTid))
      commitList.add(thread);
  }

  nbOfThreads = commitList.size();
  if (nbOfThreads == 0)
  {
    backendNonBlockingThreadsRWLock.releaseRead();
    return;
  }

  // Create the task
  CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
      .getTimeout(), tm.getLogin(), tm.getTransactionId());

  synchronized (task)
  {
    // Post the task in each backendThread tasklist and wakeup the threads
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
      synchronized (thread)
      {
        thread.addTask(task);
        thread.notify();
      }
    }

    backendNonBlockingThreadsRWLock.releaseRead();

    // Wait for completion (notified by the task)
    try
    {
      // Wait on task
      long timeout = tm.getTimeout();
      if (timeout > 0)
      {
        long start = System.currentTimeMillis();
        task.wait(timeout);
        long end = System.currentTimeMillis();
        long remaining = timeout - (end - start);
        if (remaining <= 0)
        {
          String msg = Translate.get("loadbalancer.commit.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())});
          logger.warn(msg);
          throw new SQLException(msg);
        }
      }
      else
        task.wait();
    }
    catch (InterruptedException e)
    {
      throw new SQLException(Translate.get("loadbalancer.commit.timeout",
          new String[]{String.valueOf(tm.getTransactionId()),
              String.valueOf(task.getSuccess()),
              String.valueOf(task.getFailed())}));
    }

    if (task.getSuccess() > 0)
      return;
    else
    { // All tasks failed
      ArrayList exceptions = task.getExceptions();
      if (exceptions == null)
        throw new SQLException(Translate.get(
            "loadbalancer.commit.all.failed", tm.getTransactionId()));
      else
      {
        String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
            tm.getTransactionId())
            + "\n";
        for (int i = 0; i < exceptions.size(); i++)
          errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
        logger.error(errorMsg);
        throw new SQLException(errorMsg);
      }
    }
  }
}
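The commit code above posts one task to several worker threads and then waits on the task's monitor, optionally with a timeout. The following is a minimal, standalone sketch of that wait-for-completion idea, not the C-JDBC implementation: `CompletionTask`, `notifySuccess`, `notifyFailure` and `await` are hypothetical names, the timeout is assumed to be in milliseconds, and the wait is done in a deadline loop (a swapped-in variation that also tolerates spurious wakeups) rather than with the single `wait(timeout)` call used in the listing.

```java
/** Hypothetical stand-in for a task that several worker threads report to. */
final class CompletionTask {
    private final int toWait; // number of reports required before waiters are released
    private int success;
    private int failed;

    CompletionTask(int toWait) {
        this.toWait = toWait;
    }

    synchronized void notifySuccess() {
        success++;
        if (done()) notifyAll();
    }

    synchronized void notifyFailure() {
        failed++;
        if (done()) notifyAll();
    }

    synchronized int getSuccess() {
        return success;
    }

    private boolean done() {
        return success + failed >= toWait;
    }

    /**
     * Waits until enough workers have reported. A timeout of zero or less
     * waits indefinitely. Returns false on timeout or interruption.
     */
    synchronized boolean await(long timeoutMillis) {
        try {
            if (timeoutMillis <= 0) {
                while (!done()) wait();
                return true;
            }
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (!done()) {
                long remaining = (deadline - System.nanoTime()) / 1_000_000L;
                if (remaining <= 0) return false; // timed out
                wait(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A caller would typically treat a `false` result the way the listing treats a timeout, and then inspect the success count to decide whether at least one backend committed.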
|
Disables a backend that was previously enabled. Asks the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec. Definition at line 1326 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
{
  if (db.isWriteEnabled())
  {
    // Starts with backendBlockingThreads
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    // Find the right blocking thread
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend().equals(db))
      {
        logger.info(Translate
            .get("loadbalancer.backend.workerthread.blocking.remove", db
                .getName()));

        // Remove it from the backendBlockingThread list
        backendBlockingThreads.remove(thread);

        synchronized (thread)
        {
          // Kill the thread
          thread.addPriorityTask(new KillThreadTask(1, 1));
          thread.notify();
        }
        break;
      }
    }

    backendBlockingThreadsRWLock.releaseWrite();

    // Continue with backendNonBlockingThreads

    try
    {
      backendNonBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Find the right non-blocking thread
    nbOfThreads = backendNonBlockingThreads.size();
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
          .get(i);
      if (thread.getBackend().equals(db))
      {
        logger.info(Translate.get(
            "loadbalancer.backend.workerthread.non.blocking.remove", db
                .getName()));

        // Remove it from the backendNonBlockingThreads list
        backendNonBlockingThreads.remove(thread);

        synchronized (thread)
        {
          // Kill the thread
          thread.addPriorityTask(new KillThreadTask(1, 1));
          thread.notify();
        }
        break;
      }
    }

    backendNonBlockingThreadsRWLock.releaseWrite();
  }

  db.disable();
  if (db.isInitialized())
    db.finalizeConnections();
}
|
Enables a backend that was previously disabled. Asks the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec. Definition at line 1261 of file RAIDb1.java. References org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
{
  if (writeEnabled)
  {
    // Create 2 worker threads
    BackendWorkerThread blockingThread = new BackendWorkerThread(
        (DatabaseBackend) db, this);
    BackendWorkerThread nonBlockingThread = new BackendWorkerThread(
        (DatabaseBackend) db, this);

    // Add first to the blocking thread list
    try
    {
      backendBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
    backendBlockingThreads.add(blockingThread);
    backendBlockingThreadsRWLock.releaseWrite();
    blockingThread.start();
    logger.info(Translate.get(
        "loadbalancer.backend.workerthread.blocking.add", db.getName()));

    // Then add to the non-blocking thread list
    try
    {
      backendNonBlockingThreadsRWLock.acquireWrite();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.writelock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
    backendNonBlockingThreads.add(nonBlockingThread);
    backendNonBlockingThreadsRWLock.releaseWrite();
    nonBlockingThread.start();
    logger.info(Translate.get(
        "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
    db.enableWrite();
  }

  if (!db.isInitialized())
    db.initializeConnections();
  db.enableRead();
}
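Both enableBackend and disableBackend mutate the worker-thread lists only while holding a write lock, whereas commit and rollback iterate over them under a read lock. The sketch below illustrates that discipline in isolation. It is an assumption-laden simplification: `WorkerRegistry` is a hypothetical class, workers are represented by plain backend names, and `java.util.concurrent.locks.ReentrantReadWriteLock` is swapped in for the `ReadPrioritaryFIFOWriteLock` used by the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical registry of per-backend workers guarded by a read/write lock. */
final class WorkerRegistry {
    private final List<String> workers = new ArrayList<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Registers a worker for the given backend (write lock: the list is modified). */
    void enable(String backendName) {
        lock.writeLock().lock();
        try {
            workers.add(backendName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Removes the worker of the given backend; returns false if none was registered. */
    boolean disable(String backendName) {
        lock.writeLock().lock();
        try {
            return workers.remove(backendName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Copies the current list under the read lock so callers can iterate safely. */
    List<String> snapshot() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(workers);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Releasing the lock in a `finally` block is a choice made for this sketch; the listing releases its locks explicitly on each path instead.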
|
Selects the backend using a simple round-robin algorithm and executes the read request.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 104 of file RAIDb1_RR.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR.executeRoundRobinRequest().
{
  return executeRoundRobinRequest(proc, false, "Stored procedure ");
}
|
Selects the backend using a simple round-robin algorithm and executes the read request.
Implements org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Definition at line 92 of file RAIDb1_RR.java. References org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR.executeRoundRobinRequest().
{
  return executeRoundRobinRequest(request, true, "Request ");
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 734 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().
{
  ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
      proc, true);
  return task.getResult();
}
|
Execute a read request on the selected backend.
Definition at line 162 of file RAIDb1.java. References org.objectweb.cjdbc.common.log.Trace.error().
{
  // Handle macros
  request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend.getConnectionManager(request
      .getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{request.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (request.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grain here by waiting
      // only for writes that depend on the tables we need
      // but is that really worth the overhead ?
      waitForAllWritesToComplete(backend);

    ResultSet rs = null;
    boolean badConnection;
    do
    {
      badConnection = false;
      // Use a connection just for this request
      Connection c = null;
      try
      {
        c = cm.getConnection();
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new UnreachableBackendException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.backend.no.connection", backend.getName()));

      // Execute Query
      try
      {
        rs = executeStatementOnBackend(request, backend, c);
        cm.releaseConnection(c);
      }
      catch (SQLException e)
      {
        cm.releaseConnection(c);
        throw new SQLException(Translate.get(
            "loadbalancer.request.failed.on.backend", new String[]{
                request.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      catch (BadConnectionException e)
      { // Get rid of the bad connection
        cm.deleteConnection(c);
        badConnection = true;
      }
    }
    while (badConnection);
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
          String.valueOf(request.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = request.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, request.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new SQLException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs = null;
    try
    {
      rs = executeStatementOnBackend(request, backend, c);
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.request.failed.on.backend", new String[]{
              request.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    catch (BadConnectionException e)
    { // Connection failed, so did the transaction
      // Disable the backend.
      cm.deleteConnection(tid);
      String msg = Translate.get(
          "loadbalancer.backend.disabling.connection.failure", backend
              .getName());
      logger.error(msg);
      backend.disable();
      throw new SQLException(msg);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(request.getId()),
              backend.getName()}));
    return rs;
  }
}
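In auto-commit mode the listing above keeps fetching a fresh connection and retrying for as long as the attempt ends with a bad connection. The retry pattern can be sketched on its own as follows. All names here (`BadConnectionSignal`, `Attempt`, `RetryOnBadConnection`) are hypothetical, and the `maxAttempts` bound is an addition of this sketch; the listing itself loops without a limit.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical signal that the connection, not the query, is at fault. */
final class BadConnectionSignal extends Exception {
    private static final long serialVersionUID = 1L;
}

/** One execution attempt on a given connection. */
interface Attempt<C, R> {
    R run(C connection) throws BadConnectionSignal;
}

final class RetryOnBadConnection {
    /**
     * Acquires a connection, runs the attempt, and on a bad connection
     * discards it and retries with a new one, up to maxAttempts times.
     */
    static <C, R> R execute(Supplier<C> acquire, Consumer<C> discard,
                            Attempt<C, R> attempt, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            C connection = acquire.get();
            try {
                return attempt.run(connection);
            } catch (BadConnectionSignal e) {
                discard.accept(connection); // get rid of the bad connection, then retry
            }
        }
        throw new IllegalStateException(
            "no usable connection after " + maxAttempts + " attempts");
    }
}
```

Inside a transaction the listing does not retry at all: a bad connection there means the transaction is lost, so the backend is disabled and an exception is raised instead.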
|
Common code to execute a SelectRequest or a StoredProcedure on a backend chosen using a round-robin algorithm.
Definition at line 123 of file RAIDb1_RR.java. References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.acquireReadLockBackendLists(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBackends(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isReadEnabled. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR.execReadOnlyReadStoredProcedure(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR.execReadRequest().
{
  // Choose a backend
  try
  {
    vdb.acquireReadLockBackendLists();
  }
  catch (InterruptedException e)
  {
    String msg = Translate.get(
        "loadbalancer.backendlist.acquire.readlock.failed", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  DatabaseBackend backend = null; // The backend that will execute the query

  // Note that vdb lock is released in the finally clause of this try/catch
  // block
  try
  {
    ArrayList backends = vdb.getBackends();
    int size = backends.size();

    if (size == 0)
      throw new SQLException(Translate.get(
          "loadbalancer.execute.no.backend.available", request.getId()));

    // Take the next backend
    int maxTries = size;
    synchronized (this)
    {
      do
      {
        index = (index + 1) % size;
        backend = (DatabaseBackend) backends.get(index);
        maxTries--;
      }
      while ((!backend.isReadEnabled() && maxTries >= 0));
    }

    if (maxTries < 0)
      throw new SQLException(Translate.get(
          "loadbalancer.execute.no.backend.enabled", request.getId()));
  }
  catch (RuntimeException e)
  {
    String msg = Translate.get("loadbalancer.execute.find.backend.failed",
        new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
            e.getMessage()});
    logger.error(msg, e);
    throw new SQLException(msg);
  }
  finally
  {
    vdb.releaseReadLockBackendLists();
  }

  ResultSet rs = null;
  // Execute the request on the chosen backend
  try
  {
    if (isSelect)
      rs = executeRequestOnBackend((SelectRequest) request, backend);
    else
      rs = executeStoredProcedureOnBackend((StoredProcedure) request, backend);
  }
  catch (UnreachableBackendException urbe)
  {
    // Try to execute query on different backend
    return executeRoundRobinRequest(request, isSelect, errorMsgPrefix);
  }
  catch (SQLException se)
  {
    String msg = Translate.get("loadbalancer.something.failed", new String[]{
        errorMsgPrefix, String.valueOf(request.getId()), se.getMessage()});
    if (logger.isInfoEnabled())
      logger.info(msg);
    throw new SQLException(msg);
  }
  catch (RuntimeException e)
  {
    String msg = Translate.get("loadbalancer.something.failed.on",
        new String[]{errorMsgPrefix,
            request.getSQLShortForm(vdb.getSQLShortFormLength()),
            backend.getName(), e.getMessage()});
    logger.error(msg, e);
    throw new SQLException(msg);
  }

  return rs;
}
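The backend selection step in the listing above advances a shared index modulo the number of backends and skips backends that are not read-enabled. A simplified, self-contained sketch of that selection is shown here. `Backend` and `RoundRobinPicker` are hypothetical stand-ins for `DatabaseBackend` and the load balancer, and the loop is written as a bounded `for` that tries each backend at most once, which differs slightly from the `do`/`while` with `maxTries` in the listing.

```java
import java.util.List;

/** Hypothetical stand-in for a backend; only the read-enabled flag matters here. */
final class Backend {
    final String name;
    final boolean readEnabled;

    Backend(String name, boolean readEnabled) {
        this.name = name;
        this.readEnabled = readEnabled;
    }
}

final class RoundRobinPicker {
    private int index = -1; // same initial value as in the constructor on this page

    /** Returns the next read-enabled backend, or null if none is available. */
    synchronized Backend next(List<Backend> backends) {
        int size = backends.size();
        for (int tries = 0; tries < size; tries++) {
            index = (index + 1) % size;
            Backend candidate = backends.get(index);
            if (candidate.readEnabled) {
                return candidate;
            }
        }
        return null; // empty list, or every backend is disabled for reads
    }
}
```

The method is synchronized for the same reason the listing wraps the index update in `synchronized (this)`: concurrent readers must not observe or produce a torn index.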
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
Definition at line 251 of file AbstractLoadBalancer.java. References java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, and org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. Referenced by org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
{
  ResultSet rs = null;
  try
  {
    backend.addPendingReadRequest(request);
    String sql = request.getSQL();
    // Rewrite the query if needed
    sql = backend.rewriteQuery(sql);
    // Execute the query
    Statement s = c.createStatement();
    DriverCompliance driverCompliance = backend.getDriverCompliance();
    if (driverCompliance.supportSetQueryTimeout())
      s.setQueryTimeout(request.getTimeout());
    if ((request.getCursorName() != null)
        && (driverCompliance.supportSetCursorName()))
      s.setCursorName(request.getCursorName());
    if ((request.getFetchSize() != 0)
        && driverCompliance.supportSetFetchSize())
      s.setFetchSize(request.getFetchSize());
    if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
      s.setMaxRows(request.getMaxRows());
    rs = s.executeQuery(sql);
  }
  catch (SQLException e)
  { // Something bad happened
    if (backend.isValidConnection(c))
      throw e; // Connection is valid, throw the exception
    else
      throw new BadConnectionException();
  }
  finally
  {
    backend.removePendingRequest(request);
  }
  return rs;
}
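The catch block in the listing above distinguishes two failure causes: if the connection is still valid, the original SQLException is rethrown, otherwise a distinct exception tells the caller that the connection itself is broken. A minimal sketch of that classification, detached from JDBC connections, might look like this. `DeadConnectionException`, `SqlCall` and `QueryRunner` are hypothetical names, and the validity check is passed in as a `BooleanSupplier` in place of `backend.isValidConnection(c)`.

```java
import java.sql.SQLException;
import java.util.function.BooleanSupplier;

/** Hypothetical marker meaning "the connection itself is broken". */
final class DeadConnectionException extends Exception {
    private static final long serialVersionUID = 1L;

    DeadConnectionException(Throwable cause) {
        super(cause);
    }
}

/** A unit of work that may fail with a SQLException. */
interface SqlCall<R> {
    R call() throws SQLException;
}

final class QueryRunner {
    /**
     * Runs the query. On failure, rethrows the SQLException when the connection
     * is still valid, and throws DeadConnectionException otherwise.
     */
    static <R> R run(SqlCall<R> query, BooleanSupplier connectionIsValid)
            throws SQLException, DeadConnectionException {
        try {
            return query.call();
        } catch (SQLException e) {
            if (connectionIsValid.getAsBoolean()) {
                throw e; // the statement failed on a healthy connection
            }
            throw new DeadConnectionException(e); // caller may retry elsewhere
        }
    }
}
```

Keeping the original exception as the cause is a choice of this sketch; the listing throws a `BadConnectionException` without arguments.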
|
Execute a stored procedure on the selected backend.
Definition at line 576 of file RAIDb1.java.
{
  // Handle macros
  proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(),
      timestampResolution, false));

  // Ok, we have a backend, let's execute the request
  AbstractConnectionManager cm = backend
      .getConnectionManager(proc.getLogin());

  // Sanity check
  if (cm == null)
  {
    String msg = Translate.get("loadbalancer.connectionmanager.not.found",
        new String[]{proc.getLogin(), backend.getName()});
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Execute the query
  if (proc.isAutoCommit())
  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      // We could do something finer grain here by waiting
      // only for writes that depend on the tables we need
      // but is that really worth the overhead ?
      waitForAllWritesToComplete(backend);

    // Use a connection just for this request
    Connection c = null;
    try
    {
      c = cm.getConnection();
    }
    catch (UnreachableBackendException e1)
    {
      logger.error(Translate.get(
          "loadbalancer.backend.disabling.unreachable", backend.getName()));
      backend.disable();
      throw new UnreachableBackendException(Translate.get(
          "loadbalancer.backend.unreacheable", backend.getName()));
    }

    // Sanity check
    if (c == null)
      throw new UnreachableBackendException(Translate.get(
          "loadbalancer.backend.no.connection", backend.getName()));

    // Execute Query
    ResultSet rs = null;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    finally
    {
      cm.releaseConnection(c);
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.storedprocedure.on",
          new String[]{String.valueOf(proc.getId()), backend.getName()}));
    return rs;
  }
  else
  { // Inside a transaction
    Connection c;
    long tid = proc.getTransactionId();
    Long lTid = new Long(tid);

    // Wait for previous writes to complete
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(backend, proc.getTransactionId());

    if (!backend.isStartedTransaction(lTid))
    { // transaction has not been started yet on this backend
      try
      {
        c = cm.getConnection(tid);
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new SQLException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.get.connection", new String[]{
                String.valueOf(tid), backend.getName()}));

      // begin transaction
      backend.startTransaction(lTid);
      c.setAutoCommit(false);
    }
    else
    { // Re-use the connection used by this transaction
      c = cm.retrieveConnection(tid);

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.unable.retrieve.connection", new String[]{
                String.valueOf(tid), backend.getName()}));
    }

    // Execute Query
    ResultSet rs;
    try
    {
      // We suppose here that the request does not modify the schema since
      // it is a read-only query.
      CallableStatement cs = c.prepareCall(proc.getSQL());
      if (backend.getDriverCompliance().supportSetQueryTimeout())
        cs.setQueryTimeout(proc.getTimeout());
      if ((proc.getMaxRows() > 0)
          && backend.getDriverCompliance().supportSetMaxRows())
        cs.setMaxRows(proc.getMaxRows());
      rs = cs.executeQuery();
    }
    catch (SQLException e)
    {
      throw new SQLException(Translate.get(
          "loadbalancer.storedprocedure.failed.on.backend", new String[]{
              proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              backend.getName(), e.getMessage()}));
    }
    if (logger.isDebugEnabled())
      logger.debug(Translate.get("loadbalancer.execute.transaction.on",
          new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
              backend.getName()}));
    return rs;
  }
}
|
Performs a write request. This request is broadcast to all nodes.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 329 of file RAIDb1.java.
{
  if (!request.isParsed() && (request.isCreate() || request.isDrop()))
    request.parse(null, ParsingGranularities.TABLE, true);
  return ((WriteRequestTask) execWriteRequest(request, false)).getResult();
}
|
Performs a write request and returns the auto-generated keys.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 346 of file RAIDb1.java.
{
  if (!request.isParsed() && (request.isCreate() || request.isDrop()))
    request.parse(null, ParsingGranularities.TABLE, true);
  return ((WriteRequestWithKeysTask) execWriteRequest(request, false))
      .getResult();
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 745 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().
{
  WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
      proc, false);
  return task.getResult();
}
|
Gets information about the request load balancer.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 226 of file RAIDb1_RR.java.
{
  // We don't lock since we don't need a top accurate value
  int size = vdb.getBackends().size();

  if (size == 0)
    return "RAIDb-1 Round-Robin Request load balancer: !!!Warning!!! No backend nodes found\n";
  else
    return "RAIDb-1 Round-Robin Request load balancer (" + size
        + " backends)\n";
}
|
Get the needed query parsing granularity.
Definition at line 151 of file AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
{
  return parsingGranularity;
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1. Definition at line 241 of file RAIDb1_RR.java.
{
  return "<" + DatabasesXmlTags.ELT_RAIDb_1_RoundRobin + "/>";
}
|
Returns the RAIDbLevel.
Definition at line 131 of file AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
{
  return raidbLevel;
}
|
Definition at line 385 of file AbstractLoadBalancer.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  info.append(getXmlImpl());
  info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
  return info.toString();
}
|
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec. Definition at line 1422 of file RAIDb1.java.
{
  StringBuffer info = new StringBuffer();
  info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + " "
      + DatabasesXmlTags.ATT_timestampResolution + "=\""
      + timestampResolution + "\" >");
  if (waitForCompletionPolicy != null)
    info.append(waitForCompletionPolicy.getXml());
  info.append(getRaidb1Xml());
  info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
  return info.toString();
}
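Taken together, getXml(), getXmlImpl() and getRaidb1Xml() form a three-level template-method chain: each level wraps the output of the more specific level below it. The sketch below reproduces that nesting with hypothetical classes. The element and attribute names are assumptions chosen for illustration, since the actual values of the `DatabasesXmlTags` constants are not shown on this page, and the optional wait-for-completion policy element is omitted.

```java
/** Outer level: wraps whatever the subclass produces in a load balancer element. */
abstract class BalancerXml {
    final String getXml() {
        // "LoadBalancer" is an assumed tag name, not taken from DatabasesXmlTags
        return "<LoadBalancer>" + getXmlImpl() + "</LoadBalancer>";
    }

    abstract String getXmlImpl();
}

/** Middle level: adds the RAIDb-1 element and delegates to the concrete policy. */
abstract class Raidb1Xml extends BalancerXml {
    private final long timestampResolution;

    Raidb1Xml(long timestampResolution) {
        this.timestampResolution = timestampResolution;
    }

    @Override
    String getXmlImpl() {
        // "RAIDb-1" and "timestampResolution" are assumed names
        return "<RAIDb-1 timestampResolution=\"" + timestampResolution + "\">"
            + getRaidb1Xml() + "</RAIDb-1>";
    }

    abstract String getRaidb1Xml();
}

/** Innermost level: the round-robin policy contributes a single empty element. */
final class RoundRobinXml extends Raidb1Xml {
    RoundRobinXml(long timestampResolution) {
        super(timestampResolution);
    }

    @Override
    String getRaidb1Xml() {
        return "<RAIDb-1-RoundRobin/>"; // assumed tag name
    }
}
```

With this layout a new read policy only has to supply the innermost element, which matches how RAIDb1_RR overrides just getRaidb1Xml() in the listings above.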
|
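The three XML methods listed so far nest inside one another: getXml() wraps the result of getXmlImpl(), and the RAIDb1 implementation of getXmlImpl() wraps the result of getRaidb1Xml(). The following is a minimal sketch of that nesting, not the C-JDBC code itself. The class names ending in `Sketch` are hypothetical, the tag and attribute strings are assumed placeholders for the `DatabasesXmlTags` constants (whose real values are not shown here), and the optional wait-for-completion policy element is left out.

```java
// Hypothetical stand-ins for the load balancer hierarchy.
// All tag and attribute names below are assumptions, not the real
// DatabasesXmlTags values.
abstract class LoadBalancerSketch {
    static final String ELT_LOAD_BALANCER = "LoadBalancer"; // assumed tag name

    // Outer element is fixed here; subclasses supply the inner content.
    String getXml() {
        StringBuilder info = new StringBuilder();
        info.append("<").append(ELT_LOAD_BALANCER).append(">");
        info.append(getXmlImpl());
        info.append("</").append(ELT_LOAD_BALANCER).append(">");
        return info.toString();
    }

    abstract String getXmlImpl();
}

abstract class Raidb1Sketch extends LoadBalancerSketch {
    static final String ELT_RAIDB_1 = "RAIDb-1";                          // assumed
    static final String ATT_TIMESTAMP_RESOLUTION = "timestampResolution"; // assumed
    private final long timestampResolution;

    Raidb1Sketch(long timestampResolution) {
        this.timestampResolution = timestampResolution;
    }

    // Middle element: carries the attribute and delegates one level further.
    @Override
    String getXmlImpl() {
        return "<" + ELT_RAIDB_1 + " " + ATT_TIMESTAMP_RESOLUTION + "=\""
            + timestampResolution + "\">"
            + getRaidb1Xml()
            + "</" + ELT_RAIDB_1 + ">";
    }

    abstract String getRaidb1Xml();
}

class Raidb1RoundRobinSketch extends Raidb1Sketch {
    Raidb1RoundRobinSketch(long timestampResolution) {
        super(timestampResolution);
    }

    // Innermost element: an empty tag naming the read policy (assumed name).
    @Override
    String getRaidb1Xml() {
        return "<RAIDb-1-RoundRobin/>";
    }
}

class XmlNestingDemo {
    public static void main(String[] args) {
        System.out.println(new Raidb1RoundRobinSketch(1000).getXml());
    }
}
```

With these assumed names, a concrete subclass only has to provide the innermost element; the enclosing elements come from the superclasses.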
Rolls back a transaction.
Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Definition at line 1036 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
    {
      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        waitForAllWritesToComplete(tm.getTransactionId());

      try
      {
        backendNonBlockingThreadsRWLock.acquireRead();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.readlock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      int nbOfThreads = backendNonBlockingThreads.size();
      ArrayList rollbackList = new ArrayList();
      Long lTid = new Long(tm.getTransactionId());

      // Build the list of backend that need to rollback this transaction
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
            .get(i);
        if (thread.getBackend().isStartedTransaction(lTid))
          rollbackList.add(thread);
      }

      nbOfThreads = rollbackList.size();
      if (nbOfThreads == 0)
      {
        backendNonBlockingThreadsRWLock.releaseRead();
        return;
      }

      // Create the task
      RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
          tm.getTimeout(), tm.getLogin(), tm.getTransactionId());

      synchronized (task)
      {
        // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
          synchronized (thread)
          {
            thread.addTask(task);
            thread.notify();
          }
        }

        backendNonBlockingThreadsRWLock.releaseRead();

        // Wait for completion (notified by the task)
        try
        {
          // Wait on task
          long timeout = tm.getTimeout();
          if (timeout > 0)
          {
            long start = System.currentTimeMillis();
            task.wait(timeout);
            long end = System.currentTimeMillis();
            long remaining = timeout - (end - start);
            if (remaining <= 0)
            {
              String msg = Translate.get("loadbalancer.rollback.timeout",
                  new String[]{String.valueOf(tm.getTransactionId()),
                      String.valueOf(task.getSuccess()),
                      String.valueOf(task.getFailed())});
              logger.warn(msg);
              throw new SQLException(msg);
            }
          }
          else
            task.wait();
        }
        catch (InterruptedException e)
        {
          throw new SQLException(Translate.get("loadbalancer.rollback.timeout",
              new String[]{String.valueOf(tm.getTransactionId()),
                  String.valueOf(task.getSuccess()),
                  String.valueOf(task.getFailed())}));
        }

        if (task.getSuccess() > 0)
          return;
        else
        { // All tasks failed
          ArrayList exceptions = task.getExceptions();
          if (exceptions == null)
            throw new SQLException(Translate.get(
                "loadbalancer.rollback.all.failed", tm.getTransactionId()));
          else
          {
            String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
                tm.getTransactionId())
                + "\n";
            for (int i = 0; i < exceptions.size(); i++)
              errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
            logger.error(errorMsg);
            throw new SQLException(errorMsg);
          }
        }
      }
    }
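The rollback listing posts one shared task to several worker threads and then blocks on the task's monitor until enough workers have reported or the timeout expires. The sketch below illustrates that wait-with-timeout pattern in isolation, under stated assumptions: `CompletionTaskSketch` and `RollbackWaitDemo` are hypothetical names, the workers are plain threads that sleep to simulate a backend rollback, and no C-JDBC class is used. Unlike the listing, which measures elapsed time around a single `task.wait(timeout)`, the sketch re-checks a completion condition in a loop; that is a swapped-in variation so that an early or spurious wakeup is not mistaken for completion.

```java
// Hypothetical stand-in for a task shared by several worker threads.
class CompletionTaskSketch {
    private final int nbToWait; // completions required before the caller is released
    private int success;
    private int failed;

    CompletionTaskSketch(int nbToWait) {
        this.nbToWait = nbToWait;
    }

    // Called by a worker once it has finished its part of the task.
    synchronized void notifyCompletion(boolean ok) {
        if (ok) {
            success++;
        } else {
            failed++;
        }
        if (success + failed >= nbToWait) {
            notifyAll();
        }
    }

    synchronized int getSuccess() {
        return success;
    }

    synchronized int getFailed() {
        return failed;
    }

    // Blocks until enough workers have reported. A timeout of 0 or less
    // means "no deadline". Returns false if the deadline passed first.
    synchronized boolean awaitCompletion(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (success + failed < nbToWait) {
            if (timeoutMs <= 0) {
                wait();
            } else {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                wait(remaining);
            }
        }
        return true;
    }
}

class RollbackWaitDemo {
    // Starts one thread per simulated backend and waits for all of them.
    static boolean runRollback(int backends, long workMs, long timeoutMs) {
        CompletionTaskSketch task = new CompletionTaskSketch(backends);
        for (int i = 0; i < backends; i++) {
            Thread worker = new Thread(() -> {
                try {
                    Thread.sleep(workMs); // simulated rollback on one backend
                    task.notifyCompletion(true);
                } catch (InterruptedException e) {
                    task.notifyCompletion(false);
                }
            });
            worker.setDaemon(true); // do not keep the JVM alive after a timeout
            worker.start();
        }
        try {
            return task.awaitCompletion(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast backends completed: " + runRollback(3, 10, 5000));
        System.out.println("slow backends completed: " + runRollback(2, 2000, 50));
    }
}
```

In the first call the workers finish well inside the deadline; in the second the deadline passes while they are still sleeping, which corresponds to the timeout branch of the listing.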
Set the needed query parsing granularity.
Definition at line 161 of file AbstractLoadBalancer.java.
    {
      this.parsingGranularity = parsingGranularity;
    }
Sets the RAIDbLevel.
Definition at line 141 of file AbstractLoadBalancer.java.
    {
      this.raidbLevel = raidbLevel;
    }
Associate a weight to a backend identified by its logical name.
Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB. Definition at line 359 of file AbstractLoadBalancer.java.
    {
      throw new SQLException("Weight is not supported by this load balancer");
    }
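Because the base implementation of setWeight() always throws, only load balancers that reimplement it accept weights; a round-robin balancer such as this one inherits the rejecting default. The sketch below shows that default-reject, override-to-support arrangement. The class names are hypothetical and the weight storage in the overriding class is an assumption made for illustration, not how the weighted C-JDBC balancers are implemented.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: weights are rejected unless a subclass overrides.
class BaseBalancerSketch {
    void setWeight(String name, int w) throws SQLException {
        throw new SQLException("Weight is not supported by this load balancer");
    }
}

// Hypothetical weighted balancer: overrides the default and records the weight.
class WeightedBalancerSketch extends BaseBalancerSketch {
    private final Map<String, Integer> weights = new HashMap<>();

    @Override
    void setWeight(String name, int w) throws SQLException {
        weights.put(name, w); // assumed storage, for illustration only
    }

    // Returns the recorded weight, or null if none was set for this backend.
    Integer getWeight(String name) {
        return weights.get(name);
    }
}

class SetWeightDemo {
    // Reports whether the given balancer accepted the weight.
    static boolean trySetWeight(BaseBalancerSketch lb, String backend, int w) {
        try {
            lb.setWeight(backend, w);
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("base accepts weight: "
            + trySetWeight(new BaseBalancerSketch(), "backend1", 2));
        System.out.println("weighted accepts weight: "
            + trySetWeight(new WeightedBalancerSketch(), "backend1", 2));
    }
}
```

A caller that configures weights generically therefore has to be prepared for the SQLException when the selected balancer does not support them.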
Waits for all writes in the blocking thread queue of the given backend to complete.
Definition at line 1217 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
    {
      try
      {
        backendBlockingThreadsRWLock.acquireRead();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.readlock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      int nbOfThreads = backendBlockingThreads.size();

      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
            .get(i);
        if (thread.getBackend() == backend)
          thread.waitForAllTasksToComplete();
      }

      backendBlockingThreadsRWLock.releaseRead();
    }
Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.
Definition at line 1183 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
    {
      try
      {
        backendBlockingThreadsRWLock.acquireRead();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.readlock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      int nbOfThreads = backendBlockingThreads.size();

      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
            .get(i);
        if (thread.getBackend() == backend)
          thread.waitForAllTasksToComplete(transactionId);
      }

      backendBlockingThreadsRWLock.releaseRead();
    }
Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction.
Definition at line 1149 of file RAIDb1.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().
    {
      try
      {
        backendBlockingThreadsRWLock.acquireRead();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.readlock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      int nbOfThreads = backendBlockingThreads.size();

      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
            .get(i);
        thread.waitForAllTasksToComplete(transactionId);
      }

      backendBlockingThreadsRWLock.releaseRead();
    }
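All three waitForAllWritesToComplete variants follow the same shape: take the read lock on the worker-thread list, visit each worker (optionally only those bound to one backend), then release the lock. The sketch below reproduces that shape with `java.util.concurrent.locks.ReentrantReadWriteLock` as a stand-in for the project's own read/write lock class. `WorkerSketch` and `WorkerListSketch` are hypothetical, backends are identified by name rather than by object identity, and the release is placed in a `finally` block, which is a variation on the listings above so that the lock is released even if a worker throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical worker: only the parts needed for the sketch.
class WorkerSketch {
    final String backend;
    int waitCalls = 0; // counts calls, for illustration only

    WorkerSketch(String backend) {
        this.backend = backend;
    }

    // Placeholder: a real worker would block until its queued writes are done.
    void waitForAllTasksToComplete(long transactionId) {
        waitCalls++;
    }
}

class WorkerListSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<WorkerSketch> workers = new ArrayList<>();

    // Changing the list requires the write lock.
    void addWorker(WorkerSketch worker) {
        lock.writeLock().lock();
        try {
            workers.add(worker);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Waits on every worker, or only on those bound to the given backend
    // when backend is non-null. Returns the number of workers visited.
    int waitForAllWritesToComplete(String backend, long transactionId) {
        int visited = 0;
        lock.readLock().lock();
        try {
            for (WorkerSketch worker : workers) {
                if (backend == null || worker.backend.equals(backend)) {
                    worker.waitForAllTasksToComplete(transactionId);
                    visited++;
                }
            }
        } finally {
            lock.readLock().unlock(); // released even if a worker throws
        }
        return visited;
    }

    // Number of read locks currently held; 0 once every wait has returned.
    int readLocksHeld() {
        return lock.getReadLockCount();
    }
}

class WaitForWritesDemo {
    public static void main(String[] args) {
        WorkerListSketch list = new WorkerListSketch();
        list.addWorker(new WorkerSketch("db1"));
        list.addWorker(new WorkerSketch("db2"));
        list.addWorker(new WorkerSketch("db1"));
        System.out.println("all backends: " + list.waitForAllWritesToComplete(null, 7L));
        System.out.println("db1 only: " + list.waitForAllWritesToComplete("db1", 7L));
    }
}
```

Holding only the read lock lets several transactions wait concurrently while still preventing the worker list from changing underneath the loop.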
Definition at line 79 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

Definition at line 81 of file RAIDb1.java.

Definition at line 80 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

Definition at line 82 of file RAIDb1.java.

Definition at line 58 of file RAIDb1_RR.java.

Initial value: Trace.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1")
Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer. Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec. Definition at line 87 of file RAIDb1.java.

Definition at line 74 of file AbstractLoadBalancer.java.

Definition at line 72 of file AbstractLoadBalancer.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer().

Definition at line 84 of file RAIDb1.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.getNbToWait(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().