00001
00025
package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027
import java.sql.ResultSet;
00028
import java.sql.SQLException;
00029
import java.util.Vector;
00030
00031
import org.jgroups.Address;
00032
import org.jgroups.blocks.GroupRequest;
00033
import org.jgroups.util.RspList;
00034
import org.objectweb.cjdbc.common.i18n.Translate;
00035
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00036
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00037
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00038
import org.objectweb.cjdbc.common.sql.UnknownRequest;
00039
import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00040
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00041
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00042
import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00043
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00044
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00045
import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00046
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00047
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
00048
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadStoredProcedure;
00049
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequest;
00050
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequestWithKeys;
00051
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteStoredProcedure;
00052
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.NotifyCompletion;
00053
import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
00054
00061 public class RAIDb1DistributedRequestManager extends DistributedRequestManager
00062 {
00063
00064 private static final int NO_RESULT = -5;
00065
00079 public RAIDb1DistributedRequestManager(
DistributedVirtualDatabase vdb,
00080
AbstractScheduler scheduler,
AbstractResultCache cache,
00081
AbstractLoadBalancer loadBalancer,
AbstractRecoveryLog recoveryLog,
00082
long beginTimeout,
long commitTimeout,
long rollbackTimeout)
00083
throws SQLException
00084 {
00085 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00086 commitTimeout, rollbackTimeout);
00087 }
00088
00092 public int execDistributedWriteRequest(
AbstractWriteRequest request)
00093
throws SQLException
00094 {
00095
try
00096 {
00097
int execWriteRequestResult = -1;
00098
00099 Vector groupMembers = dvdb.
getCurrentView().getMembers();
00100
00101
if (logger.
isDebugEnabled())
00102 logger.
debug(
"Broadcasting request "
00103 + request.getSQLShortForm(dvdb.
getSQLShortFormLength())
00104 + (request.isAutoCommit() ?
"" :
" transaction "
00105 + request.getTransactionId()) +
" to all controllers ("
00106 + dvdb.
getChannel().getLocalAddress() +
"->"
00107 + groupMembers.toString() +
")");
00108
00109
00110 RspList responses = dvdb.
getDispatcher().castMessage(groupMembers,
00111
CJDBCGroupMessage.getMessage(
new ExecWriteRequest(request)),
00112 GroupRequest.GET_ALL, request.getTimeout());
00113
00114
if (logger.
isDebugEnabled())
00115 logger.
debug(
"Request "
00116 + request.getSQLShortForm(dvdb.
getSQLShortFormLength())
00117 +
" completed.");
00118
00119
if (responses.numSuspectedMembers() > 0)
00120 {
00121 logger.
warn(responses.numSuspectedMembers()
00122 +
" controller(s) died during execution of request "
00123 + request.getId());
00124 }
00125
00126
00127 Vector failedOnAllBackends = null;
00128 SQLException exception = null;
00129
int size = groupMembers.size();
00130
00131
for (
int i = 0; i < size; i++)
00132 {
00133 Address address = (Address) groupMembers.get(i);
00134
if (responses.isSuspected(address))
00135 {
00136 logger.
warn(
"Controller " + address +
" is suspected of failure.");
00137
continue;
00138 }
00139 Object r = responses.get(address);
00140
if (r instanceof Integer)
00141 {
00142
if (execWriteRequestResult == -1)
00143 execWriteRequestResult = ((Integer) r).intValue();
00144
else if (execWriteRequestResult != ((Integer) r).intValue())
00145 logger.
error(
"Controllers have different results for request "
00146 + request.getId());
00147 }
00148
else if (r instanceof
AllBackendsFailedException)
00149 {
00150
if (failedOnAllBackends == null)
00151 failedOnAllBackends =
new Vector();
00152 failedOnAllBackends.add(address);
00153
if (logger.
isDebugEnabled())
00154 logger.
debug(
"Request failed on all backends of controller "
00155 + address +
" (" + r +
")");
00156 }
00157
else if (r instanceof SQLException)
00158 {
00159 String msg =
"Request " + request.getId() +
" failed on controller "
00160 + address +
" (" + r +
")";
00161 logger.
warn(msg);
00162 exception = (SQLException) r;
00163 }
00164 }
00165
00166
if (failedOnAllBackends != null)
00167 {
00168 dvdb.
getDispatcher().castMessage(
00169 failedOnAllBackends,
00170
CJDBCGroupMessage.getMessage(
new NotifyCompletion(request,
00171 execWriteRequestResult != -1)), GroupRequest.GET_NONE,
00172 request.getTimeout());
00173 }
00174
00175
if (execWriteRequestResult != -1)
00176 {
00177
if (logger.
isDebugEnabled())
00178 logger.
debug(
"Request " + request.getId()
00179 +
" completed successfully.");
00180
return execWriteRequestResult;
00181 }
00182
else if (exception != null)
00183
throw exception;
00184
00185
00186 String msg =
"Request '" + request +
"' failed on all controllers";
00187 logger.
warn(msg);
00188
throw new SQLException(msg);
00189 }
00190
catch (SQLException e)
00191 {
00192 String msg =
Translate
00193 .get(
"loadbalancer.request.failed",
new String[]{
00194 request.getSQLShortForm(vdb.
getSQLShortFormLength()),
00195 e.getMessage()});
00196 logger.
warn(msg);
00197
throw e;
00198 }
00199 }
00200
00204 public ControllerResultSet execDistributedWriteRequestWithKeys(
00205
AbstractWriteRequest request)
throws SQLException
00206 {
00207
try
00208 {
00209
ControllerResultSet execWriteRequestResult = null;
00210
00211 Vector groupMembers = dvdb.getCurrentView().getMembers();
00212
00213
if (logger.isDebugEnabled())
00214 logger.debug(
"Broadcasting request "
00215 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00216 + (request.isAutoCommit() ?
"" :
" transaction "
00217 + request.getTransactionId()) +
") to all controllers ("
00218 + dvdb.getChannel().getLocalAddress() +
"->"
00219 + groupMembers.toString() +
")");
00220
00221
00222 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00223
CJDBCGroupMessage.getMessage(
new ExecWriteRequestWithKeys(request)),
00224 GroupRequest.GET_ALL, request.getTimeout());
00225
00226
if (logger.isDebugEnabled())
00227 logger.debug(
"Request "
00228 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00229 +
" completed.");
00230
00231
if (responses.numSuspectedMembers() > 0)
00232 {
00233 logger.warn(responses.numSuspectedMembers()
00234 +
" controller(s) died during execution of request "
00235 + request.getId());
00236 }
00237
00238
00239 Vector failedOnAllBackends = null;
00240 SQLException exception = null;
00241
int size = groupMembers.size();
00242
00243
for (
int i = 0; i < size; i++)
00244 {
00245 Address address = (Address) groupMembers.get(i);
00246
if (responses.isSuspected(address))
00247 {
00248 logger.warn(
"Controller " + address +
" is suspected of failure.");
00249
continue;
00250 }
00251 Object r = responses.get(address);
00252
if (r instanceof ResultSet)
00253 {
00254
if (execWriteRequestResult == null)
00255 execWriteRequestResult = (
ControllerResultSet) r;
00256 }
00257
else if (r instanceof
AllBackendsFailedException)
00258 {
00259
if (failedOnAllBackends == null)
00260 failedOnAllBackends =
new Vector();
00261 failedOnAllBackends.add(address);
00262
if (logger.isDebugEnabled())
00263 logger.debug(
"Request failed on all backends of controller "
00264 + address +
" (" + r +
")");
00265 }
00266
else if (r instanceof SQLException)
00267 {
00268 String msg =
"Request " + request.getId() +
" failed on controller "
00269 + address +
" (" + r +
")";
00270 logger.warn(msg);
00271 exception = (SQLException) r;
00272 }
00273 }
00274
00275
if (failedOnAllBackends != null)
00276 {
00277 dvdb.getDispatcher().castMessage(
00278 failedOnAllBackends,
00279
CJDBCGroupMessage.getMessage(
new NotifyCompletion(request,
00280 execWriteRequestResult != null)), GroupRequest.GET_NONE,
00281 request.getTimeout());
00282 }
00283
00284
if (execWriteRequestResult != null)
00285 {
00286
if (logger.isDebugEnabled())
00287 logger.debug(
"Request " + request.getId()
00288 +
" completed successfully.");
00289
return execWriteRequestResult;
00290 }
00291
else if (exception != null)
00292
throw exception;
00293
00294 String msg =
"Request '" + request +
"' failed on all controllers";
00295 logger.warn(msg);
00296
throw new SQLException(msg);
00297 }
00298
catch (SQLException e)
00299 {
00300 String msg =
Translate
00301 .get(
"loadbalancer.request.failed",
new String[]{
00302 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00303 e.getMessage()});
00304 logger.warn(msg);
00305
throw e;
00306 }
00307 }
00308
00312 public ControllerResultSet execDistributedReadStoredProcedure(
00313
StoredProcedure proc)
throws SQLException
00314 {
00315
try
00316 {
00317
ControllerResultSet result = null;
00318
00319 Vector groupMembers = dvdb.getCurrentView().getMembers();
00320
00321
if (logger.isDebugEnabled())
00322 logger.debug(
"Broadcasting read stored procedure "
00323 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00324 + (proc.isAutoCommit() ?
"" :
" transaction "
00325 + proc.getTransactionId()) +
") to all controllers ("
00326 + dvdb.getChannel().getLocalAddress() +
"->"
00327 + groupMembers.toString() +
")");
00328
00329
00330 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00331
CJDBCGroupMessage.getMessage(
new ExecReadStoredProcedure(proc)),
00332 GroupRequest.GET_ALL, proc.getTimeout());
00333
00334
if (logger.isDebugEnabled())
00335 logger.debug(
"Stored procedure "
00336 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00337 +
" completed.");
00338
00339
if (responses.numSuspectedMembers() > 0)
00340 {
00341 logger.warn(responses.numSuspectedMembers()
00342 +
" controller(s) died during execution of stored procedure "
00343 + proc.getId());
00344 }
00345
00346
00347 Vector failedOnAllBackends = null;
00348 SQLException exception = null;
00349
int size = groupMembers.size();
00350
00351
for (
int i = 0; i < size; i++)
00352 {
00353 Address address = (Address) groupMembers.get(i);
00354
if (responses.isSuspected(address))
00355 {
00356 logger.warn(
"Controller " + address +
" is suspected of failure.");
00357
continue;
00358 }
00359 Object r = responses.get(address);
00360
if (r instanceof
ControllerResultSet)
00361 {
00362
if (result == null)
00363 result = (
ControllerResultSet) r;
00364 }
00365
else if (r instanceof
AllBackendsFailedException)
00366 {
00367
if (failedOnAllBackends == null)
00368 failedOnAllBackends =
new Vector();
00369 failedOnAllBackends.add(address);
00370 }
00371
else if (r instanceof SQLException)
00372 {
00373 String msg =
"Stored procedure " + proc.getId()
00374 +
" failed on controller " + address +
" (" + r +
")";
00375 logger.warn(msg);
00376 exception = (SQLException) r;
00377 }
00378 }
00379
00380
if (failedOnAllBackends != null)
00381 {
00382 dvdb.getDispatcher().castMessage(
00383 failedOnAllBackends,
00384
CJDBCGroupMessage.getMessage(
new NotifyCompletion(proc,
00385 result != null)), GroupRequest.GET_NONE, proc.getTimeout());
00386 }
00387
00388
if (result != null)
00389 {
00390
if (logger.isDebugEnabled())
00391 logger.debug(
"Stored procedure " + proc.getId()
00392 +
" completed successfully.");
00393
return result;
00394 }
00395
else if (exception != null)
00396
throw exception;
00397
00398
00399 String msg =
"Stored procedure '" + proc +
"' failed on all controllers";
00400 logger.warn(msg);
00401
throw new SQLException(msg);
00402 }
00403
catch (SQLException e)
00404 {
00405 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00406 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00407 logger.warn(msg);
00408
throw e;
00409 }
00410 }
00411
00415 public int execDistributedWriteStoredProcedure(
StoredProcedure proc)
00416
throws SQLException
00417 {
00418
try
00419 {
00420
int execWriteStoredProcedureResult = NO_RESULT;
00421
00422 Vector groupMembers = dvdb.getCurrentView().getMembers();
00423
00424
if (logger.isDebugEnabled())
00425 logger.debug(
"Broadcasting write store procedure "
00426 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00427 + (proc.isAutoCommit() ?
"" :
" transaction "
00428 + proc.getTransactionId()) +
") to all controllers ("
00429 + dvdb.getChannel().getLocalAddress() +
"->"
00430 + groupMembers.toString() +
")");
00431
00432
00433 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00434
CJDBCGroupMessage.getMessage(
new ExecWriteStoredProcedure(proc)),
00435 GroupRequest.GET_ALL, proc.getTimeout());
00436
00437
if (logger.isDebugEnabled())
00438 logger.debug(
"Stored procedure "
00439 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00440 +
" completed.");
00441
00442
if (responses.numSuspectedMembers() > 0)
00443 {
00444 logger.warn(responses.numSuspectedMembers()
00445 +
" controller(s) died during execution of stored procedure "
00446 + proc.getId());
00447 }
00448
00449
00450 Vector failedOnAllBackends = null;
00451 SQLException exception = null;
00452
int size = groupMembers.size();
00453
00454
for (
int i = 0; i < size; i++)
00455 {
00456 Address address = (Address) groupMembers.get(i);
00457
if (responses.isSuspected(address))
00458 {
00459 logger.warn(
"Controller " + address +
" is suspected of failure.");
00460
continue;
00461 }
00462 Object r = responses.get(address);
00463
if (r instanceof Integer)
00464 {
00465
if (execWriteStoredProcedureResult == NO_RESULT)
00466 execWriteStoredProcedureResult = ((Integer) r).intValue();
00467 }
00468
else if (r instanceof
AllBackendsFailedException)
00469 {
00470
if (failedOnAllBackends == null)
00471 failedOnAllBackends =
new Vector();
00472 failedOnAllBackends.add(address);
00473 }
00474
else if (r instanceof SQLException)
00475 {
00476 String msg =
"Stored procedure " + proc.getId()
00477 +
" failed on controller " + address +
" (" + r +
")";
00478 logger.warn(msg);
00479 exception = (SQLException) r;
00480 }
00481 }
00482
00483
if (failedOnAllBackends != null)
00484 {
00485 dvdb.getDispatcher().castMessage(
00486 failedOnAllBackends,
00487
CJDBCGroupMessage.getMessage(
new NotifyCompletion(proc,
00488 execWriteStoredProcedureResult != NO_RESULT)),
00489 GroupRequest.GET_NONE, proc.getTimeout());
00490 }
00491
00492
if (execWriteStoredProcedureResult != NO_RESULT)
00493 {
00494
if (logger.isDebugEnabled())
00495 logger.debug(
"Stored procedure " + proc.getId()
00496 +
" completed successfully.");
00497
return execWriteStoredProcedureResult;
00498 }
00499
else if (exception != null)
00500
throw exception;
00501
00502
00503 String msg =
"Stored procedure '" + proc +
"' failed on all controllers";
00504 logger.warn(msg);
00505
throw new SQLException(msg);
00506 }
00507
catch (SQLException e)
00508 {
00509 String msg =
Translate.get(
"loadbalancer.request.failed",
new String[]{
00510 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00511 logger.warn(msg);
00512
throw e;
00513 }
00514 }
00515
00519 public void distributedCommit(
long transactionId)
throws SQLException
00520 {
00521
try
00522 {
00523 Vector groupMembers = dvdb.getCurrentView().getMembers();
00524
if (logger.isDebugEnabled())
00525 logger.debug(
"Broadcasting transaction " + transactionId
00526 +
" commit to all controllers ("
00527 + dvdb.getChannel().getLocalAddress() +
"->"
00528 + groupMembers.toString() +
")");
00529
00530
00531 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00532
CJDBCGroupMessage.getMessage(
new Commit(transactionId)),
00533 GroupRequest.GET_ALL,
this.commitTimeout);
00534
00535
if (logger.isDebugEnabled())
00536 logger.debug(
"Commit of transaction " + transactionId +
" completed.");
00537
00538
if (responses.numSuspectedMembers() > 0)
00539 {
00540 logger.warn(responses.numSuspectedMembers()
00541 +
" controller(s) died during execution of commit for transaction "
00542 + transactionId);
00543 }
00544
00545
00546 Vector failedOnAllBackends = null;
00547 SQLException exception = null;
00548
int size = groupMembers.size();
00549
boolean success =
false;
00550
00551
for (
int i = 0; i < size; i++)
00552 {
00553 Address address = (Address) groupMembers.get(i);
00554
if (responses.isSuspected(address))
00555 {
00556 logger.warn(
"Controller " + address +
" is suspected of failure.");
00557
continue;
00558 }
00559 Object r = responses.get(address);
00560
if (r instanceof Boolean)
00561 {
00562
if (((Boolean) r).booleanValue())
00563 success =
true;
00564
else
00565 logger.error(
"Unexpected result for controller " + address);
00566 }
00567
else if (r instanceof
AllBackendsFailedException)
00568 {
00569
if (failedOnAllBackends == null)
00570 failedOnAllBackends =
new Vector();
00571 failedOnAllBackends.add(address);
00572
if (logger.isDebugEnabled())
00573 logger.debug(
"Commit failed on all backends of controller "
00574 + address +
" (" + r +
")");
00575 }
00576
else if (r instanceof SQLException)
00577 {
00578 String msg =
"Commit of transaction " + transactionId
00579 +
" failed on controller " + address +
" (" + r +
")";
00580 logger.warn(msg);
00581 exception = (SQLException) r;
00582 }
00583 }
00584
00585
if (failedOnAllBackends != null)
00586 {
00587
00588
AbstractRequest request =
new UnknownRequest(
"commit",
false, 0,
"\n");
00589 request.
setTransactionId(transactionId);
00590 dvdb.getDispatcher().castMessage(
00591 failedOnAllBackends,
00592
CJDBCGroupMessage
00593 .getMessage(
new NotifyCompletion(request, success)),
00594 GroupRequest.GET_NONE, commitTimeout);
00595 }
00596
00597
if (success)
00598
return;
00599
00600
if (exception != null)
00601
throw exception;
00602
00603 String msg =
"Transaction " + transactionId
00604 +
" failed to commit on all controllers";
00605 logger.warn(msg);
00606
throw new SQLException(msg);
00607 }
00608
catch (SQLException e)
00609 {
00610 String msg =
"Transaction " + transactionId +
" commit failed (" + e
00611 +
")";
00612 logger.warn(msg);
00613
throw e;
00614 }
00615 }
00616
00620 public void distributedRollback(
long transactionId)
throws SQLException
00621 {
00622
try
00623 {
00624 Vector groupMembers = dvdb.getCurrentView().getMembers();
00625
if (logger.isDebugEnabled())
00626 logger.debug(
"Broadcasting transaction " + transactionId
00627 +
" rollback to all controllers ("
00628 + dvdb.getChannel().getLocalAddress() +
"->"
00629 + groupMembers.toString() +
")");
00630
00631
00632 RspList responses = dvdb.getDispatcher().castMessage(groupMembers,
00633
CJDBCGroupMessage.getMessage(
new Rollback(transactionId)),
00634 GroupRequest.GET_ALL,
this.rollbackTimeout);
00635
00636
if (logger.isDebugEnabled())
00637 logger
00638 .debug(
"rollback of transaction " + transactionId +
" completed.");
00639
00640
if (responses.numSuspectedMembers() > 0)
00641 {
00642 logger
00643 .warn(responses.numSuspectedMembers()
00644 +
" controller(s) died during execution of rollback for transaction "
00645 + transactionId);
00646 }
00647
00648
00649 Vector failedOnAllBackends = null;
00650 SQLException exception = null;
00651
int size = groupMembers.size();
00652
boolean success =
false;
00653
00654
for (
int i = 0; i < size; i++)
00655 {
00656 Address address = (Address) groupMembers.get(i);
00657
if (responses.isSuspected(address))
00658 {
00659 logger.warn(
"Controller " + address +
" is suspected of failure.");
00660
continue;
00661 }
00662 Object r = responses.get(address);
00663
if (r instanceof Boolean)
00664 {
00665
if (((Boolean) r).booleanValue())
00666 success =
true;
00667
else
00668 logger.error(
"Unexpected result for controller " + address);
00669 }
00670
else if (r instanceof
AllBackendsFailedException)
00671 {
00672
if (failedOnAllBackends == null)
00673 failedOnAllBackends =
new Vector();
00674 failedOnAllBackends.add(address);
00675
if (logger.isDebugEnabled())
00676 logger.debug(
"rollback failed on all backends of controller "
00677 + address +
" (" + r +
")");
00678 }
00679
else if (r instanceof SQLException)
00680 {
00681 String msg =
"rollback of transaction " + transactionId
00682 +
" failed on controller " + address +
" (" + r +
")";
00683 logger.warn(msg);
00684 exception = (SQLException) r;
00685 }
00686 }
00687
00688
if (failedOnAllBackends != null)
00689 {
00690
00691
AbstractRequest request =
new UnknownRequest(
"rollback",
false, 0,
"\n");
00692 request.
setTransactionId(transactionId);
00693 dvdb.getDispatcher().castMessage(
00694 failedOnAllBackends,
00695
CJDBCGroupMessage
00696 .getMessage(
new NotifyCompletion(request, success)),
00697 GroupRequest.GET_NONE, rollbackTimeout);
00698 }
00699
00700
if (success)
00701
return;
00702
00703
if (exception != null)
00704
throw exception;
00705
00706
00707 String msg =
"Transaction " + transactionId
00708 +
" failed to rollback on all controllers";
00709 logger.warn(msg);
00710
throw new SQLException(msg);
00711 }
00712
catch (SQLException e)
00713 {
00714 String msg =
"Transaction " + transactionId +
" rollback failed (" + e
00715 +
")";
00716 logger.warn(msg);
00717
throw e;
00718 }
00719 }
00720
00721 }