src/org/objectweb/cjdbc/controller/requestmanager/distributed/DistributedRequestManager.java

説明を見る。
00001 00025 package org.objectweb.cjdbc.controller.requestmanager.distributed; 00026 00027 import java.sql.SQLException; 00028 import java.util.ArrayList; 00029 import java.util.Vector; 00030 00031 import org.jgroups.blocks.GroupRequest; 00032 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException; 00033 import org.objectweb.cjdbc.common.i18n.Translate; 00034 import org.objectweb.cjdbc.common.log.Trace; 00035 import org.objectweb.cjdbc.common.sql.AbstractRequest; 00036 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00037 import org.objectweb.cjdbc.common.sql.StoredProcedure; 00038 import org.objectweb.cjdbc.controller.backend.DatabaseBackend; 00039 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache; 00040 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer; 00041 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog; 00042 import org.objectweb.cjdbc.controller.requestmanager.RequestManager; 00043 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00044 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler; 00045 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet; 00046 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase; 00047 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase; 00048 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage; 00049 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend; 00050 00063 public abstract class DistributedRequestManager extends RequestManager 00064 { 00065 protected DistributedVirtualDatabase dvdb; 00067 private Vector failedOnAllBackends; 00069 private long controllerId; 00071 private ArrayList writeTransactions; 00072 00087 public DistributedRequestManager(DistributedVirtualDatabase vdb, 00088 AbstractScheduler scheduler, AbstractResultCache cache, 00089 AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog, 00090 long beginTimeout, long commitTimeout, long rollbackTimeout) 00091 throws SQLException 00092 { 00093 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout, 00094 commitTimeout, rollbackTimeout); 00095 dvdb = vdb; 00096 failedOnAllBackends = new Vector(); 00097 writeTransactions = new ArrayList(); 00098 } 00099 00105 public Trace getLogger() 00106 { 00107 return logger; 00108 } 00109 00115 public VirtualDatabase getVirtualDatabase() 00116 { 00117 return dvdb; 00118 } 00119 00120 // 00121 // Database Backends management 00122 // 00123 00134 public void enableBackend(DatabaseBackend db) throws SQLException 00135 { 00136 int size = dvdb.getAllMemberButUs().size(); 00137 if (size > 0) 00138 { 00139 logger.debug(Translate 00140 .get("virtualdatabase.distributed.enable.backend.check")); 00141 00142 // Notify other controllers that we enable this backend. 00143 // No answer is expected. 00144 dvdb.getDispatcher().castMessage(dvdb.getAllMemberButUs(), 00145 CJDBCGroupMessage.getMessage(new EnableBackend(db)), 00146 GroupRequest.GET_NONE, CJDBCGroupMessage.defaultCastTimeOut); 00147 } 00148 00149 loadBalancer.enableBackend(db, true); 00150 vdb.logger.info("Database backend " + db.getName() + " enabled"); 00151 } 00152 00158 public void addFailedOnAllBackends(AbstractRequest request) 00159 { 00160 failedOnAllBackends.add(request); 00161 } 00162 00170 public void completeFailedOnAllBackends(AbstractRequest request, 00171 boolean success) 00172 { 00173 if (!failedOnAllBackends.remove(request)) 00174 { 00175 logger.warn("Unable to find request " 00176 + request.getSQLShortForm(dvdb.getSQLShortFormLength()) 00177 + " in list of requests that failed on all backends."); 00178 return; 00179 } 00180 if (success) 00181 { //We have to invalidate all backends 00182 logger 00183 .error("Request " 00184 + request.getSQLShortForm(dvdb.getSQLShortFormLength()) 00185 + " failed on all local backends but succeeded on other controllers. Disabling all local backends."); 00186 try 00187 { 00188 dvdb.disableAllBackend(); 00189 } 00190 catch (VirtualDatabaseException e) 00191 { 00192 logger.error("An error occured while disabling all backends", e); 00193 } 00194 } 00195 } 00196 00197 // 00198 // Transaction management 00199 // 00200 00206 public void setControllerId(int id) 00207 { 00208 if (id > 0xffff) 00209 { 00210 String msg = "Out of range controller id (" + id + ")"; 00211 logger.error(msg); 00212 throw new RuntimeException(msg); 00213 } 00214 this.controllerId = (0x000000000000ffff & id) << 12; 00215 } 00216 00220 public long begin(String login) throws SQLException 00221 { 00222 try 00223 { 00224 TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0, 00225 beginTimeout, login); 00226 00227 // Wait for the scheduler to give us the authorization to execute 00228 long tid = scheduler.begin(tm); 00229 // 2 first bytes are used for controller id 00230 // 6 right-most bytes are used for transaction id 00231 tid = tid & 0x0000ffffffffffffL; 00232 tid = tid | controllerId; 00233 tm.setTransactionId(tid); 00234 00235 if (logger.isDebugEnabled()) 00236 logger.debug(Translate.get("transaction.begin", String.valueOf(tid))); 00237 00238 try 00239 { 00240 // Send to load balancer 00241 loadBalancer.begin(tm); 00242 00243 // Log the begin 00244 if (recoveryLog != null) 00245 { 00246 recoveryLog.begin(tm); 00247 } 00248 } 00249 catch (SQLException e) 00250 { 00251 throw e; 00252 } 00253 finally 00254 { 00255 // Notify scheduler for completion in any case 00256 scheduler.beginCompleted(tid); 00257 } 00258 00259 tidLoginTable.put(new Long(tid), tm); 00260 return tid; 00261 } 00262 catch (RuntimeException e) 00263 { 00264 logger.fatal(Translate.get( 00265 "fatal.runtime.exception.requestmanager.begin", e)); 00266 throw new SQLException(e.getMessage()); 00267 } 00268 } 00269 00273 public void commit(long transactionId) throws SQLException 00274 { 00275 Long lTid = new Long(transactionId); 00276 boolean isAWriteTransaction; 00277 synchronized (writeTransactions) 00278 { 00279 isAWriteTransaction = writeTransactions.remove(lTid); 00280 } 00281 if (isAWriteTransaction) 00282 distributedCommit(transactionId); 00283 else 00284 // read-only transaction, it is local 00285 super.commit(transactionId); 00286 } 00287 00291 public void rollback(long transactionId) throws SQLException 00292 { 00293 Long lTid = new Long(transactionId); 00294 boolean isAWriteTransaction; 00295 synchronized (writeTransactions) 00296 { 00297 isAWriteTransaction = writeTransactions.remove(lTid); 00298 } 00299 if (isAWriteTransaction) 00300 distributedRollback(transactionId); 00301 else 00302 // read-only transaction, it is local 00303 super.rollback(transactionId); 00304 } 00305 00309 public void scheduleExecWriteRequest(AbstractWriteRequest request) 00310 throws SQLException 00311 { 00312 lazyTransactionStart(request); 00313 super.scheduleExecWriteRequest(request); 00314 } 00315 00323 public void lazyTransactionStart(AbstractRequest request) throws SQLException 00324 { 00325 // Check if this is a remotely started transaction that we need to lazyly 00326 // start locally 00327 if (!request.isAutoCommit()) 00328 { 00329 long tid = request.getTransactionId(); 00330 if ((tid & 0x0000ffffffffffffL) != controllerId) 00331 { // Remote transaction, check that it is started 00332 if (!tidLoginTable.containsKey(new Long(tid))) 00333 { // Begin this transaction 00334 try 00335 { 00336 TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0, 00337 beginTimeout, request.getLogin()); 00338 tm.setTransactionId(tid); 00339 00340 if (logger.isDebugEnabled()) 00341 logger.debug(Translate.get("transaction.begin", String 00342 .valueOf(tid))); 00343 00344 try 00345 { 00346 scheduler.begin(tm); 00347 00348 // Send to load balancer 00349 loadBalancer.begin(tm); 00350 00351 // Log the begin 00352 if (recoveryLog != null) 00353 { 00354 recoveryLog.begin(tm); 00355 } 00356 } 00357 catch (SQLException e) 00358 { 00359 throw e; 00360 } 00361 finally 00362 { 00363 // Notify scheduler for completion in any case 00364 scheduler.beginCompleted(tid); 00365 } 00366 00367 tidLoginTable.put(new Long(tid), tm); 00368 } 00369 catch (RuntimeException e) 00370 { 00371 logger.fatal(Translate.get( 00372 "fatal.runtime.exception.requestmanager.begin", e)); 00373 throw new SQLException(e.getMessage()); 00374 } 00375 } 00376 } 00377 } 00378 } 00379 00383 public int execWriteRequest(AbstractWriteRequest request) throws SQLException 00384 { 00385 if (!request.isAutoCommit()) 00386 { // Add this transaction to the list of write transactions 00387 Long lTid = new Long(request.getTransactionId()); 00388 synchronized (writeTransactions) 00389 { 00390 if (!writeTransactions.contains(lTid)) 00391 writeTransactions.add(lTid); 00392 } 00393 } 00394 return execDistributedWriteRequest(request); 00395 } 00396 00400 public ControllerResultSet execWriteRequestWithKeys( 00401 AbstractWriteRequest request) throws SQLException 00402 { 00403 if (!request.isAutoCommit()) 00404 { // Add this transaction to the list of write transactions 00405 Long lTid = new Long(request.getTransactionId()); 00406 synchronized (writeTransactions) 00407 { 00408 if (!writeTransactions.contains(lTid)) 00409 writeTransactions.add(lTid); 00410 } 00411 } 00412 return execDistributedWriteRequestWithKeys(request); 00413 } 00414 00418 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc) 00419 throws SQLException 00420 { 00421 // If connection is read-only, we don't broadcast 00422 if (proc.isReadOnly()) 00423 return execDistributedReadStoredProcedureLocally(proc); 00424 00425 if (!proc.isAutoCommit()) 00426 { // Add this transaction to the list of write transactions 00427 Long lTid = new Long(proc.getTransactionId()); 00428 synchronized (writeTransactions) 00429 { 00430 if (!writeTransactions.contains(lTid)) 00431 writeTransactions.add(lTid); 00432 } 00433 } 00434 return execDistributedReadStoredProcedure(proc); 00435 } 00436 00440 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException 00441 { 00442 if (!proc.isAutoCommit()) 00443 { // Add this transaction to the list of write transactions 00444 Long lTid = new Long(proc.getTransactionId()); 00445 synchronized (writeTransactions) 00446 { 00447 if (!writeTransactions.contains(lTid)) 00448 writeTransactions.add(lTid); 00449 } 00450 } 00451 return execDistributedWriteStoredProcedure(proc); 00452 } 00453 00454 // 00455 // RAIDb level specific methods 00456 // 00457 00464 public abstract void distributedCommit(long transactionId) 00465 throws SQLException; 00466 00473 public abstract void distributedRollback(long transactionId) 00474 throws SQLException; 00475 00483 public abstract int execDistributedWriteRequest(AbstractWriteRequest request) 00484 throws SQLException; 00485 00494 public abstract ControllerResultSet execDistributedWriteRequestWithKeys( 00495 AbstractWriteRequest request) throws SQLException; 00496 00504 public abstract ControllerResultSet execDistributedReadStoredProcedure( 00505 StoredProcedure proc) throws SQLException; 00506 00514 public abstract int execDistributedWriteStoredProcedure(StoredProcedure proc) 00515 throws SQLException; 00516 00525 public ControllerResultSet execDistributedReadStoredProcedureLocally( 00526 StoredProcedure proc) throws SQLException 00527 { 00528 return super.execReadStoredProcedure(proc); 00529 } 00530 00539 public int execDistributedWriteStoredProcedureLocally(StoredProcedure proc) 00540 throws SQLException 00541 { 00542 return super.execWriteStoredProcedure(proc); 00543 } 00544 00545 }

CJDBCversion1.0.4に対してTue Oct 12 15:16:03 2004に生成されました。 doxygen 1.3.8