00001
00025
package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027
import java.sql.ResultSet;
00028
import java.sql.SQLException;
00029
import java.util.Vector;
00030
00031
import org.jgroups.Address;
00032
import org.jgroups.blocks.GroupRequest;
00033
import org.jgroups.util.RspList;
00034
import org.objectweb.cjdbc.common.i18n.Translate;
00035
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00036
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00038
import org.objectweb.cjdbc.common.sql.UnknownRequest;
00039
import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00040
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00041
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00042
import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00043
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00044
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00045
import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00046
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00047
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
00048
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadStoredProcedure;
00049
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequest;
00050
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequestWithKeys;
00051
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteStoredProcedure;
00052
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.NotifyCompletion;
00053
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
00054
00061 public class RAIDb2DistributedRequestManager extends DistributedRequestManager
00062 {
00063
/**
 * Builds a distributed request manager for a RAIDb-2 virtual database.
 * <p>
 * This constructor adds no logic of its own: every argument is handed over
 * unchanged to the <code>DistributedRequestManager</code> constructor.
 *
 * @param vdb the distributed virtual database, forwarded to the superclass
 * @param scheduler the request scheduler, forwarded to the superclass
 * @param cache the result cache, forwarded to the superclass
 * @param loadBalancer the load balancer, forwarded to the superclass
 * @param recoveryLog the recovery log, forwarded to the superclass
 * @param beginTimeout timeout for begin, forwarded to the superclass
 *          (unit not visible here -- presumably the one expected by the
 *          superclass; TODO confirm)
 * @param commitTimeout timeout for commit, forwarded to the superclass
 * @param rollbackTimeout timeout for rollback, forwarded to the superclass
 * @throws SQLException if the superclass constructor fails
 */
public RAIDb2DistributedRequestManager(DistributedVirtualDatabase vdb,
    AbstractScheduler scheduler, AbstractResultCache cache,
    AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
    long beginTimeout, long commitTimeout, long rollbackTimeout)
    throws SQLException
{
  super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
      commitTimeout, rollbackTimeout);
}
00086
00090 public int execDistributedWriteRequest(
AbstractWriteRequest request)
00091
throws SQLException
00092 {
00093
try
00094 {
00095
int execWriteRequestResult = -1;
00096
00097 Vector groupMembers = dvdb.
getCurrentView().getMembers();
00098
00099
if (logger.
isDebugEnabled())
00100 logger.
debug(
"Broadcasting request "
00101 + request.getSQLShortForm(dvdb.
getSQLShortFormLength())
00102 + (request.isAutoCommit() ?
"" :
" transaction "
00103 + request.getTransactionId()) +
") to all controllers ("
00104 + dvdb.
getChannel().getLocalAddress() +
"->"
00105 + groupMembers.toString() +
")");
00106
00107
00108
00109
00110 RspList responses = dvdb.
getDispatcher().castMessage(groupMembers,
00111
CJDBCGroupMessage.getMessage(
new ExecWriteRequest(request)),
00112 GroupRequest.GET_ALL, request.getTimeout());
00113
00114
if (logger.
isDebugEnabled())
00115 logger.
debug(
"Request "
00116 + request.getSQLShortForm(dvdb.
getSQLShortFormLength())
00117 +
" completed.");
00118
00119
if (responses.numSuspectedMembers() > 0)
00120 {
00121 logger.
warn(responses.numSuspectedMembers()
00122 +
" controller(s) died during execution of request "
00123 + request.getId());
00124 }
00125
00126
00127 Vector failedOnAllBackends = null;
00128 SQLException exception = null;
00129
int size = groupMembers.size();
00130
00131
for (
int i = 0; i < size; i++)
00132 {
00133 Address address = (Address) groupMembers.get(i);
00134
if (responses.isSuspected(address))
00135 {
00136 logger.
warn(
"Controller " + address +
" is suspected of failure.");
00137
continue;
00138 }
00139 Object r = responses.get(address);
00140
if (r instanceof Integer)
00141 {
00142
if (execWriteRequestResult == -1)
00143 execWriteRequestResult = ((Integer) r).intValue();
00144
else if (execWriteRequestResult != ((Integer) r).intValue())
00145 logger.
error(
"Controllers have different results for request "
00146 + request.getId());
00147 }
00148
else if (r instanceof
AllBackendsFailedException)
00149 {
00150
if (failedOnAllBackends == null)
00151 failedOnAllBackends =
new Vector();
00152 failedOnAllBackends.add(address);
00153
if (logger.
isDebugEnabled())
00154 logger.
debug(
"Request failed on all backends of controller "
00155 + address +
" (" + r +
")");
00156 }
00157
else if (r instanceof SQLException)
00158 {
00159 String msg =
"Request " + request.getId() +
" failed on controller "
00160 + address +
" (" + r +
")";
00161 logger.
warn(msg);
00162 exception = (SQLException) r;
00163 }
00164 }
00165
00166
if (failedOnAllBackends != null)
00167 {
00168 dvdb.
getDispatcher().castMessage(
00169 failedOnAllBackends,
00170
CJDBCGroupMessage.getMessage(
new NotifyCompletion(request,
00171 execWriteRequestResult != -1)), GroupRequest.GET_NONE,
00172 request.getTimeout());
00173 }
00174
00175
if (execWriteRequestResult != -1)
00176
return execWriteRequestResult;
00177
else if (exception != null)
00178
throw exception;
00179
00180 String msg =
"Request '" + request +
"' failed on all controllers";
00181 logger.
warn(msg);
00182
throw new SQLException(msg);
00183 }
00184
catch (SQLException e)
00185 {
00186 String msg =
Translate
00187 .get(
"loadbalancer.request.failed",
new String[]{
00188 request.getSQLShortForm(vdb.
getSQLShortFormLength()),
00189 e.getMessage()});
00190 logger.
warn(msg);
00191
throw e;
00192 }
00193 }
00194
00198 public ControllerResultSet execDistributedWriteRequestWithKeys(
00199
AbstractWriteRequest request)
throws SQLException
00200 {
00201
try
00202 {
00203
ControllerResultSet execWriteRequestResult = null;
00204
00205 Vector groupMembers = dvdb.getCurrentView().getMembers();
00206
00207
if (logger.isDebugEnabled())
00208 logger.debug(
"Broadcasting request "
00209 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00210 + (request.isAutoCommit() ?
"" :
" transaction "
00211 + request.getTransactionId()) +
") to all controllers ("
00212 + dvdb.getChannel().getLocalAddress() +
"->"
00213 + groupMembers.toString() +
")");
00214
00215
00216
00217
00218 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00219
CJDBCGroupMessage.getMessage(
new ExecWriteRequestWithKeys(request)),
00220 GroupRequest.GET_ALL, request.getTimeout());
00221
00222
if (logger.isDebugEnabled())
00223 logger.debug(
"Request "
00224 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00225 +
" completed.");
00226
00227
if (responses.numSuspectedMembers() > 0)
00228 {
00229 logger.warn(responses.numSuspectedMembers()
00230 +
" controller(s) died during execution of request "
00231 + request.getId());
00232 }
00233
00234
00235 Vector failedOnAllBackends = null;
00236 SQLException exception = null;
00237
int size = groupMembers.size();
00238
00239
for (
int i = 0; i < size; i++)
00240 {
00241 Address address = (Address) groupMembers.get(i);
00242
if (responses.isSuspected(address))
00243 {
00244 logger.warn(
"Controller " + address +
" is suspected of failure.");
00245
continue;
00246 }
00247 Object r = responses.get(address);
00248
if (r instanceof ResultSet)
00249 {
00250
if (execWriteRequestResult == null)
00251 execWriteRequestResult = (
ControllerResultSet) r;
00252 }
00253
else if (r instanceof
AllBackendsFailedException)
00254 {
00255
if (failedOnAllBackends == null)
00256 failedOnAllBackends =
new Vector();
00257 failedOnAllBackends.add(address);
00258
if (logger.isDebugEnabled())
00259 logger.debug(
"Request failed on all backends of controller "
00260 + address +
" (" + r +
")");
00261 }
00262
else if (r instanceof SQLException)
00263 {
00264 String msg =
"Request " + request.getId() +
" failed on controller "
00265 + address +
" (" + r +
")";
00266 logger.warn(msg);
00267 exception = (SQLException) r;
00268 }
00269 }
00270
00271
if (failedOnAllBackends != null)
00272 {
00273 dvdb.getDispatcher().castMessage(
00274 failedOnAllBackends,
00275
CJDBCGroupMessage.getMessage(
new NotifyCompletion(request,
00276 execWriteRequestResult != null)), GroupRequest.GET_NONE,
00277 request.getTimeout());
00278 }
00279
00280
if (execWriteRequestResult != null)
00281
return execWriteRequestResult;
00282
else if (exception != null)
00283
throw exception;
00284
00285 String msg =
"Request '" + request +
"' failed on all controllers";
00286 logger.warn(msg);
00287
throw new SQLException(msg);
00288 }
00289
catch (SQLException e)
00290 {
00291 String msg =
Translate
00292 .get(
"loadbalancer.request.failed",
new String[]{
00293 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00294 e.getMessage()});
00295 logger.warn(msg);
00296
throw e;
00297 }
00298 }
00299
00303 public ControllerResultSet execDistributedReadStoredProcedure(
00304
StoredProcedure proc)
throws SQLException
00305 {
00306
try
00307 {
00308
ControllerResultSet result = null;
00309
00310 Vector groupMembers = dvdb.getCurrentView().getMembers();
00311
00312
if (logger.isDebugEnabled())
00313 logger.debug(
"Broadcasting read stored procedure "
00314 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00315 + (proc.isAutoCommit() ?
"" :
" transaction "
00316 + proc.getTransactionId()) +
") to all controllers ("
00317 + dvdb.getChannel().getLocalAddress() +
"->"
00318 + groupMembers.toString() +
")");
00319
00320
00321
00322
00323 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00324
CJDBCGroupMessage.getMessage(
new ExecReadStoredProcedure(proc)),
00325 GroupRequest.GET_ALL, proc.getTimeout());
00326
00327
if (logger.isDebugEnabled())
00328 logger.debug(
"Stored procedure "
00329 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00330 +
" completed.");
00331
00332
if (responses.numSuspectedMembers() > 0)
00333 {
00334 logger.warn(responses.numSuspectedMembers()
00335 +
" controller(s) died during execution of stored procedure "
00336 + proc.getId());
00337 }
00338
00339
00340 Vector failedOnAllBackends = null;
00341 SQLException exception = null;
00342
int size = groupMembers.size();
00343
00344
for (
int i = 0; i < size; i++)
00345 {
00346 Address address = (Address) groupMembers.get(i);
00347
if (responses.isSuspected(address))
00348 {
00349 logger.warn(
"Controller " + address +
" is suspected of failure.");
00350
continue;
00351 }
00352 Object r = responses.get(address);
00353
if (r instanceof
ControllerResultSet)
00354 {
00355
if (result == null)
00356 result = (
ControllerResultSet) r;
00357 }
00358
else if (r instanceof
AllBackendsFailedException)
00359 {
00360
if (failedOnAllBackends == null)
00361 failedOnAllBackends =
new Vector();
00362 failedOnAllBackends.add(address);
00363 }
00364
else if (r instanceof SQLException)
00365 {
00366 String msg =
"Stored procedure " + proc.getId()
00367 +
" failed on controller " + address +
" (" + r +
")";
00368 logger.warn(msg);
00369 exception = (SQLException) r;
00370 }
00371 }
00372
00373
if (failedOnAllBackends != null)
00374 {
00375 dvdb.getDispatcher().castMessage(
00376 failedOnAllBackends,
00377
CJDBCGroupMessage.getMessage(
new NotifyCompletion(proc,
00378 result != null)), GroupRequest.GET_NONE, proc.getTimeout());
00379 }
00380
00381
if (result != null)
00382 {
00383
if (logger.isDebugEnabled())
00384 logger.debug(
"Stored procedure " + proc.getId()
00385 +
" completed successfully.");
00386
return result;
00387 }
00388
else if (exception != null)
00389
throw exception;
00390
00391
00392 String msg =
"Stored procedure '" + proc +
"' failed on all controllers";
00393 logger.warn(msg);
00394
throw new SQLException(msg);
00395 }
00396
catch (SQLException e)
00397 {
00398 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00399 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00400 logger.warn(msg);
00401
throw e;
00402 }
00403 }
00404
00408 public int execDistributedWriteStoredProcedure(
StoredProcedure proc)
00409
throws SQLException
00410 {
00411
try
00412 {
00413
int execWriteStoredProcedureResult = -1;
00414
00415 Vector groupMembers = dvdb.getCurrentView().getMembers();
00416
00417
if (logger.isDebugEnabled())
00418 logger.debug(
"Broadcasting stored procedure "
00419 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00420 + (proc.isAutoCommit() ?
"" :
" transaction "
00421 + proc.getTransactionId()) +
") to all controllers ("
00422 + dvdb.getChannel().getLocalAddress() +
"->"
00423 + groupMembers.toString() +
")");
00424
00425
00426
00427
00428 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00429
CJDBCGroupMessage.getMessage(
new ExecWriteStoredProcedure(proc)),
00430 GroupRequest.GET_ALL, proc.getTimeout());
00431
00432
if (logger.isDebugEnabled())
00433 logger.debug(
"Stored procedure "
00434 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00435 +
" completed.");
00436
00437
if (responses.numSuspectedMembers() > 0)
00438 {
00439 logger.warn(responses.numSuspectedMembers()
00440 +
" controller(s) died during execution of stored procedure "
00441 + proc.getId());
00442 }
00443
00444
00445 Vector failedOnAllBackends = null;
00446 SQLException exception = null;
00447
int size = groupMembers.size();
00448
00449
for (
int i = 0; i < size; i++)
00450 {
00451 Address address = (Address) groupMembers.get(i);
00452
if (responses.isSuspected(address))
00453 {
00454 logger.warn(
"Controller " + address +
" is suspected of failure.");
00455
continue;
00456 }
00457 Object r = responses.get(address);
00458
if (r instanceof Integer)
00459 {
00460
if (execWriteStoredProcedureResult != -1)
00461 execWriteStoredProcedureResult = ((Integer) r).intValue();
00462 }
00463
else if (r instanceof
AllBackendsFailedException)
00464 {
00465
if (failedOnAllBackends == null)
00466 failedOnAllBackends =
new Vector();
00467 failedOnAllBackends.add(address);
00468 }
00469
else if (r instanceof SQLException)
00470 {
00471 String msg =
"Stored procedure " + proc.getId()
00472 +
" failed on controller " + address +
" (" + r +
")";
00473 logger.warn(msg);
00474 exception = (SQLException) r;
00475 }
00476 }
00477
00478
if (failedOnAllBackends != null)
00479 {
00480 dvdb.getDispatcher().castMessage(
00481 failedOnAllBackends,
00482
CJDBCGroupMessage.getMessage(
new NotifyCompletion(proc,
00483 execWriteStoredProcedureResult != -1)), GroupRequest.GET_NONE,
00484 proc.getTimeout());
00485 }
00486
00487
if (execWriteStoredProcedureResult != -1)
00488
return execWriteStoredProcedureResult;
00489
else if (exception != null)
00490
throw exception;
00491
00492
00493 String msg =
"Stored procedure '" + proc +
"' failed on all controllers";
00494 logger.warn(msg);
00495
throw new SQLException(msg);
00496 }
00497
catch (SQLException e)
00498 {
00499 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00500 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00501 logger.warn(msg);
00502
throw e;
00503 }
00504 }
00505
00509 public void distributedCommit(
long transactionId)
throws SQLException
00510 {
00511
try
00512 {
00513 Vector groupMembers = dvdb.getCurrentView().getMembers();
00514
if (logger.isDebugEnabled())
00515 logger.debug(
"Broadcasting transaction " + transactionId
00516 +
" commit to all controllers ("
00517 + dvdb.getChannel().getLocalAddress() +
"->"
00518 + groupMembers.toString() +
")");
00519
00520
00521 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00522
CJDBCGroupMessage.getMessage(
new Commit(transactionId)),
00523 GroupRequest.GET_ALL,
this.commitTimeout);
00524
00525
if (logger.isDebugEnabled())
00526 logger.debug(
"Commit of transaction " + transactionId +
" completed.");
00527
00528
if (responses.numSuspectedMembers() > 0)
00529 {
00530 logger.warn(responses.numSuspectedMembers()
00531 +
" controller(s) died during execution of commit for transaction "
00532 + transactionId);
00533 }
00534
00535
00536 Vector failedOnAllBackends = null;
00537 SQLException exception = null;
00538
int size = groupMembers.size();
00539
boolean success =
false;
00540
00541
for (
int i = 0; i < size; i++)
00542 {
00543 Address address = (Address) groupMembers.get(i);
00544
if (responses.isSuspected(address))
00545 {
00546 logger.warn(
"Controller " + address +
" is suspected of failure.");
00547
continue;
00548 }
00549 Object r = responses.get(address);
00550
if (r instanceof Boolean)
00551 {
00552
if (((Boolean) r).booleanValue())
00553 success =
true;
00554
else
00555 logger.error(
"Unexpected result for controller " + address);
00556 }
00557
else if (r instanceof
AllBackendsFailedException)
00558 {
00559
if (failedOnAllBackends == null)
00560 failedOnAllBackends =
new Vector();
00561 failedOnAllBackends.add(address);
00562
if (logger.isDebugEnabled())
00563 logger.debug(
"Commit failed on all backends of controller "
00564 + address +
" (" + r +
")");
00565 }
00566
else if (r instanceof SQLException)
00567 {
00568 String msg =
"Commit of transaction " + transactionId
00569 +
" failed on controller " + address +
" (" + r +
")";
00570 logger.warn(msg);
00571 exception = (SQLException) r;
00572 }
00573 }
00574
00575
if (failedOnAllBackends != null)
00576 {
00577
00578
AbstractRequest request =
new UnknownRequest(
"commit",
false, 0,
"\n");
00579 request.
setTransactionId(transactionId);
00580 dvdb.getDispatcher().castMessage(
00581 failedOnAllBackends,
00582
CJDBCGroupMessage
00583 .getMessage(
new NotifyCompletion(request, success)),
00584 GroupRequest.GET_NONE, commitTimeout);
00585 }
00586
00587
if (success)
00588
return;
00589
00590
if (exception != null)
00591
throw exception;
00592
00593 String msg =
"Transaction " + transactionId
00594 +
" failed to commit on all controllers";
00595 logger.warn(msg);
00596
throw new SQLException(msg);
00597 }
00598
catch (SQLException e)
00599 {
00600 String msg =
"Transaction " + transactionId +
" commit failed (" + e
00601 +
")";
00602 logger.warn(msg);
00603
throw e;
00604 }
00605 }
00606
00610 public void distributedRollback(
long transactionId)
throws SQLException
00611 {
00612
try
00613 {
00614 Vector groupMembers = dvdb.getCurrentView().getMembers();
00615
if (logger.isDebugEnabled())
00616 logger.debug(
"Broadcasting transaction " + transactionId
00617 +
" rollback to all controllers ("
00618 + dvdb.getChannel().getLocalAddress() +
"->"
00619 + groupMembers.toString() +
")");
00620
00621
00622 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00623
CJDBCGroupMessage.getMessage(
new Rollback(transactionId)),
00624 GroupRequest.GET_ALL,
this.rollbackTimeout);
00625
00626
if (logger.isDebugEnabled())
00627 logger
00628 .debug(
"rollback of transaction " + transactionId +
" completed.");
00629
00630
if (responses.numSuspectedMembers() > 0)
00631 {
00632 logger
00633 .warn(responses.numSuspectedMembers()
00634 +
" controller(s) died during execution of rollback for transaction "
00635 + transactionId);
00636 }
00637
00638
00639 Vector failedOnAllBackends = null;
00640 SQLException exception = null;
00641
int size = groupMembers.size();
00642
boolean success =
false;
00643
00644
for (
int i = 0; i < size; i++)
00645 {
00646 Address address = (Address) groupMembers.get(i);
00647
if (responses.isSuspected(address))
00648 {
00649 logger.warn(
"Controller " + address +
" is suspected of failure.");
00650
continue;
00651 }
00652 Object r = responses.get(address);
00653
if (r instanceof Boolean)
00654 {
00655
if (((Boolean) r).booleanValue())
00656 success =
true;
00657
else
00658 logger.error(
"Unexpected result for controller " + address);
00659 }
00660
else if (r instanceof
AllBackendsFailedException)
00661 {
00662
if (failedOnAllBackends == null)
00663 failedOnAllBackends =
new Vector();
00664 failedOnAllBackends.add(address);
00665
if (logger.isDebugEnabled())
00666 logger.debug(
"rollback failed on all backends of controller "
00667 + address +
" (" + r +
")");
00668 }
00669
else if (r instanceof SQLException)
00670 {
00671 String msg =
"rollback of transaction " + transactionId
00672 +
" failed on controller " + address +
" (" + r +
")";
00673 logger.warn(msg);
00674 exception = (SQLException) r;
00675 }
00676 }
00677
00678
if (failedOnAllBackends != null)
00679 {
00680
00681
AbstractRequest request =
new UnknownRequest(
"rollback",
false, 0,
"\n");
00682 request.
setTransactionId(transactionId);
00683 dvdb.getDispatcher().castMessage(
00684 failedOnAllBackends,
00685
CJDBCGroupMessage
00686 .getMessage(
new NotifyCompletion(request, success)),
00687 GroupRequest.GET_NONE, rollbackTimeout);
00688 }
00689
00690
if (success)
00691
return;
00692
if (exception != null)
00693
throw exception;
00694
00695 String msg =
"Transaction " + transactionId
00696 +
" failed to rollback on all controllers";
00697 logger.warn(msg);
00698
throw new SQLException(msg);
00699 }
00700
catch (SQLException e)
00701 {
00702 String msg =
"Transaction " + transactionId +
" rollback failed (" + e
00703 +
")";
00704 logger.warn(msg);
00705
throw e;
00706 }
00707 }
00708
00709 }