00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 import java.util.Vector;
00030
00031 import javax.management.NotCompliantMBeanException;
00032
00033 import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00034 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
00035 import org.objectweb.cjdbc.common.i18n.Translate;
00036 import org.objectweb.cjdbc.common.log.Trace;
00037 import org.objectweb.cjdbc.common.shared.BackendInfo;
00038 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00039 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00040 import org.objectweb.cjdbc.common.sql.SelectRequest;
00041 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00042 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00043 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00044 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00045 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00046 import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
00047 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00048 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00049 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00050 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00053 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
00054 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 public abstract class DistributedRequestManager extends RequestManager
00069 {
00070 protected DistributedVirtualDatabase dvdb;
00071
00072 private Vector failedOnAllBackends;
00073
00074 private long controllerId;
00075
00076 private ArrayList writeTransactions;
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094 public DistributedRequestManager(DistributedVirtualDatabase vdb,
00095 AbstractScheduler scheduler, AbstractResultCache cache,
00096 AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00097 long beginTimeout, long commitTimeout, long rollbackTimeout)
00098 throws SQLException, NotCompliantMBeanException
00099 {
00100 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00101 commitTimeout, rollbackTimeout);
00102 dvdb = vdb;
00103 failedOnAllBackends = new Vector();
00104 writeTransactions = new ArrayList();
00105 }
00106
00107
00108
00109
00110
00111
00112 public long getControllerId()
00113 {
00114 return controllerId;
00115 }
00116
00117
00118
00119
00120
00121
00122 public Trace getLogger()
00123 {
00124 return logger;
00125 }
00126
00127
00128
00129
00130
00131
00132 public VirtualDatabase getVirtualDatabase()
00133 {
00134 return dvdb;
00135 }
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151 public void enableBackend(DatabaseBackend db) throws SQLException
00152 {
00153 int size = dvdb.getAllMemberButUs().size();
00154 if (size > 0)
00155 {
00156 logger.debug(Translate
00157 .get("virtualdatabase.distributed.enable.backend.check"));
00158
00159 try
00160 {
00161
00162
00163 dvdb.getMulticastRequestAdapter().multicastMessage(
00164 dvdb.getAllMemberButUs(), new EnableBackend(new BackendInfo(db)),
00165 MulticastRequestAdapter.WAIT_NONE,
00166 CJDBCGroupMessage.defaultCastTimeOut);
00167 }
00168 catch (Exception e)
00169 {
00170 String msg = "Error while enabling backend " + db.getName();
00171 logger.error(msg, e);
00172 throw new SQLException(msg + "(" + e + ")");
00173 }
00174 }
00175
00176 loadBalancer.enableBackend(db, true);
00177 vdb.logger.info("Database backend " + db.getName() + " enabled");
00178 }
00179
00180
00181
00182
00183
00184
00185 public void addFailedOnAllBackends(AbstractRequest request)
00186 {
00187 failedOnAllBackends.add(request);
00188 }
00189
00190
00191
00192
00193
00194
00195
00196
00197 public void completeFailedOnAllBackends(AbstractRequest request,
00198 boolean success)
00199 {
00200 if (!failedOnAllBackends.remove(request))
00201 {
00202 logger.warn("Unable to find request "
00203 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00204 + " in list of requests that failed on all backends.");
00205 return;
00206 }
00207 if (success)
00208 {
00209 logger
00210 .error("Request "
00211 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00212 + " failed on all local backends but succeeded on other controllers. Disabling all local backends.");
00213 try
00214 {
00215 dvdb.disableAllBackends();
00216 }
00217 catch (VirtualDatabaseException e)
00218 {
00219 logger.error("An error occured while disabling all backends", e);
00220 }
00221 }
00222 else
00223
00224
00225 scheduler.notifyWriteCompleted((AbstractWriteRequest) request);
00226 }
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 public void setControllerId(int id)
00238 {
00239 if (id > 0xffff)
00240 {
00241 String msg = "Out of range controller id (" + id + ")";
00242 logger.error(msg);
00243 throw new RuntimeException(msg);
00244 }
00245 this.controllerId = (0x000000000000ffff & id) << 12;
00246 }
00247
00248
00249
00250
00251 public long begin(String login) throws SQLException
00252 {
00253 try
00254 {
00255 TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0,
00256 beginTimeout, login);
00257
00258
00259 long tid = scheduler.begin(tm);
00260
00261
00262 tid = tid & 0x0000ffffffffffffL;
00263 tid = tid | controllerId;
00264 tm.setTransactionId(tid);
00265
00266 if (logger.isDebugEnabled())
00267 logger.debug(Translate.get("transaction.begin", String.valueOf(tid)));
00268
00269 try
00270 {
00271
00272 loadBalancer.begin(tm);
00273
00274
00275 if (recoveryLog != null)
00276 {
00277 recoveryLog.begin(tm);
00278 }
00279 }
00280 catch (SQLException e)
00281 {
00282 throw e;
00283 }
00284 finally
00285 {
00286
00287 scheduler.beginCompleted(tid);
00288 }
00289
00290 tidLoginTable.put(new Long(tid), tm);
00291 return tid;
00292 }
00293 catch (RuntimeException e)
00294 {
00295 logger.fatal(Translate.get(
00296 "fatal.runtime.exception.requestmanager.begin", e));
00297 throw new SQLException(e.getMessage());
00298 }
00299 }
00300
00301
00302
00303
00304 public void commit(long transactionId) throws SQLException
00305 {
00306 Long lTid = new Long(transactionId);
00307 boolean isAWriteTransaction;
00308 synchronized (writeTransactions)
00309 {
00310 isAWriteTransaction = writeTransactions.remove(lTid);
00311 }
00312 if (isAWriteTransaction)
00313 distributedCommit(transactionId);
00314 else
00315
00316 super.commit(transactionId);
00317 }
00318
00319
00320
00321
00322 public void rollback(long transactionId) throws SQLException
00323 {
00324 Long lTid = new Long(transactionId);
00325 boolean isAWriteTransaction;
00326 synchronized (writeTransactions)
00327 {
00328 isAWriteTransaction = writeTransactions.remove(lTid);
00329 }
00330 if (isAWriteTransaction)
00331 distributedRollback(transactionId);
00332 else
00333
00334 super.rollback(transactionId);
00335 }
00336
00337
00338
00339
00340 public void scheduleExecWriteRequest(AbstractWriteRequest request)
00341 throws SQLException
00342 {
00343 lazyTransactionStart(request);
00344 super.scheduleExecWriteRequest(request);
00345 }
00346
00347
00348
00349
00350
00351
00352
00353
00354 public void lazyTransactionStart(AbstractRequest request) throws SQLException
00355 {
00356
00357
00358 if (!request.isAutoCommit())
00359 {
00360 long tid = request.getTransactionId();
00361 if ((tid & 0x0000ffffffffffffL) != controllerId)
00362 {
00363 if (!tidLoginTable.containsKey(new Long(tid)))
00364 {
00365 try
00366 {
00367 TransactionMarkerMetaData tm = new TransactionMarkerMetaData(0,
00368 beginTimeout, request.getLogin());
00369 tm.setTransactionId(tid);
00370
00371 if (logger.isDebugEnabled())
00372 logger.debug(Translate.get("transaction.begin.lazy", String
00373 .valueOf(tid)));
00374
00375 try
00376 {
00377 scheduler.begin(tm);
00378
00379
00380 loadBalancer.begin(tm);
00381
00382
00383 if (recoveryLog != null)
00384 {
00385 recoveryLog.begin(tm);
00386 }
00387 }
00388 catch (SQLException e)
00389 {
00390 throw e;
00391 }
00392 finally
00393 {
00394
00395 scheduler.beginCompleted(tid);
00396 }
00397
00398 tidLoginTable.put(new Long(tid), tm);
00399 }
00400 catch (RuntimeException e)
00401 {
00402 logger.fatal(Translate.get(
00403 "fatal.runtime.exception.requestmanager.begin", e));
00404 throw new SQLException(e.getMessage());
00405 }
00406 }
00407 }
00408 }
00409 }
00410
00411
00412
00413
00414 public ControllerResultSet execReadRequest(SelectRequest request)
00415 throws SQLException
00416 {
00417 try
00418 {
00419 return super.execReadRequest(request);
00420 }
00421 catch (SQLException e)
00422 {
00423 if (!(e instanceof NoMoreBackendException))
00424 throw e;
00425 }
00426
00427
00428 return execRemoteReadRequest(controllerId, request);
00429 }
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441 public abstract ControllerResultSet execRemoteReadRequest(
00442 long controllerUniqueId, SelectRequest request) throws SQLException;
00443
00444
00445
00446
00447 public int execWriteRequest(AbstractWriteRequest request) throws SQLException
00448 {
00449 if (!request.isAutoCommit())
00450 {
00451 Long lTid = new Long(request.getTransactionId());
00452 synchronized (writeTransactions)
00453 {
00454 if (!writeTransactions.contains(lTid))
00455 writeTransactions.add(lTid);
00456 }
00457 }
00458 return execDistributedWriteRequest(request);
00459 }
00460
00461
00462
00463
00464 public ControllerResultSet execWriteRequestWithKeys(
00465 AbstractWriteRequest request) throws SQLException
00466 {
00467 if (!request.isAutoCommit())
00468 {
00469 Long lTid = new Long(request.getTransactionId());
00470 synchronized (writeTransactions)
00471 {
00472 if (!writeTransactions.contains(lTid))
00473 writeTransactions.add(lTid);
00474 }
00475 }
00476 return execDistributedWriteRequestWithKeys(request);
00477 }
00478
00479
00480
00481
00482 public ControllerResultSet execReadStoredProcedure(StoredProcedure proc)
00483 throws SQLException
00484 {
00485
00486 if (proc.isReadOnly())
00487 return execDistributedReadStoredProcedureLocally(proc);
00488
00489 if (!proc.isAutoCommit())
00490 {
00491 Long lTid = new Long(proc.getTransactionId());
00492 synchronized (writeTransactions)
00493 {
00494 if (!writeTransactions.contains(lTid))
00495 writeTransactions.add(lTid);
00496 }
00497 }
00498 return execDistributedReadStoredProcedure(proc);
00499 }
00500
00501
00502
00503
00504 public int execWriteStoredProcedure(StoredProcedure proc) throws SQLException
00505 {
00506 if (!proc.isAutoCommit())
00507 {
00508 Long lTid = new Long(proc.getTransactionId());
00509 synchronized (writeTransactions)
00510 {
00511 if (!writeTransactions.contains(lTid))
00512 writeTransactions.add(lTid);
00513 }
00514 }
00515 return execDistributedWriteStoredProcedure(proc);
00516 }
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528 public abstract void distributedCommit(long transactionId)
00529 throws SQLException;
00530
00531
00532
00533
00534
00535
00536
00537 public abstract void distributedRollback(long transactionId)
00538 throws SQLException;
00539
00540
00541
00542
00543
00544
00545
00546
00547 public abstract int execDistributedWriteRequest(AbstractWriteRequest request)
00548 throws SQLException;
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558 public abstract ControllerResultSet execDistributedWriteRequestWithKeys(
00559 AbstractWriteRequest request) throws SQLException;
00560
00561
00562
00563
00564
00565
00566
00567
00568 public abstract ControllerResultSet execDistributedReadStoredProcedure(
00569 StoredProcedure proc) throws SQLException;
00570
00571
00572
00573
00574
00575
00576
00577
00578 public abstract int execDistributedWriteStoredProcedure(StoredProcedure proc)
00579 throws SQLException;
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589 public ControllerResultSet execDistributedReadStoredProcedureLocally(
00590 StoredProcedure proc) throws SQLException
00591 {
00592 return super.execReadStoredProcedure(proc);
00593 }
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603 public int execDistributedWriteStoredProcedureLocally(StoredProcedure proc)
00604 throws SQLException
00605 {
00606 return super.execWriteStoredProcedure(proc);
00607 }
00608
00609 }