The requests are sent to the backend nodes hosting the tables needed to execute the request. If no backend has the needed tables to perform a request, it will fail.
RAIDb0.java の 69 行で定義されています。
|
Creates a new RAIDb-0 request load balancer.
RAIDb0.java の 96 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.backendThreads, と org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.createTablePolicy.
00098 { 00099 super(vdb, RAIDbLevels.RAIDb0, ParsingGranularities.TABLE); 00100 backendThreads = new ArrayList(); 00101 this.createTablePolicy = createTablePolicy; 00102 } |
|
Begins a new transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 856 行で定義されています。
00857 { 00858 } |
|
Commits a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 866 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
00867 { 00868 try 00869 { 00870 backendThreadsRWLock.acquireRead(); 00871 } 00872 catch (InterruptedException e) 00873 { 00874 String msg = Translate.get( 00875 "loadbalancer.backendlist.acquire.readlock.failed", e); 00876 logger.error(msg); 00877 throw new SQLException(msg); 00878 } 00879 00880 int nbOfThreads = backendThreads.size(); 00881 ArrayList commitList = new ArrayList(); 00882 Long lTid = new Long(tm.getTransactionId()); 00883 00884 // Build the list of backend that need to commit this transaction 00885 for (int i = 0; i < nbOfThreads; i++) 00886 { 00887 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 00888 if (thread.getBackend().isStartedTransaction(lTid)) 00889 commitList.add(thread); 00890 } 00891 00892 nbOfThreads = commitList.size(); 00893 // Create the task 00894 CommitTask task = new CommitTask(nbOfThreads, // Wait for all to commit 00895 nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 00896 00897 synchronized (task) 00898 { 00899 // Post the task in each backendThread tasklist and wakeup the threads 00900 for (int i = 0; i < nbOfThreads; i++) 00901 { 00902 BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i); 00903 synchronized (thread) 00904 { 00905 thread.addTask(task); 00906 thread.notify(); 00907 } 00908 } 00909 00910 backendThreadsRWLock.releaseRead(); 00911 00912 // Wait for completion (notified by the task) 00913 try 00914 { 00915 // Wait on task 00916 long timeout = tm.getTimeout(); 00917 if (timeout > 0) 00918 { 00919 long start = System.currentTimeMillis(); 00920 task.wait(timeout); 00921 long end = System.currentTimeMillis(); 00922 long remaining = timeout - (end - start); 00923 if (remaining <= 0) 00924 { 00925 String msg = Translate.get("loadbalancer.commit.timeout", 00926 new String[]{String.valueOf(tm.getTransactionId()), 00927 String.valueOf(task.getSuccess()), 00928 String.valueOf(task.getFailed())}); 00929 logger.warn(msg); 00930 throw new SQLException(msg); 00931 } 00932 } 00933 else 00934 task.wait(); 00935 } 00936 catch (InterruptedException e) 00937 { 00938 throw new SQLException(Translate.get("loadbalancer.commit.timeout", 00939 new String[]{String.valueOf(tm.getTransactionId()), 00940 String.valueOf(task.getSuccess()), 00941 String.valueOf(task.getFailed())})); 00942 } 00943 00944 if (task.getSuccess() > 0) 00945 return; 00946 else 00947 { // All tasks failed 00948 ArrayList exceptions = task.getExceptions(); 00949 if (exceptions == null) 00950 throw new SQLException(Translate.get( 00951 "loadbalancer.commit.all.failed", tm.getTransactionId())); 00952 else 00953 { 00954 String errorMsg = Translate.get("loadbalancer.commit.failed.stack", 00955 tm.getTransactionId()) 00956 + "\n"; 00957 for (int i = 0; i < exceptions.size(); i++) 00958 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 00959 logger.error(errorMsg); 00960 throw new SQLException(errorMsg); 00961 } 00962 } 00963 } 00964 } |
|
Disables a backend that was previously enabled. Ask the corresponding connection manager to finalize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 1130 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), と org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().
01132 { 01133 int nbOfThreads = backendThreads.size(); 01134 01135 // Find the right thread 01136 for (int i = 0; i < nbOfThreads; i++) 01137 { 01138 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 01139 if (thread.getBackend().equals(db)) 01140 { 01141 logger.info(Translate.get("loadbalancer.backend.workerthread.remove", 01142 db.getName())); 01143 01144 // Remove it from the backendThread list 01145 try 01146 { 01147 backendThreadsRWLock.acquireWrite(); 01148 } 01149 catch (InterruptedException e) 01150 { 01151 String msg = Translate.get( 01152 "loadbalancer.backendlist.acquire.writelock.failed", e); 01153 logger.error(msg); 01154 throw new SQLException(msg); 01155 } 01156 backendThreads.remove(thread); 01157 backendThreadsRWLock.releaseWrite(); 01158 01159 synchronized (thread) 01160 { 01161 // Kill the thread 01162 thread.addPriorityTask(new KillThreadTask(1, 1)); 01163 thread.notify(); 01164 } 01165 break; 01166 } 01167 } 01168 01169 db.disable(); 01170 if (db.isInitialized()) 01171 db.finalizeConnections(); 01172 } |
|
Enables a Backend that was previously disabled. Ask the corresponding connection manager to initialize the connections if needed. No sanity checks are performed by this function.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 1089 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableRead(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.initializeConnections(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isInitialized().
01091 { 01092 // Create a worker thread and add it to the list 01093 BackendWorkerThread thread = new BackendWorkerThread((DatabaseBackend) db, 01094 this); 01095 try 01096 { 01097 backendThreadsRWLock.acquireWrite(); 01098 } 01099 catch (InterruptedException e) 01100 { 01101 String msg = Translate.get( 01102 "loadbalancer.backendlist.acquire.writelock.failed", e); 01103 logger.error(msg); 01104 throw new SQLException(msg); 01105 } 01106 backendThreads.add(thread); 01107 backendThreadsRWLock.releaseWrite(); 01108 thread.start(); 01109 logger.info(Translate.get("loadbalancer.backend.workerthread.add", db 01110 .getName())); 01111 01112 if (!db.isInitialized()) 01113 db.initializeConnections(); 01114 db.enableRead(); 01115 if (writeEnabled) 01116 db.enableWrite(); 01117 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 820 行で定義されています。
00822 { 00823 throw new SQLException( 00824 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 00825 } |
|
Performs a read request on the backend that has the needed tables to executes the request.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 116 行で定義されています。 参照先 org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.acquireReadLockBackendLists(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.executeRequestOnBackend(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBackends(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTables(), org.objectweb.cjdbc.common.log.Trace.info(), org.objectweb.cjdbc.common.log.Trace.isInfoEnabled(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isReadEnabled.
00117 { 00118 try 00119 { 00120 vdb.acquireReadLockBackendLists(); // Acquire read lock 00121 } 00122 catch (InterruptedException e) 00123 { 00124 String msg = Translate.get( 00125 "loadbalancer.backendlist.acquire.readlock.failed", e); 00126 logger.error(msg); 00127 throw new SQLException(msg); 00128 } 00129 00130 try 00131 { 00132 ResultSet rs = null; 00133 ArrayList fromTables = request.getFrom(); 00134 AbstractConnectionManager cm = null; 00135 00136 if (fromTables == null) 00137 throw new SQLException(Translate.get("loadbalancer.from.not.found", 00138 request.getSQLShortForm(vdb.getSQLShortFormLength()))); 00139 00140 // Find the backend that has the needed tables 00141 ArrayList backends = vdb.getBackends(); 00142 int size = backends.size(); 00143 00144 DatabaseBackend backend = null; 00145 // The backend that will execute the query 00146 for (int i = 0; i < size; i++) 00147 { 00148 backend = (DatabaseBackend) backends.get(i); 00149 if (backend.isReadEnabled() && backend.hasTables(fromTables)) 00150 break; 00151 } 00152 00153 // Execute the request on the chosen backend 00154 try 00155 { 00156 rs = executeRequestOnBackend(request, backend); 00157 } 00158 catch (SQLException se) 00159 { 00160 String msg = Translate.get("loadbalancer.request.failed", new String[]{ 00161 String.valueOf(request.getId()), se.getMessage()}); 00162 if (logger.isInfoEnabled()) 00163 logger.info(msg); 00164 throw new SQLException(msg); 00165 } 00166 00167 return rs; 00168 } 00169 catch (RuntimeException e) 00170 { 00171 String msg = Translate 00172 .get("loadbalancer.request.failed", new String[]{ 00173 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00174 e.getMessage()}); 00175 logger.fatal(msg, e); 00176 throw new SQLException(msg); 00177 } 00178 finally 00179 { 00180 vdb.releaseReadLockBackendLists(); // Release the lock 00181 } 00182 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 830 行で定義されています。
00832 { 00833 throw new SQLException( 00834 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 00835 } |
|
Execute a read request on the selected backend.
RAIDb0.java の 677 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.execReadRequest().
00679 { 00680 // Ok, we have a backend, let's execute the request 00681 AbstractConnectionManager cm = backend.getConnectionManager(request 00682 .getLogin()); 00683 00684 // Sanity check 00685 if (cm == null) 00686 { 00687 String msg = Translate.get("loadbalancer.connectionmanager.not.found", 00688 new String[]{request.getLogin(), backend.getName()}); 00689 logger.error(msg); 00690 throw new SQLException(msg); 00691 } 00692 00693 // Execute the query 00694 if (request.isAutoCommit()) 00695 { 00696 ResultSet rs = null; 00697 boolean badConnection; 00698 do 00699 { 00700 badConnection = false; 00701 // Use a connection just for this request 00702 Connection c = null; 00703 try 00704 { 00705 c = cm.getConnection(); 00706 } 00707 catch (UnreachableBackendException e1) 00708 { 00709 logger.error(Translate.get( 00710 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00711 backend.disable(); 00712 throw new SQLException(Translate.get( 00713 "loadbalancer.backend.unreacheable", backend.getName())); 00714 } 00715 00716 // Sanity check 00717 if (c == null) 00718 throw new SQLException(Translate.get( 00719 "loadbalancer.backend.no.connection", backend.getName())); 00720 00721 // Execute Query 00722 try 00723 { 00724 rs = executeStatementOnBackend(request, backend, c); 00725 cm.releaseConnection(c); 00726 } 00727 catch (SQLException e) 00728 { 00729 cm.releaseConnection(c); 00730 throw new SQLException(Translate.get( 00731 "loadbalancer.request.failed.on.backend", new String[]{ 00732 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00733 backend.getName(), e.getMessage()})); 00734 } 00735 catch (BadConnectionException e) 00736 { // Get rid of the bad connection 00737 cm.deleteConnection(c); 00738 badConnection = true; 00739 } 00740 } 00741 while (badConnection); 00742 if (logger.isDebugEnabled()) 00743 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00744 String.valueOf(request.getId()), backend.getName()})); 00745 return rs; 00746 } 00747 else 00748 { // Inside a transaction 00749 Connection c; 00750 long tid = request.getTransactionId(); 00751 Long lTid = new Long(tid); 00752 00753 if (!backend.isStartedTransaction(lTid)) 00754 { // transaction has not been started yet on this backend 00755 try 00756 { 00757 c = cm.getConnection(tid); 00758 } 00759 catch (UnreachableBackendException e1) 00760 { 00761 logger.error(Translate.get( 00762 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00763 backend.disable(); 00764 throw new SQLException(Translate.get( 00765 "loadbalancer.backend.unreacheable", backend.getName())); 00766 } 00767 00768 // Sanity check 00769 if (c == null) 00770 throw new SQLException(Translate.get( 00771 "loadbalancer.unable.get.connection", new String[]{ 00772 String.valueOf(tid), backend.getName()})); 00773 00774 // begin transaction 00775 backend.startTransaction(lTid); 00776 c.setAutoCommit(false); 00777 } 00778 else 00779 { // Re-use the connection used by this transaction 00780 c = cm.retrieveConnection(tid); 00781 00782 // Sanity check 00783 if (c == null) 00784 throw new SQLException(Translate.get( 00785 "loadbalancer.unable.retrieve.connection", new String[]{ 00786 String.valueOf(tid), backend.getName()})); 00787 } 00788 00789 // Execute Query 00790 ResultSet rs = null; 00791 try 00792 { 00793 rs = executeStatementOnBackend(request, backend, c); 00794 } 00795 catch (SQLException e) 00796 { 00797 throw new SQLException(Translate.get( 00798 "loadbalancer.request.failed.on.backend", new String[]{ 00799 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00800 backend.getName(), e.getMessage()})); 00801 } 00802 catch (BadConnectionException e) 00803 { // Get rid of the bad connection 00804 cm.deleteConnection(tid); 00805 throw new SQLException(Translate 00806 .get("loadbalancer.connection.failed", new String[]{ 00807 String.valueOf(tid), backend.getName(), e.getMessage()})); 00808 } 00809 if (logger.isDebugEnabled()) 00810 logger.debug(Translate.get("loadbalancer.execute.transaction.on", 00811 new String[]{String.valueOf(tid), String.valueOf(request.getId()), 00812 backend.getName()})); 00813 return rs; 00814 } 00815 } |
|
Execute a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.
AbstractLoadBalancer.java の 251 行で定義されています。 参照先 java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout. 参照元 org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().
00254 { 00255 ResultSet rs = null; 00256 try 00257 { 00258 backend.addPendingReadRequest(request); 00259 String sql = request.getSQL(); 00260 // Rewrite the query if needed 00261 sql = backend.rewriteQuery(sql); 00262 // Execute the query 00263 Statement s = c.createStatement(); 00264 DriverCompliance driverCompliance = backend.getDriverCompliance(); 00265 if (driverCompliance.supportSetQueryTimeout()) 00266 s.setQueryTimeout(request.getTimeout()); 00267 if ((request.getCursorName() != null) 00268 && (driverCompliance.supportSetCursorName())) 00269 s.setCursorName(request.getCursorName()); 00270 if ((request.getFetchSize() != 0) 00271 && driverCompliance.supportSetFetchSize()) 00272 s.setFetchSize(request.getFetchSize()); 00273 if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows()) 00274 s.setMaxRows(request.getMaxRows()); 00275 rs = s.executeQuery(sql); 00276 } 00277 catch (SQLException e) 00278 { // Something bad happened 00279 if (backend.isValidConnection(c)) 00280 throw e; // Connection is valid, throw the exception 00281 else 00282 throw new BadConnectionException(); 00283 } 00284 finally 00285 { 00286 backend.removePendingRequest(request); 00287 } 00288 return rs; 00289 } |
|
Performs a write request on the backend that has the needed tables to executes the request.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 192 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.addPendingWriteRequest(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.disable(), java.sql.Statement.executeUpdate(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule.getBackends(), org.objectweb.cjdbc.controller.connection.AbstractConnectionManager.getConnection(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getConnectionManager(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getDriverCompliance(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTable(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isWriteEnabled, org.objectweb.cjdbc.controller.connection.AbstractConnectionManager.releaseConnection(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.removePendingRequest(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.rewriteQuery(), java.sql.Statement.setQueryTimeout(), と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout.
00193 { 00194 try 00195 { 00196 vdb.acquireReadLockBackendLists(); // Acquire read lock 00197 } 00198 catch (InterruptedException e) 00199 { 00200 String msg = Translate.get( 00201 "loadbalancer.backendlist.acquire.readlock.failed", e); 00202 logger.error(msg); 00203 throw new SQLException(msg); 00204 } 00205 00206 try 00207 { 00208 String table = request.getTableName(); 00209 AbstractConnectionManager cm = null; 00210 00211 if (table == null) 00212 throw new SQLException(Translate.get( 00213 "loadbalancer.request.target.table.not.found", request 00214 .getSQLShortForm(vdb.getSQLShortFormLength()))); 00215 00216 // Find the backend that has the needed table 00217 ArrayList backends = vdb.getBackends(); 00218 int size = backends.size(); 00219 00220 DatabaseBackend backend = null; 00221 // The backend that will execute the query 00222 if (request.isCreate()) 00223 { // Choose the backend according to the defined policy 00224 CreateTableRule rule = createTablePolicy.getTableRule(request 00225 .getTableName()); 00226 if (rule == null) 00227 rule = createTablePolicy.getDefaultRule(); 00228 00229 // Ask the rule to pickup a backend 00230 ArrayList choosen; 00231 try 00232 { 00233 choosen = rule.getBackends(backends); 00234 } 00235 catch (CreateTableException e) 00236 { 00237 throw new SQLException(Translate.get( 00238 "loadbalancer.create.table.rule.failed", e.getMessage())); 00239 } 00240 00241 // Get the connection manager from the chosen backend 00242 if (choosen != null) 00243 backend = (DatabaseBackend) choosen.get(0); 00244 if (backend != null) 00245 cm = backend.getConnectionManager(request.getLogin()); 00246 } 00247 else 00248 { // Find the backend that has the table 00249 for (int i = 0; i < size; i++) 00250 { 00251 backend = (DatabaseBackend) backends.get(i); 00252 if (backend.isWriteEnabled() && backend.hasTable(table)) 00253 { 00254 cm = backend.getConnectionManager(request.getLogin()); 00255 break; 00256 } 00257 } 00258 } 00259 00260 // Sanity check 00261 if (cm == null) 00262 throw new SQLException(Translate.get( 00263 "loadbalancer.backend.no.required.table", table)); 00264 00265 // Ok, let's execute the query 00266 00267 if (request.isAutoCommit()) 00268 { // Use a connection just for this request 00269 Connection c = null; 00270 try 00271 { 00272 c = cm.getConnection(); 00273 } 00274 catch (UnreachableBackendException e1) 00275 { 00276 logger.error(Translate.get( 00277 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00278 backend.disable(); 00279 throw new SQLException(Translate.get( 00280 "loadbalancer.backend.unreacheable", backend.getName())); 00281 } 00282 00283 // Sanity check 00284 if (c == null) 00285 throw new SQLException(Translate.get( 00286 "loadbalancer.backend.no.connection", backend.getName())); 00287 00288 int result; 00289 try 00290 { 00291 backend.addPendingWriteRequest(request); 00292 Statement s = c.createStatement(); 00293 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00294 s.setQueryTimeout(request.getTimeout()); 00295 String sql = request.getSQL(); 00296 // Rewrite the query if needed 00297 sql = backend.rewriteQuery(sql); 00298 result = s.executeUpdate(sql); 00299 updateSchema(backend, request); 00300 } 00301 catch (SQLException e) 00302 { 00303 throw new SQLException(Translate.get("loadbalancer.request.failed", 00304 new String[]{ 00305 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00306 e.getMessage()})); 00307 } 00308 finally 00309 { 00310 backend.removePendingRequest(request); 00311 cm.releaseConnection(c); 00312 } 00313 if (logger.isDebugEnabled()) 00314 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00315 String.valueOf(request.getId()), backend.getName()})); 00316 return result; 00317 } 00318 else 00319 { // Inside a transaction 00320 Connection c; 00321 long tid = request.getTransactionId(); 00322 Long lTid = new Long(tid); 00323 00324 if (!backend.isStartedTransaction(lTid)) 00325 { // transaction has not been started yet on this backend 00326 try 00327 { 00328 c = cm.getConnection(tid); 00329 } 00330 catch (UnreachableBackendException e1) 00331 { 00332 logger.error(Translate 00333 .get("loadbalancer.backend.disabling.unreachable", backend 00334 .getName())); 00335 backend.disable(); 00336 throw new SQLException(Translate.get( 00337 "loadbalancer.backend.unreacheable", backend.getName())); 00338 } 00339 00340 // Sanity check 00341 if (c == null) 00342 throw new SQLException(Translate.get( 00343 "loadbalancer.unable.get.connection", new String[]{ 00344 String.valueOf(tid), backend.getName()})); 00345 00346 // begin transaction 00347 backend.startTransaction(lTid); 00348 c.setAutoCommit(false); 00349 } 00350 else 00351 { // Re-use the connection used by this transaction 00352 c = cm.retrieveConnection(tid); 00353 00354 // Sanity check 00355 if (c == null) 00356 throw new SQLException(Translate.get( 00357 "loadbalancer.unable.retrieve.connection", new String[]{ 00358 String.valueOf(tid), backend.getName()})); 00359 } 00360 00361 // Execute the query 00362 int result; 00363 try 00364 { 00365 backend.addPendingWriteRequest(request); 00366 Statement s = c.createStatement(); 00367 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00368 s.setQueryTimeout(request.getTimeout()); 00369 String sql = request.getSQL(); 00370 // Rewrite the query if needed 00371 sql = backend.rewriteQuery(sql); 00372 result = s.executeUpdate(sql); 00373 updateSchema(backend, request); 00374 } 00375 catch (SQLException e) 00376 { 00377 throw new SQLException(Translate.get("loadbalancer.request.failed", 00378 new String[]{ 00379 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00380 e.getMessage()})); 00381 } 00382 finally 00383 { 00384 backend.removePendingRequest(request); 00385 } 00386 if (logger.isDebugEnabled()) 00387 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00388 String.valueOf(request.getId()), backend.getName()})); 00389 return result; 00390 } 00391 } 00392 catch (RuntimeException e) 00393 { 00394 String msg = Translate 00395 .get("loadbalancer.request.failed", new String[]{ 00396 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00397 e.getMessage()}); 00398 logger.fatal(msg, e); 00399 throw new SQLException(msg); 00400 } 00401 finally 00402 { 00403 vdb.releaseReadLockBackendLists(); // Release the lock 00404 } 00405 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 435 行で定義されています。 参照先 org.objectweb.cjdbc.controller.backend.DatabaseBackend.addPendingWriteRequest(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.disable(), java.sql.Statement.executeUpdate(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule.getBackends(), org.objectweb.cjdbc.controller.connection.AbstractConnectionManager.getConnection(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getConnectionManager(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getDriverCompliance(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTable(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isWriteEnabled, org.objectweb.cjdbc.controller.backend.DatabaseBackend.rewriteQuery(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportGetGeneratedKeys, と org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout.
00437 { 00438 try 00439 { 00440 vdb.acquireReadLockBackendLists(); // Acquire read lock 00441 } 00442 catch (InterruptedException e) 00443 { 00444 String msg = Translate.get( 00445 "loadbalancer.backendlist.acquire.readlock.failed", e); 00446 logger.error(msg); 00447 throw new SQLException(msg); 00448 } 00449 00450 try 00451 { 00452 String table = request.getTableName(); 00453 AbstractConnectionManager cm = null; 00454 00455 if (table == null) 00456 throw new SQLException(Translate.get( 00457 "loadbalancer.request.target.table.not.found", request 00458 .getSQLShortForm(vdb.getSQLShortFormLength()))); 00459 00460 // Find the backend that has the needed table 00461 ArrayList backends = vdb.getBackends(); 00462 int size = backends.size(); 00463 00464 DatabaseBackend backend = null; 00465 // The backend that will execute the query 00466 if (request.isCreate()) 00467 { // Choose the backend according to the defined policy 00468 CreateTableRule rule = createTablePolicy.getTableRule(request 00469 .getTableName()); 00470 if (rule == null) 00471 rule = createTablePolicy.getDefaultRule(); 00472 00473 // Ask the rule to pickup a backend 00474 ArrayList choosen; 00475 try 00476 { 00477 choosen = rule.getBackends(backends); 00478 } 00479 catch (CreateTableException e) 00480 { 00481 throw new SQLException(Translate.get( 00482 "loadbalancer.create.table.rule.failed", e.getMessage())); 00483 } 00484 00485 // Get the connection manager from the chosen backend 00486 if (choosen != null) 00487 backend = (DatabaseBackend) choosen.get(0); 00488 if (backend != null) 00489 cm = backend.getConnectionManager(request.getLogin()); 00490 } 00491 else 00492 { // Find the backend that has the table 00493 for (int i = 0; i < size; i++) 00494 { 00495 backend = (DatabaseBackend) backends.get(i); 00496 if (backend.isWriteEnabled() && backend.hasTable(table)) 00497 { 00498 cm = backend.getConnectionManager(request.getLogin()); 00499 break; 00500 } 00501 } 00502 } 00503 00504 // Sanity check 00505 if (cm == null) 00506 throw new SQLException(Translate.get( 00507 "loadbalancer.backend.no.required.table", table)); 00508 00509 if (!backend.getDriverCompliance().supportGetGeneratedKeys()) 00510 throw new SQLException(Translate.get( 00511 "loadbalancer.backend.autogeneratedkeys.unsupported", backend 00512 .getName())); 00513 00514 // Ok, let's execute the query 00515 00516 if (request.isAutoCommit()) 00517 { // Use a connection just for this request 00518 Connection c = null; 00519 try 00520 { 00521 c = cm.getConnection(); 00522 } 00523 catch (UnreachableBackendException e1) 00524 { 00525 logger.error(Translate.get( 00526 "loadbalancer.backend.disabling.unreachable", backend.getName())); 00527 backend.disable(); 00528 throw new SQLException(Translate.get( 00529 "loadbalancer.backend.unreacheable", backend.getName())); 00530 } 00531 00532 // Sanity check 00533 if (c == null) 00534 throw new SQLException(Translate.get( 00535 "loadbalancer.backend.no.connection", backend.getName())); 00536 00537 // Execute Query 00538 ResultSet result; 00539 try 00540 { 00541 backend.addPendingWriteRequest(request); 00542 Statement s = c.createStatement(); 00543 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00544 s.setQueryTimeout(request.getTimeout()); 00545 String sql = request.getSQL(); 00546 // Rewrite the query if needed 00547 sql = backend.rewriteQuery(sql); 00548 int rows = s.executeUpdate(sql, 00549 java.sql.Statement.RETURN_GENERATED_KEYS); 00550 if (logger.isDebugEnabled()) 00551 logger.debug(Translate.get("loadbalancer.request.affected", 00552 new String[]{String.valueOf(request.getId()), 00553 String.valueOf(rows)})); 00554 result = s.getGeneratedKeys(); 00555 updateSchema(backend, request); 00556 } 00557 catch (SQLException e) 00558 { 00559 throw new SQLException(Translate.get("loadbalancer.request.failed", 00560 new String[]{ 00561 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00562 e.getMessage()})); 00563 } 00564 finally 00565 { 00566 backend.removePendingRequest(request); 00567 cm.releaseConnection(c); 00568 } 00569 if (logger.isDebugEnabled()) 00570 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00571 String.valueOf(request.getId()), backend.getName()})); 00572 return result; 00573 } 00574 else 00575 { // Inside a transaction 00576 Connection c; 00577 long tid = request.getTransactionId(); 00578 Long lTid = new Long(tid); 00579 00580 if (!backend.isStartedTransaction(lTid)) 00581 { // transaction has not been started yet on this backend 00582 try 00583 { 00584 c = cm.getConnection(tid); 00585 } 00586 catch (UnreachableBackendException e1) 00587 { 00588 logger.error(Translate 00589 .get("loadbalancer.backend.disabling.unreachable", backend 00590 .getName())); 00591 backend.disable(); 00592 throw new SQLException(Translate.get( 00593 "loadbalancer.backend.unreacheable", backend.getName())); 00594 } 00595 00596 // Sanity check 00597 if (c == null) 00598 throw new SQLException(Translate.get( 00599 "loadbalancer.unable.get.connection", new String[]{ 00600 String.valueOf(tid), backend.getName()})); 00601 00602 // begin transaction 00603 backend.startTransaction(lTid); 00604 c.setAutoCommit(false); 00605 } 00606 else 00607 { // Re-use the connection used by this transaction 00608 c = cm.retrieveConnection(tid); 00609 00610 // Sanity check 00611 if (c == null) 00612 throw new SQLException(Translate.get( 00613 "loadbalancer.unable.retrieve.connection", new String[]{ 00614 String.valueOf(tid), backend.getName()})); 00615 } 00616 00617 // Execute the query 00618 ResultSet result; 00619 try 00620 { 00621 backend.addPendingWriteRequest(request); 00622 Statement s = c.createStatement(); 00623 if (backend.getDriverCompliance().supportSetQueryTimeout()) 00624 s.setQueryTimeout(request.getTimeout()); 00625 String sql = request.getSQL(); 00626 // Rewrite the query if needed 00627 sql = backend.rewriteQuery(sql); 00628 int rows = s.executeUpdate(sql, 00629 java.sql.Statement.RETURN_GENERATED_KEYS); 00630 if (logger.isDebugEnabled()) 00631 logger.debug(Translate.get("loadbalancer.request.affected", 00632 new String[]{String.valueOf(request.getId()), 00633 String.valueOf(rows)})); 00634 result = s.getGeneratedKeys(); 00635 updateSchema(backend, request); 00636 } 00637 catch (SQLException e) 00638 { 00639 throw new SQLException(Translate.get("loadbalancer.request.failed", 00640 new String[]{ 00641 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00642 e.getMessage()})); 00643 } 00644 finally 00645 { 00646 backend.removePendingRequest(request); 00647 } 00648 if (logger.isDebugEnabled()) 00649 logger.debug(Translate.get("loadbalancer.execute.on", new String[]{ 00650 String.valueOf(request.getId()), backend.getName()})); 00651 return result; 00652 } 00653 } 00654 catch (RuntimeException e) 00655 { 00656 String msg = Translate 00657 .get("loadbalancer.request.failed", new String[]{ 00658 request.getSQLShortForm(vdb.getSQLShortFormLength()), 00659 e.getMessage()}); 00660 logger.fatal(msg, e); 00661 throw new SQLException(msg); 00662 } 00663 finally 00664 { 00665 vdb.releaseReadLockBackendLists(); // Release the lock 00666 } 00667 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 840 行で定義されています。
00841 { 00842 throw new SQLException( 00843 "Stored procedure calls are not supported with RAIDb-0 load balancers."); 00844 } |
|
Get information about the Request load balancer
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 1192 行で定義されています。
01193 { 01194 return "RAIDb-0 Request load balancer\n"; 01195 } |
|
Get the needed query parsing granularity.
AbstractLoadBalancer.java の 151 行で定義されています。 参照元 org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().
00152 { 00153 return parsingGranularity; 00154 } |
|
Returns the RAIDbLevel.
AbstractLoadBalancer.java の 131 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.
00132 { 00133 return raidbLevel; 00134 } |
|
AbstractLoadBalancer.java の 385 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().
00386 { 00387 StringBuffer info = new StringBuffer(); 00388 info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00389 info.append(getXmlImpl()); 00390 info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">"); 00391 return info.toString(); 00392 } |
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 1200 行で定義されています。
01201 { 01202 StringBuffer info = new StringBuffer(); 01203 info.append("<" + DatabasesXmlTags.ELT_RAIDb_0 + ">"); 01204 createTablePolicy.getXml(); 01205 info.append("</" + DatabasesXmlTags.ELT_RAIDb_0 + ">"); 01206 return info.toString(); 01207 } |
|
Rollbacks a transaction.
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerに実装されています. RAIDb0.java の 972 行で定義されています。 参照先 org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), と org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().
00973 { 00974 try 00975 { 00976 backendThreadsRWLock.acquireRead(); 00977 } 00978 catch (InterruptedException e) 00979 { 00980 String msg = Translate.get( 00981 "loadbalancer.backendlist.acquire.readlock.failed", e); 00982 logger.error(msg); 00983 throw new SQLException(msg); 00984 } 00985 int nbOfThreads = backendThreads.size(); 00986 ArrayList rollbackList = new ArrayList(); 00987 Long lTid = new Long(tm.getTransactionId()); 00988 00989 // Build the list of backend that need to rollback this transaction 00990 for (int i = 0; i < nbOfThreads; i++) 00991 { 00992 BackendWorkerThread thread = (BackendWorkerThread) backendThreads.get(i); 00993 if (thread.getBackend().isStartedTransaction(lTid)) 00994 rollbackList.add(thread); 00995 } 00996 00997 nbOfThreads = rollbackList.size(); 00998 00999 // Create the task 01000 RollbackTask task = new RollbackTask(nbOfThreads, // Wait for all to 01001 // rollback 01002 nbOfThreads, tm.getTimeout(), tm.getLogin(), tm.getTransactionId()); 01003 01004 synchronized (task) 01005 { 01006 // Post the task in each backendThread tasklist and wakeup the threads 01007 for (int i = 0; i < nbOfThreads; i++) 01008 { 01009 BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i); 01010 synchronized (thread) 01011 { 01012 thread.addTask(task); 01013 thread.notify(); 01014 } 01015 } 01016 01017 backendThreadsRWLock.releaseRead(); 01018 01019 // Wait for completion (notified by the task) 01020 try 01021 { 01022 // Wait on task 01023 long timeout = tm.getTimeout(); 01024 if (timeout > 0) 01025 { 01026 long start = System.currentTimeMillis(); 01027 task.wait(timeout); 01028 long end = System.currentTimeMillis(); 01029 long remaining = timeout - (end - start); 01030 if (remaining <= 0) 01031 { 01032 String msg = Translate.get("loadbalancer.rollback.timeout", 01033 new String[]{String.valueOf(tm.getTransactionId()), 01034 String.valueOf(task.getSuccess()), 01035 String.valueOf(task.getFailed())}); 01036 logger.warn(msg); 01037 throw new SQLException(msg); 01038 } 01039 } 01040 else 01041 task.wait(); 01042 } 01043 catch (InterruptedException e) 01044 { 01045 throw new SQLException(Translate.get("loadbalancer.rollback.timeout", 01046 new String[]{String.valueOf(tm.getTransactionId()), 01047 String.valueOf(task.getSuccess()), 01048 String.valueOf(task.getFailed())})); 01049 } 01050 01051 if (task.getSuccess() > 0) 01052 return; 01053 else 01054 { // All tasks failed 01055 ArrayList exceptions = task.getExceptions(); 01056 if (exceptions == null) 01057 throw new SQLException(Translate.get( 01058 "loadbalancer.rollback.all.failed", tm.getTransactionId())); 01059 else 01060 { 01061 String errorMsg = Translate.get("loadbalancer.rollback.failed.stack", 01062 tm.getTransactionId()) 01063 + "\n"; 01064 for (int i = 0; i < exceptions.size(); i++) 01065 errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n"; 01066 logger.error(errorMsg); 01067 throw new SQLException(errorMsg); 01068 } 01069 } 01070 } 01071 } |
|
Set the needed query parsing granularity.
AbstractLoadBalancer.java の 161 行で定義されています。
00162 {
00163 this.parsingGranularity = parsingGranularity;
00164 }
|
|
Sets the RAIDbLevel.
AbstractLoadBalancer.java の 141 行で定義されています。
00142 {
00143 this.raidbLevel = raidbLevel;
00144 }
|
|
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 RAIDb0.java の 1178 行で定義されています。
01179 { 01180 throw new SQLException("Weight is not supported with this load balancer"); 01181 } |
|
Update the backend schema if needed RAIDb0.java の 408 行で定義されています。 参照先 org.objectweb.cjdbc.common.sql.schema.DatabaseSchema.addTable(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.getDatabaseSchema(), org.objectweb.cjdbc.common.sql.schema.DatabaseSchema.getTable(), org.objectweb.cjdbc.common.sql.AbstractWriteRequest.getTableName(), org.objectweb.cjdbc.common.sql.AbstractWriteRequest.isCreate(), org.objectweb.cjdbc.common.sql.AbstractWriteRequest.isDrop(), と org.objectweb.cjdbc.common.sql.schema.DatabaseSchema.removeTable().
00409 { 00410 DatabaseSchema dbs = b.getDatabaseSchema(); 00411 if (dbs == null) 00412 return; 00413 00414 if (request.isCreate()) 00415 { 00416 // Add the table to the schema 00417 dbs.addTable(((CreateRequest) request).getDatabaseTable()); 00418 if (logger.isDebugEnabled()) 00419 logger.debug(Translate.get("loadbalancer.schema.add.table", request 00420 .getTableName())); 00421 } 00422 else if (request.isDrop()) 00423 { 00424 // Delete the table from the schema 00425 dbs.removeTable(dbs.getTable(request.getTableName())); 00426 if (logger.isDebugEnabled()) 00427 logger.debug(Translate.get("loadbalancer.schema.remove.table", request 00428 .getTableName())); 00429 } 00430 } |
|
RAIDb0.java の 77 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.RAIDb0(). |
|
RAIDb0.java の 78 行で定義されています。 |
|
RAIDb0.java の 79 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.RAIDb0(). |
|
初期値: Trace
.getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb0")
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancerを再定義しています。 RAIDb0.java の 81 行で定義されています。 |
|
AbstractLoadBalancer.java の 74 行で定義されています。 |
|
|
AbstractLoadBalancer.java の 72 行で定義されています。 参照元 org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(). |