Defined at line 59 of RAIDb1DistributedRequestManager.java.

Creates a new
Defined at line 75 of RAIDb1DistributedRequestManager.java.
{
  super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
      commitTimeout, rollbackTimeout);
}

Add a request that failed on all backends.
Defined at line 165 of DistributedRequestManager.java.
{
  failedOnAllBackends.add(request);
}

Creates a new backup with the corresponding checkpoint. Note that this will disable the backend for the duration of the backup.
Defined at line 1408 of RequestManager.java. References org.objectweb.cjdbc.controller.core.backup.Octopus.backup().
{
  // Sanity checks
  if (recoveryLog == null)
  {
    String msg = Translate.get("recovery.store.checkpoint.failed.cause.null",
        checkpointName);
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Wait for all pending transactions to finish
  logger.info(Translate.get("requestmanager.wait.pending.transactions"));
  scheduler.suspendNewTransactionsForCheckpoint();
  scheduler.suspendWrites();

  // Disable backend if needed
  if (db.isReadEnabled())
  {
    disableBackend(db);
    logger.info(Translate.get("backend.state.disabled", db.getName()));
  }

  // Store checkpoint
  recoveryLog.storeCheckpoint(checkpointName);
  logger.info(Translate.get("recovery.checkpoint.stored", checkpointName));

  // Resume writes and transactions
  scheduler.resumeNewTransactions();
  scheduler.resumeWrites();

  try
  {
    logger.info(Translate
        .get("controller.backup.octopus.start", db.getName()));
    Octopus octopus = new Octopus(db, checkpointName, tables);
    octopus.backup();
    db.setLastKnownCheckpoint(checkpointName);
  }
  catch (OctopusException e)
  {
    logger.error(Translate.get("controller.backup.octopus.failed", e));
    throw new SQLException(e.getMessage());
  }
  catch (BackupException be)
  {
    logger.error(Translate.get("controller.backup.failed", be));
    throw new SQLException(be.getMessage());
  }
  logger.info(Translate.get("controller.backup.complete", db.getName()));

  // if (enableAfter)
  // {
  // enableBackendFromCheckpoint(db,checkpointName);
  // }

}

Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 227 of DistributedRequestManager.java. References org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog.begin(), org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.begin(), org.objectweb.cjdbc.controller.scheduler.AbstractScheduler.begin(), org.objectweb.cjdbc.controller.scheduler.AbstractScheduler.beginCompleted(), org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.controllerId, org.objectweb.cjdbc.common.log.Trace.debug(), org.objectweb.cjdbc.common.log.Trace.fatal(), org.objectweb.cjdbc.common.log.Trace.isDebugEnabled(), and org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData.setTransactionId().
{
  try
  {
    TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0,
        beginTimeout, login);

    // Wait for the scheduler to give us the authorization to execute
    long tid = scheduler.begin(tm);
    // 2 first bytes are used for controller id
    // 6 right-most bytes are used for transaction id
    tid = tid & 0x0000ffffffffffffL;
    tid = tid | controllerId;
    tm.setTransactionId(tid);

    if (logger.isDebugEnabled())
      logger.debug(Translate.get("transaction.begin", "" + tid));

    try
    {
      // Send to load balancer
      loadBalancer.begin(tm);

      // Log the begin
      if (recoveryLog != null)
      {
        recoveryLog.begin(tm);
      }
    }
    catch (SQLException e)
    {
      throw e;
    }
    finally
    {
      // Notify scheduler for completion in any case
      scheduler.beginCompleted(tid);
    }

    tidLoginTable.put(new Long(tid), tm);
    return tid;
  }
  catch (RuntimeException e)
  {
    logger.fatal(Translate.get(
        "fatal.runtime.exception.requestmanager.begin", e));
    throw new SQLException(e.getMessage());
  }
}

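The comments in the listing above say that the two left-most bytes of a transaction id carry the controller id and the six right-most bytes carry the local transaction id. The following is a minimal sketch of that bit layout only. The class TransactionIds and its method names are hypothetical, and it assumes that controllerId has already been shifted into the top 16 bits before it is OR-ed in, which the listing does not show.

```java
/** Hypothetical helper illustrating the 2-byte / 6-byte transaction id layout. */
final class TransactionIds {
    /** Keeps the 6 right-most bytes (48 bits) that hold the local transaction id. */
    static final long TID_MASK = 0x0000ffffffffffffL;

    /** Assumption: a 16-bit controller number is stored in the 2 left-most bytes. */
    static long controllerPrefix(int controllerNumber) {
        return ((long) (controllerNumber & 0xffff)) << 48;
    }

    /** Same masking and OR-ing as in the listing: tid & mask, then | controllerId. */
    static long globalTid(long localTid, long controllerPrefix) {
        return (localTid & TID_MASK) | controllerPrefix;
    }

    /** Recovers the controller number from a combined id. */
    static int controllerOf(long globalTid) {
        return (int) (globalTid >>> 48);
    }

    /** Recovers the local transaction id from a combined id. */
    static long localOf(long globalTid) {
        return globalTid & TID_MASK;
    }
}
```

With this layout, ids issued by different controllers cannot collide as long as each controller uses a distinct prefix and fewer than 2^48 local ids.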
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 280 of DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.distributedCommit(), and org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.writeTransactions.
{
  Long lTid = new Long(transactionId);
  boolean isAWriteTransaction;
  synchronized (writeTransactions)
  {
    isAWriteTransaction = writeTransactions.remove(lTid);
  }
  if (isAWriteTransaction)
    distributedCommit(transactionId);
  else
    // read-only transaction, it is local
    super.commit(transactionId);
}

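The commit listing above decides between a distributed and a local commit by removing the transaction id from the set of write transactions. A small sketch of that routing decision follows. CommitRouter, markWrite, and route are hypothetical names, and a HashSet stands in for the writeTransactions collection, whose actual type is not shown on this page.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: route a commit depending on whether the transaction wrote. */
final class CommitRouter {
    private final Set<Long> writeTransactions = new HashSet<>();

    /** Records that the transaction executed at least one write. */
    void markWrite(long tid) {
        synchronized (writeTransactions) {
            writeTransactions.add(tid);
        }
    }

    /** Returns "distributed" for write transactions, "local" for read-only ones. */
    String route(long tid) {
        boolean wasWrite;
        synchronized (writeTransactions) {
            // remove() reports whether the id was present, so lookup and cleanup are one step
            wasWrite = writeTransactions.remove(tid);
        }
        return wasWrite ? "distributed" : "local";
    }
}
```

Because the id is removed as part of the check, a second commit for the same id is treated as read-only in this sketch.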
Notify completion of a request that failed on all backends. If completion was successful, all local backends are disabled.
Defined at line 177 of DistributedRequestManager.java. References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.disableAllBackend(), org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.dvdb, org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.failedOnAllBackends, org.objectweb.cjdbc.common.sql.AbstractRequest.getSQLShortForm(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), and org.objectweb.cjdbc.common.log.Trace.warn().
{
  if (!failedOnAllBackends.remove(request))
  {
    logger.warn("Unable to find request "
        + request.getSQLShortForm(dvdb.getSQLShortFormLength())
        + " in list of requests that failed on all backends.");
    return;
  }
  if (success)
  { //We have to invalidate all backends
    logger
        .error("Request "
            + request.getSQLShortForm(dvdb.getSQLShortFormLength())
            + " failed on all local backends but succeeded on other controllers. Disabling all local backends.");
    try
    {
      dvdb.disableAllBackend();
    }
    catch (VirtualDatabaseException e)
    {
      logger.error("An error occured while disabling all backends", e);
    }
  }
}

Complete the transaction by removing it from the tidLoginTable.
Defined at line 972 of RequestManager.java.
{
  tidLoginTable.remove(tid);
}

Disable a backend that is currently enabled on this virtual database. The backend is disabled without further check. The load balancer's disable method is called on the specified backend.
Defined at line 1308 of RequestManager.java.
{
  if (db.isReadEnabled() || db.isWriteEnabled())
  {
    loadBalancer.disableBackend(db);
    logger.info(Translate.get("backend.state.disabled", db.getName()));
  }
  else
  {
    throw new SQLException(Translate.get("backend.already.disabled", db
        .getName()));
  }
}

The backend must belong to this virtual database and be in the enabled state. The backend is disabled once all the pending write queries are executed. A checkpoint is inserted in the recovery log.
Defined at line 1333 of RequestManager.java.
{
  ArrayList al = new ArrayList(1);
  al.add(db);
  this.disableBackendsForCheckpoint(al, checkpointName);
}

Disable a list of backends. This stores a single checkpoint and disables all the backends at the same time, so that the system is in a coherent state.
Defined at line 1361 of RequestManager.java. References org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.setLastKnownCheckpoint().
{
  // Sanity checks
  if (recoveryLog == null)
  {
    String msg = Translate.get("recovery.store.checkpoint.failed.cause.null",
        checkpointName);
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Wait for all pending transactions to finish
  logger.info(Translate.get("requestmanager.wait.pending.transactions"));
  scheduler.suspendNewTransactionsForCheckpoint();
  scheduler.suspendWrites();

  // Store checkpoint
  recoveryLog.storeCheckpoint(checkpointName);
  logger.info(Translate.get("recovery.checkpoint.stored", checkpointName));

  // Disable all backends
  int size = backends.size();
  DatabaseBackend db;
  for (int i = 0; i < size; i++)
  {
    db = (DatabaseBackend) backends.get(i);
    loadBalancer.disableBackend(db);
    db.setLastKnownCheckpoint(checkpointName);
    logger.info(Translate.get("backend.state.disabled", db.getName()));
  }

  // Resume transactions and writes
  scheduler.resumeNewTransactions();
  scheduler.resumeWrites();
}

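The listing above follows a suspend, store checkpoint, disable, resume sequence. The sketch below illustrates the same ordering with one variation that is not in the listing: the resume step sits in a finally block, so that a failure while storing the checkpoint does not leave writes suspended. CheckpointSketch and its parameters are hypothetical stand-ins for the scheduler, recovery log, and backends, and the unchecked exception replaces the SQLException used by the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of the suspend / checkpoint / disable / resume ordering. */
final class CheckpointSketch {
    /**
     * @param suspend      stand-in for suspending new transactions and writes
     * @param resume       stand-in for resuming them
     * @param storeCheckpt stand-in for recoveryLog.storeCheckpoint(name)
     * @return one "backend@checkpoint" entry per disabled backend
     */
    static List<String> disableForCheckpoint(Runnable suspend, Runnable resume,
            Consumer<String> storeCheckpt, List<String> backends, String checkpoint) {
        List<String> disabled = new ArrayList<>();
        suspend.run();
        try {
            // Store a single checkpoint for the whole group of backends
            storeCheckpt.accept(checkpoint);
            for (String backend : backends) {
                // Each backend remembers the checkpoint it was disabled at
                disabled.add(backend + "@" + checkpoint);
            }
        } finally {
            // Variation on the listing: always resume, even if the checkpoint failed
            resume.run();
        }
        return disabled;
    }
}
```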
Implements org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager. Defined at line 296 of RAIDb1DistributedRequestManager.java. References org.objectweb.cjdbc.common.sql.AbstractRequest.setTransactionId().
{
  try
  {
    if (logger.isDebugEnabled())
      logger.debug("Sending transaction " + transactionId
          + " commit to all controllers.");
    Vector groupMembers = dvdb.getChannel().getView().getMembers();
    // Send the query to everybody including us
    RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
        CJDBCGroupMessage.getMessage(new Commit(transactionId)),
        GroupRequest.GET_ALL, this.commitTimeout);

    if (logger.isDebugEnabled())
      logger.debug("Commit of transaction " + transactionId + " completed.");

    if (responses.numSuspectedMembers() > 0)
    { // Some controllers failed ... too bad !
      logger.warn(responses.numSuspectedMembers()
          + " controller(s) died during execution of commit for transaction "
          + transactionId);
    }

    // List of controllers that gave a AllBackendsFailedException
    Vector failedOnAllBackends = null;
    SQLException exception = null;
    Vector validResults = responses.getResults();
    int size = groupMembers.size();
    boolean success = false;
    // Get the result of each controller
    for (int i = 0; i < size; i++)
    {
      Address address = (Address) groupMembers.get(i);
      if (responses.isSuspected(address))
      {
        logger.warn("Controller " + address + " is suspected of failure.");
        continue;
      }
      Object r = responses.get(address);
      if (r instanceof Boolean)
      {
        if (((Boolean) r).booleanValue())
          success = true;
        else
          logger.error("Unexpected result for controller " + address);
      }
      else if (r instanceof AllBackendsFailedException)
      {
        if (failedOnAllBackends == null)
          failedOnAllBackends = new Vector();
        failedOnAllBackends.add(address);
        if (logger.isDebugEnabled())
          logger.debug("Commit failed on all backends of controller "
              + address + " (" + r + ")");
      }
      else if (r instanceof SQLException)
      {
        String msg = "Commit of transaction " + transactionId
            + " failed on controller " + address + " (" + r + ")";
        logger.warn(msg);
        exception = (SQLException) r;
      }
    }

    if (failedOnAllBackends != null)
    { // Notify all controllers of completion
      AbstractRequest request = new UnknownRequest("commit", false, 0);
      request.setTransactionId(transactionId);
      dvdb.getDispatcher().castMessage(
          failedOnAllBackends,
          CJDBCGroupMessage
              .getMessage(new NotifyCompletion(request, success)),
          GroupRequest.GET_NONE, commitTimeout);
    }

    if (exception != null)
      throw exception;
    // At this point, all controllers failed
    String msg = "Transaction " + transactionId
        + " failed to commit on all controllers";
    logger.warn(msg);
    throw new SQLException(msg);
  }
  catch (SQLException e)
  {
    String msg = "Transaction " + transactionId + " commit failed (" + e
        + ")";
    logger.warn(msg);
    throw e;
  }
}

Implements org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager. Defined at line 391 of RAIDb1DistributedRequestManager.java. References org.objectweb.cjdbc.common.sql.AbstractRequest.setTransactionId().
{
  try
  {
    if (logger.isDebugEnabled())
      logger.debug("Sending transaction " + transactionId
          + " rollback to all controllers.");
    Vector groupMembers = dvdb.getChannel().getView().getMembers();
    // Send the query to everybody including us
    RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
        CJDBCGroupMessage.getMessage(new Rollback(transactionId)),
        GroupRequest.GET_ALL, this.rollbackTimeout);

    if (logger.isDebugEnabled())
      logger
          .debug("rollback of transaction " + transactionId + " completed.");

    if (responses.numSuspectedMembers() > 0)
    { // Some controllers failed ... too bad !
      logger
          .warn(responses.numSuspectedMembers()
              + " controller(s) died during execution of rollback for transaction "
              + transactionId);
    }

    // List of controllers that gave a AllBackendsFailedException
    Vector failedOnAllBackends = null;
    SQLException exception = null;
    Vector validResults = responses.getResults();
    int size = groupMembers.size();
    boolean success = false;
    // Get the result of each controller
    for (int i = 0; i < size; i++)
    {
      Address address = (Address) groupMembers.get(i);
      if (responses.isSuspected(address))
      {
        logger.warn("Controller " + address + " is suspected of failure.");
        continue;
      }
      Object r = responses.get(address);
      if (r instanceof Boolean)
      {
        if (((Boolean) r).booleanValue())
          success = true;
        else
          logger.error("Unexpected result for controller " + address);
      }
      else if (r instanceof AllBackendsFailedException)
      {
        if (failedOnAllBackends == null)
          failedOnAllBackends = new Vector();
        failedOnAllBackends.add(address);
        if (logger.isDebugEnabled())
          logger.debug("rollback failed on all backends of controller "
              + address + " (" + r + ")");
      }
      else if (r instanceof SQLException)
      {
        String msg = "rollback of transaction " + transactionId
            + " failed on controller " + address + " (" + r + ")";
        logger.warn(msg);
        exception = (SQLException) r;
      }
    }

    if (failedOnAllBackends != null)
    { // Notify all controllers of completion
      AbstractRequest request = new UnknownRequest("rollback", false, 0);
      request.setTransactionId(transactionId);
      dvdb.getDispatcher().castMessage(
          failedOnAllBackends,
          CJDBCGroupMessage
              .getMessage(new NotifyCompletion(request, success)),
          GroupRequest.GET_NONE, rollbackTimeout);
    }

    if (exception != null)
      throw exception;
    // At this point, all controllers failed
    String msg = "Transaction " + transactionId
        + " failed to rollback on all controllers";
    logger.warn(msg);
    throw new SQLException(msg);
  }
  catch (SQLException e)
  {
    String msg = "Transaction " + transactionId + " rollback failed (" + e
        + ")";
    logger.warn(msg);
    throw e;
  }
}

Enable a backend that has been previously added to this virtual database and that is in the disabled state. We check with the other controllers whether this backend must be enabled in read-only or read-write mode. The current policy is that the first controller to enable this backend will have read-write access to it and the others will be in read-only mode.
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 136 of DistributedRequestManager.java. References org.objectweb.cjdbc.common.log.Trace.debug(), org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.dvdb, org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.enableBackend(), org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase.getChannel(), org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase.getDispatcher(), org.objectweb.cjdbc.common.log.Trace.info(), and org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.logger.
{
  JChannel channel = dvdb.getChannel();
  // Compute the group list without us
  Vector allButUs = channel.getView().getMembers();
  allButUs.remove(channel.getLocalAddress());

  int size = allButUs.size();
  if (size > 0)
  {
    logger.debug(Translate
        .get("virtualdatabase.distributed.enable.backend.check"));

    // Notify other controllers that we enable this backend.
    // No answer is expected.
    RspList rspList = dvdb.getDispatcher().castMessage(allButUs,
        CJDBCGroupMessage.getMessage(new EnableBackend(db)),
        GroupRequest.GET_NONE, 0);
  }

  loadBalancer.enableBackend(db, true);
  vdb.logger.info("Database backend " + db.getName() + " enabled");
}

The backend must have been previously added to this virtual database and be in the disabled state. All the queries since the given checkpoint are played and the backend state is set to enabled when it is completely synchronized.
Defined at line 1188 of RequestManager.java. References org.objectweb.cjdbc.controller.recoverylog.RecoveryTask.getId(), and org.objectweb.cjdbc.controller.recoverylog.RecoveryTask.getTask().
{
  // Sanity check
  if (recoveryLog == null)
  {
    String msg = Translate.get(
        "recovery.restore.checkpoint.failed.cause.null", checkpointName);
    logger.error(msg);
    throw new SQLException(msg);
  }

  db.initializeConnections();
  recoveryLog.beginRecovery();

  // Get the checkpoint from the recovery log
  long logIdx;
  try
  {
    logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
  }
  catch (SQLException e)
  {
    recoveryLog.endRecovery();
    String msg = Translate.get("recovery.cannot.get.checkpoint", e);
    logger.error(msg);
    throw new SQLException(msg);
  }

  // Get a temporary worker thread to execute the queries on this backend
  BackendWorkerThread bwt = new BackendWorkerThread(db, loadBalancer);
  bwt.start();
  RecoveryTask recoveryTask = null;

  logger.info(Translate.get("recovery.start.process"));

  // Replay the whole log
  while (logIdx != -1)
  {
    try
    {
      recoveryTask = recoveryLog.recoverNextRequest(logIdx);
    }
    catch (SQLException e)
    {
      // Signal end of recovery and kill worker thread
      recoveryLog.endRecovery();
      addWorkerTask(bwt, new KillThreadTask(1, 1));
      String msg = Translate.get("recovery.cannot.recover.from.index", e);
      logger.error(msg);
      throw new SQLException(msg);
    }
    if (recoveryTask == null)
      break;
    addWorkerTask(bwt, recoveryTask.getTask());
    logIdx = recoveryTask.getId();
  }

  // Suspend the writes
  scheduler.suspendWrites();

  // Play the remaining writes that were pending
  logIdx++;

  while (logIdx != -1)
  {
    try
    {
      recoveryTask = recoveryLog.recoverNextRequest(logIdx);
    }
    catch (SQLException e)
    {
      // Signal end of recovery and kill worker thread
      recoveryLog.endRecovery();
      addWorkerTask(bwt, new KillThreadTask(1, 1));
      String msg = Translate.get("recovery.cannot.recover.from.index", e);
      logger.error(msg);
      scheduler.resumeWrites();
      throw new SQLException(msg);
    }
    if (recoveryTask == null)
      break;
    addWorkerTask(bwt, recoveryTask.getTask());
    logIdx = recoveryTask.getId();
  }

  // We are done with the recovery
  logger.info(Translate.get("recovery.process.complete"));
  addWorkerTask(bwt, new KillThreadTask(1, 1));
  try
  {
    bwt.join();
  }
  catch (InterruptedException e)
  {
    recoveryLog.endRecovery();
    String msg = Translate.get("recovery.join.failed", e);
    logger.error(msg);
    scheduler.resumeWrites();
    throw new SQLException(msg);
  }

  recoveryLog.endRecovery();

  // Now enable it
  loadBalancer.enableBackend(db, true);
  scheduler.resumeWrites();
  logger.info(Translate.get("backend.state.enabled", db.getName()));
}

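The recovery listing above replays the log in two passes: a first pass while writes are still accepted, then a second pass after writes are suspended to catch the entries that arrived in the meantime. The sketch below models only that two-pass idea on an in-memory list. ReplaySketch and replayFrom are hypothetical names, the list index stands in for the recovery log request id, and treating the checkpoint index as exclusive is an assumption.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of replaying a log from a checkpoint in two passes. */
final class ReplaySketch {
    /**
     * Applies every entry whose index is greater than afterIdx to the backend.
     *
     * @return the index of the last entry applied, or afterIdx if there was none
     */
    static int replayFrom(List<String> log, int afterIdx, Consumer<String> backend) {
        int idx = afterIdx;
        while (idx + 1 < log.size()) {
            idx++;
            backend.accept(log.get(idx));
        }
        return idx;
    }
}
```

A caller would run replayFrom once, suspend writes, and then run it again from the returned index, so that the second pass only has to apply the short tail written during the first pass.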
Implements org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager. Defined at line 88 of RAIDb1DistributedRequestManager.java. References org.objectweb.cjdbc.common.log.Trace.debug(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase.getChannel(), org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase.getDispatcher(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), org.objectweb.cjdbc.common.log.Trace.isDebugEnabled(), and org.objectweb.cjdbc.common.log.Trace.warn().
{
  try
  {
    int execWriteRequestResult = -1;

    if (logger.isDebugEnabled())
      logger.debug("Broadcasting request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " to all controllers.");
    Vector groupMembers = dvdb.getChannel().getView().getMembers();
    // Send the query to everybody including us
    RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
        CJDBCGroupMessage.getMessage(new ExecWriteRequest(request)),
        GroupRequest.GET_ALL, request.getTimeout());

    if (logger.isDebugEnabled())
      logger.debug("Request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " completed.");

    if (responses.numSuspectedMembers() > 0)
    { // Some controllers failed ... too bad !
      logger.warn(responses.numSuspectedMembers()
          + " controller(s) died during execution of request "
          + request.getId());
    }

    // List of controllers that gave a AllBackendsFailedException
    Vector failedOnAllBackends = null;
    SQLException exception = null;
    Vector validResults = responses.getResults();
    int size = groupMembers.size();
    // Get the result of each controller
    for (int i = 0; i < size; i++)
    {
      Address address = (Address) groupMembers.get(i);
      if (responses.isSuspected(address))
      {
        logger.warn("Controller " + address + " is suspected of failure.");
        continue;
      }
      Object r = responses.get(address);
      if (r instanceof Integer)
      {
        if (execWriteRequestResult == -1)
          execWriteRequestResult = ((Integer) r).intValue();
        else if (execWriteRequestResult != ((Integer) r).intValue())
          logger.error("Controllers have different results for request "
              + request.getId());
      }
      else if (r instanceof AllBackendsFailedException)
      {
        if (failedOnAllBackends == null)
          failedOnAllBackends = new Vector();
        failedOnAllBackends.add(address);
        if (logger.isDebugEnabled())
          logger.debug("Request failed on all backends of controller "
              + address + " (" + r + ")");
      }
      else if (r instanceof SQLException)
      {
        String msg = "Request " + request.getId() + " failed on controller "
            + address + " (" + r + ")";
        logger.warn(msg);
        exception = (SQLException) r;
      }
    }

    if (failedOnAllBackends != null)
    { // Notify all controllers of completion
      dvdb.getDispatcher().castMessage(
          failedOnAllBackends,
          CJDBCGroupMessage.getMessage(new NotifyCompletion(request,
              execWriteRequestResult != -1)), GroupRequest.GET_NONE,
          request.getTimeout());
    }

    if (execWriteRequestResult != -1)
      return execWriteRequestResult;
    else if (exception != null)
      throw exception;
    // At this point, all controllers failed
    String msg = "Request '" + request + "' failed on all controllers";
    logger.warn(msg);
    throw new SQLException(msg);
  }
  catch (SQLException e)
  {
    String msg = Translate
        .get("loadbalancer.request.failed", new String[]{
            request.getSQLShortForm(vdb.getSQLShortFormLength()),
            e.getMessage()});
    logger.warn(msg);
    throw e;
  }
}

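The write-request listing above collects one response per controller and reduces them to a single outcome: the first integer result wins, and an exception is only raised when no controller produced a result. The sketch below isolates that reduction step. WriteResultAggregator is a hypothetical name, responses are passed as a plain list rather than a JGroups RspList, and an unchecked IllegalStateException stands in for the SQLException thrown by the real code.

```java
import java.util.List;

/** Hypothetical sketch: reduce per-controller responses to one update count. */
final class WriteResultAggregator {
    /**
     * @param responses one entry per controller: an Integer update count or an Exception
     * @return the first update count reported by any controller
     * @throws IllegalStateException if no controller reported an update count
     */
    static int aggregate(List<?> responses) {
        int result = -1; // same "no result yet" sentinel as the listing
        Exception lastFailure = null;
        for (Object r : responses) {
            if (r instanceof Integer) {
                int count = (Integer) r;
                if (result == -1) {
                    result = count;
                }
                // A differing count from another controller is only logged in the listing
            } else if (r instanceof Exception) {
                lastFailure = (Exception) r;
            }
        }
        if (result != -1) {
            return result;
        }
        String msg = lastFailure != null ? lastFailure.getMessage()
                                         : "request failed on all controllers";
        throw new IllegalStateException(msg, lastFailure);
    }
}
```

One success is enough for the caller to get a result, which matches the RAIDb-1 idea that the replicas are interchangeable.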
Implements org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager. Defined at line 190 of RAIDb1DistributedRequestManager.java.
{
  try
  {
    ControllerResultSet execWriteRequestResult = null;

    if (logger.isDebugEnabled())
      logger.debug("Broadcasting request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " to all controllers.");
    Vector groupMembers = dvdb.getChannel().getView().getMembers();
    // Send the query to everybody including us
    RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
        CJDBCGroupMessage.getMessage(new ExecWriteRequestWithKeys(request)),
        GroupRequest.GET_ALL, request.getTimeout());

    if (logger.isDebugEnabled())
      logger.debug("Request "
          + request.getSQLShortForm(dvdb.getSQLShortFormLength())
          + " completed.");

    if (responses.numSuspectedMembers() > 0)
    { // Some controllers failed ... too bad !
      logger.warn(responses.numSuspectedMembers()
          + " controller(s) died during execution of request "
          + request.getId());
    }

    // List of controllers that gave a AllBackendsFailedException
    Vector failedOnAllBackends = null;
    SQLException exception = null;
    Vector validResults = responses.getResults();
    int size = groupMembers.size();
    // Get the result of each controller
    for (int i = 0; i < size; i++)
    {
      Address address = (Address) groupMembers.get(i);
      if (responses.isSuspected(address))
      {
        logger.warn("Controller " + address + " is suspected of failure.");
        continue;
      }
      Object r = responses.get(address);
      if (r instanceof ResultSet)
      {
        if (execWriteRequestResult == null)
          execWriteRequestResult = (ControllerResultSet) r;
      }
      else if (r instanceof AllBackendsFailedException)
      {
        if (failedOnAllBackends == null)
          failedOnAllBackends = new Vector();
        failedOnAllBackends.add(address);
      }
      else if (r instanceof SQLException)
      {
        String msg = "Request " + request.getId() + " failed on controller "
            + address + " (" + r + ")";
        logger.warn(msg);
        exception = (SQLException) r;
      }
    }

    if (failedOnAllBackends != null)
    { // Notify all controllers of completion
      dvdb.getDispatcher().castMessage(
          failedOnAllBackends,
          CJDBCGroupMessage.getMessage(new NotifyCompletion(request,
              execWriteRequestResult != null)), GroupRequest.GET_NONE,
          request.getTimeout());
    }

    if (execWriteRequestResult != null)
      return execWriteRequestResult;
    else if (exception != null)
      throw exception;
    // At this point, all controllers failed
    String msg = "Request '" + request + "' failed on all controllers";
    logger.warn(msg);
    throw new SQLException(msg);
  }
  catch (SQLException e)
  {
    String msg = Translate
        .get("loadbalancer.request.failed", new String[]{
            request.getSQLShortForm(vdb.getSQLShortFormLength()),
            e.getMessage()});
    logger.warn(msg);
    throw e;
  }
}

Implements org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager. Defined at line 286 of RAIDb1DistributedRequestManager.java.
{
  // TODO Auto-generated method stub
  return 0;
}

Perform a read request and return the reply. The scheduler is called first (if defined), then the cache (if defined), and finally the load balancer.
Defined at line 290 of RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.execReadRequest().
{
  // Sanity check
  if (!request.isAutoCommit())
  { // Check that the transaction has been started
    long tid = request.getTransactionId();
    if (!tidLoginTable.containsKey(new Long(tid)))
      throw new SQLException(Translate.get("transaction.not.started", tid));
  }

  // If we need to parse the request, try to get the parsing from the cache.
  // Note that if we have a cache miss but backgroundParsing has been turned
  // on, then this call will start a ParsedThread in background.
  if ((requiredGranularity != ParsingGranularities.NO_PARSING)
      && (!request.isParsed()))
  {
    if (parsingCache == null)
      request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
    else
      parsingCache.getParsingFromCache(request);
  }

  //
  // SCHEDULER
  //

  // Get the parsing now if the request is not yet parsed. The parsing is
  // handled by the ParsingCache that may already have parsed the request
  // in background (if backgroundParsing is set).
  if ((schedulerParsingranularity != ParsingGranularities.NO_PARSING)
      && !request.isParsed())
  {
    if (parsingCache == null)
      request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
    else
      parsingCache.getParsingFromCacheAndParseIfMissing(request);
  }

  // Wait for the scheduler to give us the authorization to execute
  scheduler.scheduleReadRequest(request);

  if (logger.isDebugEnabled())
    logger.debug(Translate.get("requestmanager.read.request", new String[]{
        request.getId() + "",
        request.getSQLShortForm(vdb.getSQLShortFormLength())}));

  //
  // CACHE
  //

  ControllerResultSet result = null;
  try
  { // Check cache if any
    if (requestCache != null)
    {
      CacheEntry qce = requestCache.getFromCache(request, true);
      if (qce != null)
      {
        result = qce.getResult();
        if (result != null)
        { // Cache hit !
          if (vdb.getSQLMonitor() != null)
            vdb.getSQLMonitor().logCacheHit(request);

          scheduler.readCompleted(request);
          return result;
        }
      }
    }

    //
    // LOAD BALANCER
    //

    // At this point, we have a result cache miss.
    // If we had a parsing cache miss too, wait for the parsing to be done if
    // needed.
    if ((loadBalancerParsingranularity != ParsingGranularities.NO_PARSING)
        && !request.isParsed())
    {
      if (parsingCache == null)
        request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
      else
        parsingCache.getParsingFromCacheAndParseIfMissing(request);
    }

    // Send the request to the load balancer
    result = new ControllerResultSet(request, loadBalancer
        .execReadRequest(request), metadataCache);

    //
    // UPDATES & NOTIFICATIONS
    //

    // Update cache
    if ((requestCache != null)
        && (request.getCacheAbility() != RequestType.UNCACHEABLE))
    {
      if (!request.isParsed()
          && (cacheParsingranularity != ParsingGranularities.NO_PARSING))
      { // The cache was the only one to need parsing and
        // the request was not previously in the cache
        {
          if (parsingCache == null)
            request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
          else
            parsingCache.getParsingFromCacheAndParseIfMissing(request);
        }
      }
      requestCache.addToCache(request, result);
    }
  }
  catch (Exception failed)
  {
    if (requestCache != null)
      requestCache.removeFromPendingQueries(request);
    scheduler.readCompleted(request);
    String msg = Translate.get("requestmanager.request.failed", new String[]{
        request.getSQLShortForm(vdb.getSQLShortFormLength()),
        failed.getMessage()});
    if (failed instanceof RuntimeException)
      logger.warn(msg, failed);
    else
      logger.warn(msg);
    throw new SQLException(msg);
  }

  // Notify scheduler of completion
  scheduler.readCompleted(request);

  return result;
}

Call a stored procedure that returns a ResultSet.
Defined at line 702 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.execReadStoredProcedure().
  {
    // Sanity check
    if (!proc.isAutoCommit())
    { // Check that the transaction has been started
      long tid = proc.getTransactionId();
      if (!tidLoginTable.containsKey(new Long(tid)))
        throw new SQLException(Translate.get("transaction.not.started", tid));
    }

    //
    // SCHEDULER
    //

    // Note that no parsing is required for stored procedures.
    // We build a fake request that span over all tables to be sure
    // that the scheduler will lock everything.
    SelectRequest spanOnAllTables = null;
    if (requiredGranularity != ParsingGranularities.NO_PARSING)
    {
      String sql = "SELECT * FROM";
      ArrayList tables = dbs.getTables();
      int size = tables.size();
      // The following code does not generate a syntaxically correct SQL
      // request (an extra coma at the beginning) but that is not an issue.
      for (int i = 0; i < size; i++)
        sql += " ," + ((DatabaseTable) tables.get(i)).getName();

      spanOnAllTables = new SelectRequest(sql, false, 0);
    }
    else
      spanOnAllTables = new SelectRequest("select * from x", false, 0);

    // Wait for the scheduler to give us the authorization to execute
    scheduler.scheduleReadRequest(spanOnAllTables);

    if (logger.isDebugEnabled())
      logger.debug(Translate.get("requestmanager.read.store.procedure",
          new String[]{String.valueOf(proc.getId()),
              proc.getSQLShortForm(vdb.getSQLShortFormLength())}));

    //
    // CACHE
    //

    // Cache is always flushed unless the user has explicitely set the
    // connection to read-only mode in which case we assume that the
    // users deliberately forces the cache not to be flushed when calling
    // this stored procedure.
    if ((requestCache != null) && (!proc.isReadOnly()))
      requestCache.flushCache();

    ControllerResultSet result = null;
    try
    {
      //
      // LOAD BALANCER
      //

      // Send the request to the load balancer
      if (proc.isReadOnly())
        result = new ControllerResultSet(proc, loadBalancer
            .execReadOnlyReadStoredProcedure(proc), metadataCache);
      else
        result = new ControllerResultSet(proc, loadBalancer
            .execReadStoredProcedure(proc), metadataCache);

      //
      // RECOVERY LOG
      //

      if (recoveryLog != null)
        recoveryLog.logRequest(proc, true);

    }
    catch (Exception failed)
    {
      scheduler.readCompleted(spanOnAllTables);
      String msg = Translate.get("requestmanager.store.procedure.failed",
          new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
              failed.getMessage()});
      logger.warn(msg);
      throw new SQLException(msg);
    }

    // Notify scheduler of completion
    scheduler.readCompleted(spanOnAllTables);

    return result;
  }
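Because a stored procedure is not parsed, the listing above hands the scheduler a placeholder SELECT that names every table, so that the whole schema is locked. The sketch below isolates just that string construction. `SpanRequestSketch` and `buildSpanSql` are hypothetical names, and table names are passed as plain strings rather than `DatabaseTable` objects.

```java
import java.util.List;

/** Hypothetical helper, not part of C-JDBC: mirrors the loop in the listing above. */
final class SpanRequestSketch {
  /** Returns a deliberately non-executable SELECT that names every table. */
  static String buildSpanSql(List<String> tableNames) {
    StringBuilder sql = new StringBuilder("SELECT * FROM");
    for (String name : tableNames) {
      // Same shape as the original loop: a comma precedes every table name,
      // including the first one, so the statement is not valid SQL.
      sql.append(" ,").append(name);
    }
    return sql.toString();
  }
}
```

The result is never sent to a database. Per the source comments it is only used for scheduling, which is why the stray leading comma is tolerated.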
|
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 316 of file DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.execDistributedWriteRequest() and org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.writeTransactions.
  {
    if (!request.isAutoCommit())
    { // Add this transaction to the list of write transactions
      Long lTid = new Long(request.getTransactionId());
      synchronized (writeTransactions)
      {
        if (!writeTransactions.contains(lTid))
          writeTransactions.add(lTid);
      }
    }
    return execDistributedWriteRequest(request);
  }
|
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 333 of file DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.execDistributedWriteRequestWithKeys() and org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.writeTransactions.
  {
    if (!request.isAutoCommit())
    { // Add this transaction to the list of write transactions
      Long lTid = new Long(request.getTransactionId());
      synchronized (writeTransactions)
      {
        if (!writeTransactions.contains(lTid))
          writeTransactions.add(lTid);
      }
    }
    return execDistributedWriteRequestWithKeys(request);
  }
|
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 351 of file DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.execDistributedWriteStoredProcedure() and org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.writeTransactions.
  {
    if (!proc.isAutoCommit())
    { // Add this transaction to the list of write transactions
      Long lTid = new Long(proc.getTransactionId());
      synchronized (writeTransactions)
      {
        if (!writeTransactions.contains(lTid))
          writeTransactions.add(lTid);
      }
    }
    return execDistributedWriteStoredProcedure(proc);
  }
|
Get the
Defined at line 1583 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseMetaData.getDatabaseSchema(), org.objectweb.cjdbc.controller.cache.parsing.ParsingCache.getParsingFromCacheAndParseIfMissing(), org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.isCompatible(), and org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.VirtualDatabaseConfiguration().
  {
    return dbs;
  }
|
Get the Request Load Balancer used in this Request Controller.
Defined at line 1593 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.isCompatible() and org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.VirtualDatabaseConfiguration().
  {
    return loadBalancer;
  }
|
Get the trace logger of this DistributedRequestManager
Defined at line 107 of file DistributedRequestManager.java.
  {
    return logger;
  }
|
Returns the metadataCache value.
Defined at line 1627 of file RequestManager.java.
  {
    return metadataCache;
  }
|
|
Returns the Recovery Log Manager.
Defined at line 1660 of file RequestManager.java.
  {
    return recoveryLog;
  }
|
Defined at line 1729 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run().
  {
    return requiredGranularity;
  }
|
Get the result cache (if any) used in this Request Manager.
Defined at line 1617 of file RequestManager.java.
  {
    return requestCache;
  }
|
Get the Request Scheduler (if any) used in this Request Controller.
Defined at line 1694 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.isCompatible() and org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration.VirtualDatabaseConfiguration().
  {
    return scheduler;
  }
|
Get the TransactionMarkerMetaData for the given transaction id.
Defined at line 953 of file RequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData.setTimeout().
  {
    TransactionMarkerMetaData tm = (TransactionMarkerMetaData) tidLoginTable
        .get(tid);

    if (tm == null)
      throw new SQLException(Translate.get("transaction.marker.not.found", ""
          + tid));

    tm.setTimeout(commitTimeout);
    return tm;
  }
|
Returns the vdb value.
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 117 of file DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.dvdb.
  {
    return dvdb;
  }
|
Get XML information about this Request Manager.
Defined at line 1743 of file RequestManager.java.
  {
    StringBuffer info = new StringBuffer();
    info.append("<" + DatabasesXmlTags.ELT_RequestManager + " "
        + DatabasesXmlTags.ATT_caseSensitiveParsing + "=\""
        + isCaseSensitiveParsing + "\" " + DatabasesXmlTags.ATT_beginTimeout
        + "=\"" + beginTimeout / 1000 + "\" "
        + DatabasesXmlTags.ATT_commitTimeout + "=\"" + commitTimeout / 1000
        + "\" " + DatabasesXmlTags.ATT_rollbackTimeout + "=\""
        + rollbackTimeout / 1000 + "\">");
    if (scheduler != null)
      info.append(scheduler.getXml());
    if (parsingCache != null)
      info.append(parsingCache.getXml());
    if (requestCache != null)
      info.append(requestCache.getXml());
    if (loadBalancer != null)
      info.append(loadBalancer.getXml());
    if (recoveryLog != null)
      info.append(this.recoveryLog.getXml());
    info.append("</" + DatabasesXmlTags.ELT_RequestManager + ">");
    return info.toString();
  }
|
Replace all SQL macros with an instantiated value: NOW() is replaced with the current timestamp, and RAND() is replaced with a random value.
Defined at line 270 of file RequestManager.java.
  {
    String sql = MacrosHandler.macroNow(originalSql, timestampResolution);
    sql = MacrosHandler.macroCurrentTimestamp(sql, timestampResolution);

    if (handleOnlyNow)
      return sql;

    return MacrosHandler.macroRand(sql);
  }
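How `MacrosHandler` performs the substitution is not shown on this page. The following is a rough sketch of one way such a macro expansion could work, using regular expressions. `MacroSketch`, `expand`, and the choice to pass the replacement literals in as parameters are assumptions made for illustration. The sketch ignores `CURRENT_TIMESTAMP` and does not protect macros that appear inside quoted string literals.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration; this is not the MacrosHandler implementation. */
final class MacroSketch {
  private static final Pattern NOW =
      Pattern.compile("\\bnow\\s*\\(\\s*\\)", Pattern.CASE_INSENSITIVE);
  private static final Pattern RAND =
      Pattern.compile("\\brand\\s*\\(\\s*\\)", Pattern.CASE_INSENSITIVE);

  /**
   * Replaces NOW() with nowLiteral and, unless handleOnlyNow is set,
   * RAND() with randLiteral. Literals are supplied by the caller so that
   * every backend would receive the same concrete values.
   */
  static String expand(String sql, String nowLiteral, String randLiteral,
      boolean handleOnlyNow) {
    String out = NOW.matcher(sql).replaceAll(Matcher.quoteReplacement(nowLiteral));
    if (handleOnlyNow) {
      return out;
    }
    return RAND.matcher(out).replaceAll(Matcher.quoteReplacement(randLiteral));
  }
}
```

Substituting the values once, before the statement is sent on, means that each replica would store the same timestamp and random number instead of evaluating the functions independently.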
|
Send the given query to the load balancer.
Defined at line 597 of file RequestManager.java.
  {
    int result;
    try
    { // Send the request to the load balancer
      if (request.isUpdate() && (requestCache != null))
      { // Try the optimization if we try to update values that are already
        // up-to-date.
        if (!requestCache.isUpdateNecessary((UpdateRequest) request))
        {
          scheduler.writeCompleted(request);
          return 0;
        }
      }
      return loadBalancer.execWriteRequest(request);
    }
    catch (Exception failed)
    {
      scheduler.writeCompleted(request);
      String msg = Translate.get("requestmanager.request.failed", new String[]{
          request.getSQLShortForm(vdb.getSQLShortFormLength()),
          failed.getMessage()});
      if (failed instanceof RuntimeException)
        logger.warn(msg, failed);
      else
        logger.warn(msg);
      if (failed instanceof AllBackendsFailedException)
        throw (AllBackendsFailedException) failed;
      else
        throw new SQLException(msg);
    }
  }
|
Send the given query to the load balancer.
Defined at line 561 of file RequestManager.java.
  {
    int result;
    try
    { // Send the request to the load balancer
      return new ControllerResultSet(request, loadBalancer
          .execWriteRequestWithKeys(request), metadataCache);
    }
    catch (Exception failed)
    {
      scheduler.writeCompleted(request);
      String msg = Translate.get("requestmanager.request.failed", new String[]{
          request.getSQLShortForm(vdb.getSQLShortFormLength()),
          failed.getMessage()});
      if (failed instanceof RuntimeException)
        logger.warn(msg, failed);
      else
        logger.warn(msg);
      if (failed instanceof AllBackendsFailedException)
        throw (AllBackendsFailedException) failed;
      else
        throw new SQLException(msg);
    }
  }
|
Merge the given schema with the existing database schema.
Defined at line 1552 of file RequestManager.java.
  {
    try
    {
      if (dbs == null)
        setDatabaseSchema(new DatabaseSchema(backendSchema), false);
      else
      {
        dbs.mergeSchema(backendSchema);
        logger.info(Translate
            .get("requestmanager.schema.virtualdatabase.merged.new"));

        if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
          scheduler.mergeDatabaseSchema(dbs);

        if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
          requestCache.mergeDatabaseSchema(dbs);
      }
    }
    catch (SQLException e)
    {
      logger.error(Translate.get("requestmanager.schema.merge.failed", e
          .getMessage()));
    }
  }
|
Remove a checkpoint and corresponding entries from the log table
Defined at line 1347 of file RequestManager.java.
  {
    recoveryLog.removeCheckpoint(checkpointName);
  }
|
Recopy all the data of a previous dump recorded by Octopus into the named backend. This disables the backend and leaves it disabled after the recovery process. The user has to call the
Defined at line 1141 of file RequestManager.java. References org.objectweb.cjdbc.controller.core.backup.Octopus.restore().
  {

    try
    {
      // no check for disable as we are going to overwrite
      // all the database data
      if (db.isReadEnabled())
        loadBalancer.disableBackend(db);
      // Execute Recovery with octopus
      Octopus octopus = new Octopus(db, checkpointName);
      octopus.restore();
    }
    catch (SQLException e1)
    {
      // This comes from the loadbalancer
      throw new BackupException(ExceptionTypes.BACKEND_CANNOT_BE_DISABLED);
    }
    catch (OctopusException e)
    {
      logger.error(Translate.get("controller.octopus.recovery.failed", e));
      throw e;
    }
    catch (BackupException be)
    {
      logger.error(Translate.get("controller.backup.recovery.failed", be));
      throw be;
    }
    finally
    {
      logger.info(Translate
          .get("controller.backup.recovery.done", db.getName()));
    }
  }
|
Reimplemented from org.objectweb.cjdbc.controller.requestmanager.RequestManager. Defined at line 298 of file DistributedRequestManager.java. References org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.distributedRollback() and org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager.writeTransactions.
  {
    Long lTid = new Long(transactionId);
    boolean isAWriteTransaction;
    synchronized (writeTransactions)
    {
      isAWriteTransaction = writeTransactions.remove(lTid);
    }
    if (isAWriteTransaction)
      distributedRollback(transactionId);
    else
      // read-only transaction, it is local
      super.rollback(transactionId);
  }
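The write methods record a transaction id the first time the transaction writes, and the rollback listing above removes the id and uses the result to choose between a distributed and a local rollback. The sketch below shows that bookkeeping in isolation. `WriteTransactionTracker` and its method names are hypothetical. The declared type of `writeTransactions` is not visible on this page, so a `HashSet` is used here as an assumption, which also makes the `contains` check before `add` unnecessary.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of the writeTransactions bookkeeping. */
final class WriteTransactionTracker {
  private final Set<Long> writeTransactions = new HashSet<>();

  /** Called on every write issued inside a transaction (autocommit off). */
  void recordWrite(long transactionId) {
    synchronized (writeTransactions) {
      writeTransactions.add(transactionId); // no-op if already recorded
    }
  }

  /**
   * Called at rollback time. Returns true when the transaction has written,
   * meaning the rollback must be distributed; false means it was read-only
   * and can be handled locally.
   */
  boolean endAndCheckWasWrite(long transactionId) {
    synchronized (writeTransactions) {
      return writeTransactions.remove(transactionId);
    }
  }
}
```

Removing the id and testing membership in one synchronized step avoids a window in which another thread could observe a transaction that is already being rolled back.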
|
Schedule a request for execution.
Defined at line 483 of file RequestManager.java.
  {
    // Sanity check
    if (!request.isAutoCommit())
    { // Check that the transaction has been started
      long tid = request.getTransactionId();
      if (!tidLoginTable.containsKey(new Long(tid)))
        throw new SQLException(Translate.get("transaction.not.started", tid));
    }

    // If we need to parse the request, try to get the parsing from the cache.
    // Note that if we have a cache miss but backgroundParsing has been turned
    // on, then this call will start a ParsedThread in background.
    if ((requiredGranularity != ParsingGranularities.NO_PARSING)
        && (!request.isParsed()))
    {
      if (parsingCache == null)
        request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
      else
        parsingCache.getParsingFromCache(request);
    }

    //
    // SCHEDULER
    //

    // Get the parsing now if the request is not yet parsed. The parsing is
    // handled by the ParsingCache that may already have parsed the request
    // in background (if backgroundParsing is set).
    if ((schedulerParsingranularity != ParsingGranularities.NO_PARSING)
        && !request.isParsed())
    {
      if (parsingCache == null)
        request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
      else
        parsingCache.getParsingFromCacheAndParseIfMissing(request);
    }

    // Wait for the scheduler to give us the authorization to execute
    try
    {
      scheduler.scheduleWriteRequest(request);
    }
    catch (RollbackException e)
    { // Something bad happened and we need to rollback this transaction
      rollback(request.getTransactionId());
      throw new SQLException(e.getMessage());
    }

    if (logger.isDebugEnabled())
      logger.debug(Translate.get("requestmanager.write.request", new String[]{
          String.valueOf(request.getId()),
          request.getSQLShortForm(vdb.getSQLShortFormLength())}));

    // If we have a parsing cache miss, wait for the parsing to be done if
    // needed. Note that even if the cache was the only one to require parsing,
    // we wait for the parsing result here, because if it fails, we must not
    // execute the query.
    if ((requiredGranularity != ParsingGranularities.NO_PARSING)
        && !request.isParsed())
    {
      if (parsingCache == null)
        request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
      else
        parsingCache.getParsingFromCacheAndParseIfMissing(request);
    }
  }
|
Sets the parsing case sensitivity. If true, requests are parsed in a case-sensitive way (table and column names must exactly match the case of the names fetched from the database or enforced by a static schema).
Defined at line 1719 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.xml.DatabasesParser.endElement().
  {
    this.isCaseSensitiveParsing = isCaseSensitiveParsing;
    if (parsingCache != null)
      parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
  }
|
Sets the controller identifier value (this id must be unique).
Defined at line 213 of file DistributedRequestManager.java. References org.objectweb.cjdbc.common.log.Trace.error().
  {
    if (id > 0xffff)
    {
      String msg = "Out of range controller id (" + id + ")";
      logger.error(msg);
      throw new RuntimeException(msg);
    }
    this.controllerId = (0x000000000000ffff & id) << 12;
  }
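The setter above rejects ids larger than 0xffff, masks the id to 16 bits, and shifts it left by 12 bits before storing it. A small worked sketch of that arithmetic follows. `ControllerIdSketch` and `shiftControllerId` are hypothetical names, the parameter type `long` is an assumption (the declaration is not shown here), and `IllegalArgumentException` is used in place of the plain `RuntimeException` of the original. What the 12 low-order bits are reserved for is not stated on this page.

```java
/** Hypothetical illustration of the range check and shift in setControllerId. */
final class ControllerIdSketch {
  static long shiftControllerId(long id) {
    if (id > 0xffff) {
      throw new IllegalArgumentException("Out of range controller id (" + id + ")");
    }
    // Keep the 16 low-order bits of the id, then leave 12 zero bits below them.
    return (0xffffL & id) << 12;
  }
}
```

For example, id 1 becomes 0x1000 (4096) and the largest accepted id, 0xffff, becomes 0xffff000. As in the original check, only the upper bound is tested, so a negative id would be masked rather than rejected.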
|
Sets the
Defined at line 1517 of file RequestManager.java.
  {
    if (schemaIsStatic)
    {
      if (isStatic)
      {
        logger.warn(Translate
            .get("requestmanager.schema.replace.static.with.new"));
        this.dbs = schema;
      }
      else
        logger.info(Translate.get("requestmanager.schema.ignore.new.dynamic"));
    }
    else
    {
      schemaIsStatic = isStatic;
      this.dbs = schema;
      logger.info(Translate
          .get("requestmanager.schema.set.new.virtualdatabase"));
    }

    if (schedulerParsingranularity != ParsingGranularities.NO_PARSING)
      scheduler.setDatabaseSchema(dbs);

    if (cacheParsingranularity != ParsingGranularities.NO_PARSING)
      requestCache.setDatabaseSchema(dbs);

    // Load balancers do not have a specific database schema to update
  }
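The branch structure above amounts to a simple precedence rule: once a static schema is installed, only another static schema may replace it, and dynamically gathered schemas are ignored. The sketch below restates that rule on its own. `SchemaHolderSketch` is a hypothetical name, and a `String` stands in for `DatabaseSchema`. Logging and the propagation to the scheduler and cache are left out.

```java
/** Hypothetical illustration of the static-over-dynamic schema rule. */
final class SchemaHolderSketch {
  private String schema;          // stands in for DatabaseSchema
  private boolean schemaIsStatic;

  /** Returns true if the new schema was installed, false if it was ignored. */
  boolean set(String newSchema, boolean isStatic) {
    if (schemaIsStatic && !isStatic) {
      return false; // a dynamic schema never overrides a static one
    }
    schemaIsStatic = isStatic;
    schema = newSchema;
    return true;
  }

  String get() {
    return schema;
  }
}
```

A dynamic schema can therefore be replaced by anything, while a static one is only ever replaced by a newer static one.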
|
Set the Request Load Balancer to use in this Request Controller.
Defined at line 1603 of file RequestManager.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getParsingGranularity().
  {
    this.loadBalancer = loadBalancer;
    loadBalancerParsingranularity = loadBalancer.getParsingGranularity();
    if (loadBalancerParsingranularity > requiredGranularity)
      requiredGranularity = loadBalancerParsingranularity;
  }
|
Sets the metadataCache value.
Defined at line 1637 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.xml.DatabasesParser.endElement().
  {
    this.metadataCache = metadataCache;
  }
|
|
Sets the ParsingCache.
Defined at line 1647 of file RequestManager.java. References org.objectweb.cjdbc.controller.cache.parsing.ParsingCache.setCaseSensitiveParsing(), org.objectweb.cjdbc.controller.cache.parsing.ParsingCache.setGranularity(), and org.objectweb.cjdbc.controller.cache.parsing.ParsingCache.setRequestManager(). Referenced by org.objectweb.cjdbc.controller.xml.DatabasesParser.endElement().
  {
    parsingCache.setRequestManager(this);
    parsingCache.setGranularity(requiredGranularity);
    parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
    this.parsingCache = parsingCache;
  }
|
Sets the Recovery Log Manager.
Defined at line 1670 of file RequestManager.java.
  {
    this.recoveryLog = recoveryLog;
  }
|
|
Set the Request Cache to use in this Request Controller.
Defined at line 1680 of file RequestManager.java. References org.objectweb.cjdbc.controller.cache.result.AbstractResultCache.getParsingGranularity().
  {
    requestCache = cache;
    cacheParsingranularity = cache.getParsingGranularity();
    if (cacheParsingranularity > requiredGranularity)
      requiredGranularity = cacheParsingranularity;
  }
|
Set the Request Scheduler to use in this Request Controller.
Defined at line 1704 of file RequestManager.java. References org.objectweb.cjdbc.controller.scheduler.AbstractScheduler.getParsingGranularity().
  {
    this.scheduler = scheduler;
    schedulerParsingranularity = scheduler.getParsingGranularity();
    if (schedulerParsingranularity > requiredGranularity)
      requiredGranularity = schedulerParsingranularity;
  }
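The three setters for the load balancer, the request cache, and the scheduler share one pattern: each component reports its own parsing granularity, and the manager keeps the largest value seen as `requiredGranularity`. The sketch below captures only that pattern. `GranularitySketch` is a hypothetical name, and the numeric values are assumptions. The listings only show that granularities are compared with `>`, not what the `ParsingGranularities` constants actually are.

```java
/** Hypothetical illustration of how requiredGranularity is accumulated. */
final class GranularitySketch {
  /** Assumed placeholder for ParsingGranularities.NO_PARSING. */
  static final int NO_PARSING = 0;

  private int requiredGranularity = NO_PARSING;

  /** Called once per component (scheduler, cache, load balancer). */
  void register(int componentGranularity) {
    if (componentGranularity > requiredGranularity) {
      requiredGranularity = componentGranularity;
    }
  }

  int required() {
    return requiredGranularity;
  }
}
```

Keeping the maximum means a request is parsed once at the level the most demanding component needs, and the result can be reused by the others.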
|
Store the checkpoints of all backends in the recovery log.
Defined at line 1474 of file RequestManager.java. References org.objectweb.cjdbc.controller.backend.DatabaseBackend.getLastKnownCheckpoint() and org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().
  {
    if (recoveryLog == null)
      return;
    int size = backends.size();
    DatabaseBackend backend;
    for (int i = 0; i < size; i++)
    {
      backend = (DatabaseBackend) backends.get(i);
      try
      {
        recoveryLog.storeBackendCheckpoint(databaseName, backend.getName(),
            backend.getLastKnownCheckpoint());
      }
      catch (SQLException e)
      {
        logger.error(Translate.get("recovery.store.checkpoint.failed",
            new String[]{backend.getName(), e.getMessage()}));
      }
    }
  }
|
Update the cache, notify the recovery log, and finally notify the scheduler. Scheduler notifications can be disabled (unless an error occurs, in which case the scheduler is always notified). This is especially useful for distributed schedulers when all backends failed at one controller but we have to wait for confirmation that all other controllers failed. This piece of code is thus generic and reusable.
Defined at line 643 of file RequestManager.java. References org.objectweb.cjdbc.common.sql.AbstractWriteRequest.getTableName().
  {
    try
    { // Notify cache if any
      if (requestCache != null)
      { // Update cache
        requestCache.writeNotify(request);
      }

      // Log the request
      if (recoveryLog != null)
        recoveryLog.logRequest(request);

      // Update the schema if needed
      if (requiredGranularity != ParsingGranularities.NO_PARSING)
      {
        if (request.isCreate())
        { // Add the table to the schema
          dbs.addTable(((CreateRequest) request).getDatabaseTable());
          if (logger.isDebugEnabled())
            logger.debug(Translate.get("requestmanager.schema.add.table",
                request.getTableName()));
        }
        else if (request.isDrop())
        { // Delete the table from the schema
          dbs.removeTable(dbs.getTable(request.getTableName()));
          if (logger.isDebugEnabled())
            logger.debug(Translate.get("requestmanager.schema.remove.table",
                request.getTableName()));
        }
      }

      // Notify scheduler
      if (notifyScheduler)
        scheduler.writeCompleted(request);

    }
    catch (Exception failed)
    {
      scheduler.writeCompleted(request);
      String msg = Translate.get("requestmanager.request.failed", new String[]{
          request.getSQLShortForm(vdb.getSQLShortFormLength()),
          failed.getMessage()});
      if (failed instanceof RuntimeException)
        logger.warn(msg, failed);
      else
        logger.warn(msg);
      throw new SQLException(msg);
    }
  }
|
Begin timeout in ms. Defined at line 93 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
Commit timeout in ms. Defined at line 95 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
Defined at line 115 of file RequestManager.java. |
|
|
The request load balancer to use to send requests to the databases. Defined at line 109 of file RequestManager.java. |
|
Defined at line 130 of file RequestManager.java. |
|
Defined at line 118 of file RequestManager.java. |
|
|
An optional request cache to cache responses to SQL requests. Defined at line 106 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
Defined at line 126 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
Rollback timeout in ms. Defined at line 97 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
The request scheduler to order and schedule requests. Defined at line 103 of file RequestManager.java. Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.assignAndCheckSchedulerLoadBalancerValidity() and org.objectweb.cjdbc.controller.requestmanager.RequestManager.RequestManager(). |
|
Defined at line 123 of file RequestManager.java. |
|
Transaction id/Login mapping. Defined at line 128 of file RequestManager.java. |
|
The virtual database owning this Request Manager. Defined at line 100 of file RequestManager.java. |