00001
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
import java.util.Vector;
00030
00031
import org.jgroups.blocks.GroupRequest;
00032
import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
00033
import org.objectweb.cjdbc.common.i18n.Translate;
00034
import org.objectweb.cjdbc.common.log.Trace;
00035
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00036
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00038
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00039
import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00040
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00041
import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00042
import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
00043
import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData;
00044
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00045
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00046
import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00047
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00048
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00049
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
00050
00063 public abstract class DistributedRequestManager extends RequestManager
00064 {
00065 protected DistributedVirtualDatabase
dvdb;
00067 private Vector
failedOnAllBackends;
00069 private long controllerId;
00071 private ArrayList
writeTransactions;
00072
00087 public DistributedRequestManager(DistributedVirtualDatabase vdb,
00088
AbstractScheduler scheduler,
AbstractResultCache cache,
00089
AbstractLoadBalancer loadBalancer,
AbstractRecoveryLog recoveryLog,
00090
long beginTimeout,
long commitTimeout,
long rollbackTimeout)
00091
throws SQLException
00092 {
00093 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00094 commitTimeout, rollbackTimeout);
00095
dvdb = vdb;
00096
failedOnAllBackends =
new Vector();
00097
writeTransactions =
new ArrayList();
00098 }
00099
00105 public Trace getLogger()
00106 {
00107
return logger;
00108 }
00109
00115 public VirtualDatabase getVirtualDatabase()
00116 {
00117
return dvdb;
00118 }
00119
00120
00121
00122
00123
00134 public void enableBackend(
DatabaseBackend db)
throws SQLException
00135 {
00136
int size =
dvdb.getAllMemberButUs().size();
00137
if (size > 0)
00138 {
00139 logger.
debug(
Translate
00140 .get(
"virtualdatabase.distributed.enable.backend.check"));
00141
00142
00143
00144
dvdb.getDispatcher().castMessage(
dvdb.getAllMemberButUs(),
00145
CJDBCGroupMessage.getMessage(
new EnableBackend(db)),
00146 GroupRequest.GET_NONE,
CJDBCGroupMessage.defaultCastTimeOut);
00147 }
00148
00149 loadBalancer.
enableBackend(db,
true);
00150 vdb.
logger.
info(
"Database backend " + db.getName() +
" enabled");
00151 }
00152
00158 public void addFailedOnAllBackends(
AbstractRequest request)
00159 {
00160
failedOnAllBackends.add(request);
00161 }
00162
00170 public void completeFailedOnAllBackends(
AbstractRequest request,
00171
boolean success)
00172 {
00173
if (!
failedOnAllBackends.remove(request))
00174 {
00175 logger.
warn(
"Unable to find request "
00176 + request.
getSQLShortForm(
dvdb.getSQLShortFormLength())
00177 +
" in list of requests that failed on all backends.");
00178
return;
00179 }
00180
if (success)
00181 {
00182 logger
00183 .
error(
"Request "
00184 + request.
getSQLShortForm(
dvdb.getSQLShortFormLength())
00185 +
" failed on all local backends but succeeded on other controllers. Disabling all local backends.");
00186
try
00187 {
00188
dvdb.disableAllBackend();
00189 }
00190
catch (
VirtualDatabaseException e)
00191 {
00192 logger.
error(
"An error occured while disabling all backends", e);
00193 }
00194 }
00195 }
00196
00197
00198
00199
00200
00206 public void setControllerId(
int id)
00207 {
00208
if (
id > 0xffff)
00209 {
00210 String msg =
"Out of range controller id (" +
id +
")";
00211 logger.
error(msg);
00212
throw new RuntimeException(msg);
00213 }
00214
this.controllerId = (0x000000000000ffff &
id) << 12;
00215 }
00216
00220 public long begin(String login)
throws SQLException
00221 {
00222
try
00223 {
00224
TransactionMarkerMetaData tm =
new TransactionMarkerMetaData(0,
00225 beginTimeout, login);
00226
00227
00228
long tid = scheduler.
begin(tm);
00229
00230
00231 tid = tid & 0x0000ffffffffffffL;
00232 tid = tid |
controllerId;
00233 tm.
setTransactionId(tid);
00234
00235
if (logger.
isDebugEnabled())
00236 logger.
debug(
Translate.get(
"transaction.begin", String.valueOf(tid)));
00237
00238
try
00239 {
00240
00241 loadBalancer.
begin(tm);
00242
00243
00244
if (recoveryLog != null)
00245 {
00246 recoveryLog.
begin(tm);
00247 }
00248 }
00249
catch (SQLException e)
00250 {
00251
throw e;
00252 }
00253 finally
00254 {
00255
00256 scheduler.
beginCompleted(tid);
00257 }
00258
00259 tidLoginTable.put(
new Long(tid), tm);
00260
return tid;
00261 }
00262
catch (RuntimeException e)
00263 {
00264 logger.
fatal(
Translate.get(
00265
"fatal.runtime.exception.requestmanager.begin", e));
00266
throw new SQLException(e.getMessage());
00267 }
00268 }
00269
00273 public void commit(
long transactionId)
throws SQLException
00274 {
00275 Long lTid =
new Long(transactionId);
00276
boolean isAWriteTransaction;
00277
synchronized (
writeTransactions)
00278 {
00279 isAWriteTransaction =
writeTransactions.remove(lTid);
00280 }
00281
if (isAWriteTransaction)
00282
distributedCommit(transactionId);
00283
else
00284
00285 super.commit(transactionId);
00286 }
00287
00291 public void rollback(
long transactionId)
throws SQLException
00292 {
00293 Long lTid =
new Long(transactionId);
00294
boolean isAWriteTransaction;
00295
synchronized (
writeTransactions)
00296 {
00297 isAWriteTransaction =
writeTransactions.remove(lTid);
00298 }
00299
if (isAWriteTransaction)
00300
distributedRollback(transactionId);
00301
else
00302
00303 super.rollback(transactionId);
00304 }
00305
00309 public void scheduleExecWriteRequest(
AbstractWriteRequest request)
00310
throws SQLException
00311 {
00312
lazyTransactionStart(request);
00313 super.scheduleExecWriteRequest(request);
00314 }
00315
00323 public void lazyTransactionStart(
AbstractRequest request)
throws SQLException
00324 {
00325
00326
00327
if (!request.isAutoCommit())
00328 {
00329
long tid = request.getTransactionId();
00330
if ((tid & 0x0000ffffffffffffL) !=
controllerId)
00331 {
00332
if (!tidLoginTable.containsKey(
new Long(tid)))
00333 {
00334
try
00335 {
00336
TransactionMarkerMetaData tm =
new TransactionMarkerMetaData(0,
00337 beginTimeout, request.getLogin());
00338 tm.
setTransactionId(tid);
00339
00340
if (logger.
isDebugEnabled())
00341 logger.
debug(
Translate.get(
"transaction.begin", String
00342 .valueOf(tid)));
00343
00344
try
00345 {
00346 scheduler.
begin(tm);
00347
00348
00349 loadBalancer.
begin(tm);
00350
00351
00352
if (recoveryLog != null)
00353 {
00354 recoveryLog.
begin(tm);
00355 }
00356 }
00357
catch (SQLException e)
00358 {
00359
throw e;
00360 }
00361 finally
00362 {
00363
00364 scheduler.
beginCompleted(tid);
00365 }
00366
00367 tidLoginTable.put(
new Long(tid), tm);
00368 }
00369
catch (RuntimeException e)
00370 {
00371 logger.
fatal(
Translate.get(
00372
"fatal.runtime.exception.requestmanager.begin", e));
00373
throw new SQLException(e.getMessage());
00374 }
00375 }
00376 }
00377 }
00378 }
00379
00383 public int execWriteRequest(
AbstractWriteRequest request)
throws SQLException
00384 {
00385
if (!request.isAutoCommit())
00386 {
00387 Long lTid =
new Long(request.getTransactionId());
00388
synchronized (
writeTransactions)
00389 {
00390
if (!
writeTransactions.contains(lTid))
00391
writeTransactions.add(lTid);
00392 }
00393 }
00394
return execDistributedWriteRequest(request);
00395 }
00396
00400 public ControllerResultSet execWriteRequestWithKeys(
00401
AbstractWriteRequest request)
throws SQLException
00402 {
00403
if (!request.isAutoCommit())
00404 {
00405 Long lTid =
new Long(request.getTransactionId());
00406
synchronized (
writeTransactions)
00407 {
00408
if (!
writeTransactions.contains(lTid))
00409
writeTransactions.add(lTid);
00410 }
00411 }
00412
return execDistributedWriteRequestWithKeys(request);
00413 }
00414
00418 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc)
00419
throws SQLException
00420 {
00421
00422
if (proc.isReadOnly())
00423
return execDistributedReadStoredProcedureLocally(proc);
00424
00425
if (!proc.isAutoCommit())
00426 {
00427 Long lTid =
new Long(proc.getTransactionId());
00428
synchronized (
writeTransactions)
00429 {
00430
if (!
writeTransactions.contains(lTid))
00431
writeTransactions.add(lTid);
00432 }
00433 }
00434
return execDistributedReadStoredProcedure(proc);
00435 }
00436
00440 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00441 {
00442
if (!proc.isAutoCommit())
00443 {
00444 Long lTid =
new Long(proc.getTransactionId());
00445
synchronized (
writeTransactions)
00446 {
00447
if (!
writeTransactions.contains(lTid))
00448
writeTransactions.add(lTid);
00449 }
00450 }
00451
return execDistributedWriteStoredProcedure(proc);
00452 }
00453
00454
00455
00456
00457
00464
public abstract void distributedCommit(
long transactionId)
00465
throws SQLException;
00466
00473
public abstract void distributedRollback(
long transactionId)
00474
throws SQLException;
00475
00483
public abstract int execDistributedWriteRequest(
AbstractWriteRequest request)
00484
throws SQLException;
00485
00494
public abstract ControllerResultSet execDistributedWriteRequestWithKeys(
00495
AbstractWriteRequest request)
throws SQLException;
00496
00504
public abstract ControllerResultSet execDistributedReadStoredProcedure(
00505
StoredProcedure proc)
throws SQLException;
00506
00514
public abstract int execDistributedWriteStoredProcedure(
StoredProcedure proc)
00515
throws SQLException;
00516
00525 public ControllerResultSet execDistributedReadStoredProcedureLocally(
00526
StoredProcedure proc)
throws SQLException
00527 {
00528
return super.execReadStoredProcedure(proc);
00529 }
00530
00539 public int execDistributedWriteStoredProcedureLocally(
StoredProcedure proc)
00540
throws SQLException
00541 {
00542
return super.execWriteStoredProcedure(proc);
00543 }
00544
00545 }