00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027 import java.sql.ResultSet;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030
00031 import javax.management.NotCompliantMBeanException;
00032
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00035 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00036 import org.objectweb.cjdbc.common.sql.SelectRequest;
00037 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00038 import org.objectweb.cjdbc.common.sql.UnknownRequest;
00039 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00040 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00041 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00042 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00043 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00044 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00045 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00046 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
00047 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadRequest;
00048 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadStoredProcedure;
00049 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequest;
00050 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequestWithKeys;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteStoredProcedure;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.NotifyCompletion;
00053 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
00054 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
00055 import org.objectweb.tribe.adapters.MulticastResponse;
00056 import org.objectweb.tribe.common.Member;
00057
00058
00059
00060
00061
00062
00063
00064 public class RAIDb2DistributedRequestManager extends DistributedRequestManager
00065 {
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081 public RAIDb2DistributedRequestManager(DistributedVirtualDatabase vdb,
00082 AbstractScheduler scheduler, AbstractResultCache cache,
00083 AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00084 long beginTimeout, long commitTimeout, long rollbackTimeout)
00085 throws SQLException, NotCompliantMBeanException
00086 {
00087 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00088 commitTimeout, rollbackTimeout);
00089 }
00090
00091
00092
00093
00094
00095 public ControllerResultSet execRemoteReadRequest(long controllerUniqueId,
00096 SelectRequest request) throws SQLException
00097 {
00098 try
00099 {
00100
00101
00102
00103
00104
00105 Member nextController = (Member) dvdb.getAllMemberButUs().get(0);
00106 ArrayList groupMembers = new ArrayList();
00107 groupMembers.add(nextController);
00108
00109 if (logger.isDebugEnabled())
00110 logger.debug("Sending request "
00111 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00112 + (request.isAutoCommit() ? "" : " transaction "
00113 + request.getTransactionId()) + " to next controller ("
00114 + groupMembers.get(0) + ")");
00115
00116
00117 MulticastResponse responses;
00118 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00119 groupMembers, new ExecReadRequest(getControllerId(), request),
00120 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00121
00122 if (logger.isDebugEnabled())
00123 logger.debug("Request "
00124 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00125 + " completed.");
00126
00127 return (ControllerResultSet) responses.getResult(nextController);
00128 }
00129 catch (Exception e)
00130 {
00131 String msg = "An error occured while executing remote select request "
00132 + request.getId();
00133 logger.warn(msg, e);
00134 throw new SQLException(msg + " (" + e + ")");
00135 }
00136 }
00137
00138
00139
00140
00141 public int execDistributedWriteRequest(AbstractWriteRequest request)
00142 throws SQLException
00143 {
00144 try
00145 {
00146 int execWriteRequestResult = -1;
00147
00148 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00149
00150 if (logger.isDebugEnabled())
00151 logger.debug("Broadcasting request "
00152 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00153 + (request.isAutoCommit() ? "" : " transaction "
00154 + request.getTransactionId()) + ") to all controllers ("
00155 + dvdb.getChannel().getLocalMembership() + "->"
00156 + groupMembers.toString() + ")");
00157
00158
00159
00160
00161 MulticastResponse responses;
00162 try
00163 {
00164 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00165 groupMembers, new ExecWriteRequest(request),
00166 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00167 }
00168 catch (Exception e)
00169 {
00170 String msg = "An error occured while executing distributed write request "
00171 + request.getId();
00172 logger.warn(msg, e);
00173 throw new SQLException(msg + " (" + e + ")");
00174 }
00175
00176 if (logger.isDebugEnabled())
00177 logger.debug("Request "
00178 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00179 + " completed.");
00180
00181 if (responses.getFailedMembers() != null)
00182 {
00183 logger.warn(responses.getFailedMembers().size()
00184 + " controller(s) died during execution of request "
00185 + request.getId());
00186 }
00187
00188
00189 ArrayList failedOnAllBackends = null;
00190 SQLException exception = null;
00191 int size = groupMembers.size();
00192
00193 for (int i = 0; i < size; i++)
00194 {
00195 Member member = (Member) groupMembers.get(i);
00196 if ((responses.getFailedMembers() != null)
00197 && responses.getFailedMembers().contains(member))
00198 {
00199 logger.warn("Controller " + member + " is suspected of failure.");
00200 continue;
00201 }
00202 Object r = responses.getResult(member);
00203 if (r instanceof Integer)
00204 {
00205 if (execWriteRequestResult == -1)
00206 execWriteRequestResult = ((Integer) r).intValue();
00207 else if (execWriteRequestResult != ((Integer) r).intValue())
00208 logger.error("Controllers have different results for request "
00209 + request.getId());
00210 }
00211 else if (r instanceof AllBackendsFailedException)
00212 {
00213 if (failedOnAllBackends == null)
00214 failedOnAllBackends = new ArrayList();
00215 failedOnAllBackends.add(member);
00216 if (logger.isDebugEnabled())
00217 logger.debug("Request failed on all backends of controller "
00218 + member + " (" + r + ")");
00219 }
00220 else if (r instanceof SQLException)
00221 {
00222 String msg = "Request " + request.getId() + " failed on controller "
00223 + member + " (" + r + ")";
00224 logger.warn(msg);
00225 exception = (SQLException) r;
00226 }
00227 }
00228
00229 if (failedOnAllBackends != null)
00230 {
00231 try
00232 {
00233 dvdb.getMulticastRequestAdapter().multicastMessage(
00234 failedOnAllBackends,
00235 new NotifyCompletion(request, execWriteRequestResult != -1),
00236 MulticastRequestAdapter.WAIT_NONE, request.getTimeout());
00237 }
00238 catch (Exception e)
00239 {
00240 String msg = "An error occured while notifying all controllers of failure of distributed write request "
00241 + request.getId();
00242 logger.warn(msg, e);
00243 throw new SQLException(msg + " (" + e + ")");
00244 }
00245 }
00246
00247 if (execWriteRequestResult != -1)
00248 return execWriteRequestResult;
00249 else if (exception != null)
00250 throw exception;
00251
00252 String msg = "Request '" + request + "' failed on all controllers";
00253 logger.warn(msg);
00254 throw new SQLException(msg);
00255 }
00256 catch (SQLException e)
00257 {
00258 String msg = Translate
00259 .get("loadbalancer.request.failed", new String[]{
00260 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00261 e.getMessage()});
00262 logger.warn(msg);
00263 throw e;
00264 }
00265 }
00266
00267
00268
00269
00270 public ControllerResultSet execDistributedWriteRequestWithKeys(
00271 AbstractWriteRequest request) throws SQLException
00272 {
00273 try
00274 {
00275 ControllerResultSet execWriteRequestResult = null;
00276
00277 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00278
00279 if (logger.isDebugEnabled())
00280 logger.debug("Broadcasting request "
00281 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00282 + (request.isAutoCommit() ? "" : " transaction "
00283 + request.getTransactionId()) + ") to all controllers ("
00284 + dvdb.getChannel().getLocalMembership() + "->"
00285 + groupMembers.toString() + ")");
00286
00287
00288
00289
00290 MulticastResponse responses;
00291 try
00292 {
00293 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00294 groupMembers, new ExecWriteRequestWithKeys(request),
00295 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00296 }
00297 catch (Exception e)
00298 {
00299 String msg = "An error occured while executing distributed write request with keys "
00300 + request.getId();
00301 logger.warn(msg, e);
00302 throw new SQLException(msg + " (" + e + ")");
00303 }
00304
00305 if (logger.isDebugEnabled())
00306 logger.debug("Request "
00307 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00308 + " completed.");
00309
00310 if (responses.getFailedMembers() != null)
00311 {
00312 logger.warn(responses.getFailedMembers().size()
00313 + " controller(s) died during execution of request "
00314 + request.getId());
00315 }
00316
00317
00318 ArrayList failedOnAllBackends = null;
00319 SQLException exception = null;
00320 int size = groupMembers.size();
00321
00322 for (int i = 0; i < size; i++)
00323 {
00324 Member member = (Member) groupMembers.get(i);
00325 if ((responses.getFailedMembers() != null)
00326 && responses.getFailedMembers().contains(member))
00327 {
00328 logger.warn("Controller " + member + " is suspected of failure.");
00329 continue;
00330 }
00331 Object r = responses.getResult(member);
00332 if (r instanceof ResultSet)
00333 {
00334 if (execWriteRequestResult == null)
00335 execWriteRequestResult = (ControllerResultSet) r;
00336 }
00337 else if (r instanceof AllBackendsFailedException)
00338 {
00339 if (failedOnAllBackends == null)
00340 failedOnAllBackends = new ArrayList();
00341 failedOnAllBackends.add(member);
00342 if (logger.isDebugEnabled())
00343 logger.debug("Request failed on all backends of controller "
00344 + member + " (" + r + ")");
00345 }
00346 else if (r instanceof SQLException)
00347 {
00348 String msg = "Request " + request.getId() + " failed on controller "
00349 + member + " (" + r + ")";
00350 logger.warn(msg);
00351 exception = (SQLException) r;
00352 }
00353 }
00354
00355 if (failedOnAllBackends != null)
00356 {
00357 try
00358 {
00359 dvdb.getMulticastRequestAdapter().multicastMessage(
00360 failedOnAllBackends,
00361 new NotifyCompletion(request, execWriteRequestResult != null),
00362 MulticastRequestAdapter.WAIT_NONE, request.getTimeout());
00363 }
00364 catch (Exception e)
00365 {
00366 String msg = "An error occured while notifying all controllers of failure of distributed write request with keys "
00367 + request.getId();
00368 logger.warn(msg, e);
00369 throw new SQLException(msg + " (" + e + ")");
00370 }
00371 }
00372
00373 if (execWriteRequestResult != null)
00374 return execWriteRequestResult;
00375 else if (exception != null)
00376 throw exception;
00377
00378 String msg = "Request '" + request + "' failed on all controllers";
00379 logger.warn(msg);
00380 throw new SQLException(msg);
00381 }
00382 catch (SQLException e)
00383 {
00384 String msg = Translate
00385 .get("loadbalancer.request.failed", new String[]{
00386 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00387 e.getMessage()});
00388 logger.warn(msg);
00389 throw e;
00390 }
00391 }
00392
00393
00394
00395
00396 public ControllerResultSet execDistributedReadStoredProcedure(
00397 StoredProcedure proc) throws SQLException
00398 {
00399 try
00400 {
00401 ControllerResultSet result = null;
00402
00403 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00404
00405 if (logger.isDebugEnabled())
00406 logger.debug("Broadcasting read stored procedure "
00407 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00408 + (proc.isAutoCommit() ? "" : " transaction "
00409 + proc.getTransactionId()) + ") to all controllers ("
00410 + dvdb.getChannel().getLocalMembership() + "->"
00411 + groupMembers.toString() + ")");
00412
00413
00414
00415
00416 MulticastResponse responses;
00417 try
00418 {
00419 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00420 groupMembers, new ExecReadStoredProcedure(proc),
00421 MulticastRequestAdapter.WAIT_ALL, proc.getTimeout());
00422 }
00423 catch (Exception e)
00424 {
00425 String msg = "An error occured while executing distributed read stored procedure "
00426 + proc.getId();
00427 logger.warn(msg, e);
00428 throw new SQLException(msg + " (" + e + ")");
00429 }
00430
00431 if (logger.isDebugEnabled())
00432 logger.debug("Stored procedure "
00433 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00434 + " completed.");
00435
00436 if (responses.getFailedMembers() != null)
00437 {
00438 logger.warn(responses.getFailedMembers().size()
00439 + " controller(s) died during execution of stored procedure "
00440 + proc.getId());
00441 }
00442
00443
00444 ArrayList failedOnAllBackends = null;
00445 SQLException exception = null;
00446 int size = groupMembers.size();
00447
00448 for (int i = 0; i < size; i++)
00449 {
00450 Member member = (Member) groupMembers.get(i);
00451 if ((responses.getFailedMembers() != null)
00452 && responses.getFailedMembers().contains(member))
00453 {
00454 logger.warn("Controller " + member + " is suspected of failure.");
00455 continue;
00456 }
00457 Object r = responses.getResult(member);
00458 if (r instanceof ControllerResultSet)
00459 {
00460 if (result == null)
00461 result = (ControllerResultSet) r;
00462 }
00463 else if (r instanceof AllBackendsFailedException)
00464 {
00465 if (failedOnAllBackends == null)
00466 failedOnAllBackends = new ArrayList();
00467 failedOnAllBackends.add(member);
00468 }
00469 else if (r instanceof SQLException)
00470 {
00471 String msg = "Stored procedure " + proc.getId()
00472 + " failed on controller " + member + " (" + r + ")";
00473 logger.warn(msg);
00474 exception = (SQLException) r;
00475 }
00476 }
00477
00478 if (failedOnAllBackends != null)
00479 {
00480 try
00481 {
00482 dvdb.getMulticastRequestAdapter().multicastMessage(
00483 failedOnAllBackends, new NotifyCompletion(proc, result != null),
00484 MulticastRequestAdapter.WAIT_NONE, proc.getTimeout());
00485 }
00486 catch (Exception e)
00487 {
00488 String msg = "An error occured while notifying all controllers of failure of read stored procedure "
00489 + proc.getId();
00490 logger.warn(msg, e);
00491 throw new SQLException(msg + " (" + e + ")");
00492 }
00493 }
00494
00495 if (result != null)
00496 {
00497 if (logger.isDebugEnabled())
00498 logger.debug("Stored procedure " + proc.getId()
00499 + " completed successfully.");
00500 return result;
00501 }
00502 else if (exception != null)
00503 throw exception;
00504
00505
00506 String msg = "Stored procedure '" + proc + "' failed on all controllers";
00507 logger.warn(msg);
00508 throw new SQLException(msg);
00509 }
00510 catch (SQLException e)
00511 {
00512 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00513 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00514 logger.warn(msg);
00515 throw e;
00516 }
00517 }
00518
00519
00520
00521
00522 public int execDistributedWriteStoredProcedure(StoredProcedure proc)
00523 throws SQLException
00524 {
00525 try
00526 {
00527 int execWriteStoredProcedureResult = -1;
00528
00529 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00530
00531 if (logger.isDebugEnabled())
00532 logger.debug("Broadcasting stored procedure "
00533 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00534 + (proc.isAutoCommit() ? "" : " transaction "
00535 + proc.getTransactionId()) + ") to all controllers ("
00536 + dvdb.getChannel().getLocalMembership() + "->"
00537 + groupMembers.toString() + ")");
00538
00539
00540
00541
00542 MulticastResponse responses;
00543 try
00544 {
00545 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00546 groupMembers, new ExecWriteStoredProcedure(proc),
00547 MulticastRequestAdapter.WAIT_ALL, proc.getTimeout());
00548 }
00549 catch (Exception e)
00550 {
00551 String msg = "An error occured while executing distributed write stored procedure "
00552 + proc.getId();
00553 logger.warn(msg, e);
00554 throw new SQLException(msg + " (" + e + ")");
00555 }
00556
00557 if (logger.isDebugEnabled())
00558 logger.debug("Stored procedure "
00559 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00560 + " completed.");
00561
00562 if (responses.getFailedMembers() != null)
00563 {
00564 logger.warn(responses.getFailedMembers().size()
00565 + " controller(s) died during execution of stored procedure "
00566 + proc.getId());
00567 }
00568
00569
00570 ArrayList failedOnAllBackends = null;
00571 SQLException exception = null;
00572 int size = groupMembers.size();
00573
00574 for (int i = 0; i < size; i++)
00575 {
00576 Member member = (Member) groupMembers.get(i);
00577 if ((responses.getFailedMembers() != null)
00578 && responses.getFailedMembers().contains(member))
00579 {
00580 logger.warn("Controller " + member + " is suspected of failure.");
00581 continue;
00582 }
00583 Object r = responses.getResult(member);
00584 if (r instanceof Integer)
00585 {
00586 if (execWriteStoredProcedureResult != -1)
00587 execWriteStoredProcedureResult = ((Integer) r).intValue();
00588 }
00589 else if (r instanceof AllBackendsFailedException)
00590 {
00591 if (failedOnAllBackends == null)
00592 failedOnAllBackends = new ArrayList();
00593 failedOnAllBackends.add(member);
00594 }
00595 else if (r instanceof SQLException)
00596 {
00597 String msg = "Stored procedure " + proc.getId()
00598 + " failed on controller " + member + " (" + r + ")";
00599 logger.warn(msg);
00600 exception = (SQLException) r;
00601 }
00602 }
00603
00604 if (failedOnAllBackends != null)
00605 {
00606 try
00607 {
00608 dvdb.getMulticastRequestAdapter().multicastMessage(
00609 failedOnAllBackends,
00610 new NotifyCompletion(proc, execWriteStoredProcedureResult != -1),
00611 MulticastRequestAdapter.WAIT_NONE, proc.getTimeout());
00612 }
00613 catch (Exception e)
00614 {
00615 String msg = "An error occured while notifying all controllers of failure of write stored procedure "
00616 + proc.getId();
00617 logger.warn(msg, e);
00618 throw new SQLException(msg + " (" + e + ")");
00619 }
00620 }
00621
00622 if (execWriteStoredProcedureResult != -1)
00623 return execWriteStoredProcedureResult;
00624 else if (exception != null)
00625 throw exception;
00626
00627
00628 String msg = "Stored procedure '" + proc + "' failed on all controllers";
00629 logger.warn(msg);
00630 throw new SQLException(msg);
00631 }
00632 catch (SQLException e)
00633 {
00634 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00635 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00636 logger.warn(msg);
00637 throw e;
00638 }
00639 }
00640
00641
00642
00643
00644 public void distributedCommit(long transactionId) throws SQLException
00645 {
00646 try
00647 {
00648 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00649 if (logger.isDebugEnabled())
00650 logger.debug("Broadcasting transaction " + transactionId
00651 + " commit to all controllers ("
00652 + dvdb.getChannel().getLocalMembership() + "->"
00653 + groupMembers.toString() + ")");
00654
00655
00656 MulticastResponse responses;
00657 try
00658 {
00659 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00660 groupMembers, new Commit(transactionId),
00661 MulticastRequestAdapter.WAIT_ALL, this.commitTimeout);
00662 }
00663 catch (Exception e)
00664 {
00665 String msg = "An error occured while executing distributed rollback for transaction "
00666 + transactionId;
00667 logger.warn(msg, e);
00668 throw new SQLException(msg + "(" + e + ")");
00669 }
00670
00671 if (logger.isDebugEnabled())
00672 logger.debug("Commit of transaction " + transactionId + " completed.");
00673
00674 if (responses.getFailedMembers() != null)
00675 {
00676 logger.warn(responses.getFailedMembers().size()
00677 + " controller(s) died during execution of commit for transaction "
00678 + transactionId);
00679 }
00680
00681
00682 ArrayList failedOnAllBackends = null;
00683 SQLException exception = null;
00684 int size = groupMembers.size();
00685 boolean success = false;
00686
00687 for (int i = 0; i < size; i++)
00688 {
00689 Member member = (Member) groupMembers.get(i);
00690 if ((responses.getFailedMembers() != null)
00691 && responses.getFailedMembers().contains(member))
00692 {
00693 logger.warn("Controller " + member + " is suspected of failure.");
00694 continue;
00695 }
00696 Object r = responses.getResult(member);
00697 if (r instanceof Boolean)
00698 {
00699 if (((Boolean) r).booleanValue())
00700 success = true;
00701 else
00702 logger.error("Unexpected result for controller " + member);
00703 }
00704 else if (r instanceof AllBackendsFailedException)
00705 {
00706 if (failedOnAllBackends == null)
00707 failedOnAllBackends = new ArrayList();
00708 failedOnAllBackends.add(member);
00709 if (logger.isDebugEnabled())
00710 logger.debug("Commit failed on all backends of controller "
00711 + member + " (" + r + ")");
00712 }
00713 else if (r instanceof SQLException)
00714 {
00715 String msg = "Commit of transaction " + transactionId
00716 + " failed on controller " + member + " (" + r + ")";
00717 logger.warn(msg);
00718 exception = (SQLException) r;
00719 }
00720 }
00721
00722 if (failedOnAllBackends != null)
00723 {
00724
00725 AbstractRequest request = new UnknownRequest("commit", false, 0, "\n");
00726 request.setTransactionId(transactionId);
00727 try
00728 {
00729 dvdb.getMulticastRequestAdapter().multicastMessage(
00730 failedOnAllBackends, new NotifyCompletion(request, success),
00731 MulticastRequestAdapter.WAIT_NONE, commitTimeout);
00732 }
00733 catch (Exception e)
00734 {
00735 String msg = "An error occured while notifying all controllers of failure to commit transaction "
00736 + transactionId;
00737 logger.warn(msg, e);
00738 throw new SQLException(msg + " (" + e + ")");
00739 }
00740 }
00741
00742 if (success)
00743 return;
00744
00745 if (exception != null)
00746 throw exception;
00747
00748 String msg = "Transaction " + transactionId
00749 + " failed to commit on all controllers";
00750 logger.warn(msg);
00751 throw new SQLException(msg);
00752 }
00753 catch (SQLException e)
00754 {
00755 String msg = "Transaction " + transactionId + " commit failed (" + e
00756 + ")";
00757 logger.warn(msg);
00758 throw e;
00759 }
00760 }
00761
00762
00763
00764
00765 public void distributedRollback(long transactionId) throws SQLException
00766 {
00767 try
00768 {
00769 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00770 if (logger.isDebugEnabled())
00771 logger.debug("Broadcasting transaction " + transactionId
00772 + " rollback to all controllers ("
00773 + dvdb.getChannel().getLocalMembership() + "->"
00774 + groupMembers.toString() + ")");
00775
00776
00777 MulticastResponse responses;
00778 try
00779 {
00780 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00781 groupMembers, new Rollback(transactionId),
00782 MulticastRequestAdapter.WAIT_ALL, this.rollbackTimeout);
00783 }
00784 catch (Exception e)
00785 {
00786 String msg = "An error occured while executing distributed rollback for transaction "
00787 + transactionId;
00788 logger.warn(msg, e);
00789 throw new SQLException(msg + "(" + e + ")");
00790 }
00791
00792 if (logger.isDebugEnabled())
00793 logger
00794 .debug("rollback of transaction " + transactionId + " completed.");
00795
00796 if (responses.getFailedMembers() != null)
00797 {
00798 logger
00799 .warn(responses.getFailedMembers().size()
00800 + " controller(s) died during execution of rollback for transaction "
00801 + transactionId);
00802 }
00803
00804
00805 ArrayList failedOnAllBackends = null;
00806 SQLException exception = null;
00807 int size = groupMembers.size();
00808 boolean success = false;
00809
00810 for (int i = 0; i < size; i++)
00811 {
00812 Member member = (Member) groupMembers.get(i);
00813 if ((responses.getFailedMembers() != null)
00814 && responses.getFailedMembers().contains(member))
00815 {
00816 logger.warn("Controller " + member + " is suspected of failure.");
00817 continue;
00818 }
00819 Object r = responses.getResult(member);
00820 if (r instanceof Boolean)
00821 {
00822 if (((Boolean) r).booleanValue())
00823 success = true;
00824 else
00825 logger.error("Unexpected result for controller " + member);
00826 }
00827 else if (r instanceof AllBackendsFailedException)
00828 {
00829 if (failedOnAllBackends == null)
00830 failedOnAllBackends = new ArrayList();
00831 failedOnAllBackends.add(member);
00832 if (logger.isDebugEnabled())
00833 logger.debug("rollback failed on all backends of controller "
00834 + member + " (" + r + ")");
00835 }
00836 else if (r instanceof SQLException)
00837 {
00838 String msg = "rollback of transaction " + transactionId
00839 + " failed on controller " + member + " (" + r + ")";
00840 logger.warn(msg);
00841 exception = (SQLException) r;
00842 }
00843 }
00844
00845 if (failedOnAllBackends != null)
00846 {
00847
00848 AbstractRequest request = new UnknownRequest("rollback", false, 0, "\n");
00849 request.setTransactionId(transactionId);
00850 try
00851 {
00852 dvdb.getMulticastRequestAdapter().multicastMessage(
00853 failedOnAllBackends, new NotifyCompletion(request, success),
00854 MulticastRequestAdapter.WAIT_NONE, rollbackTimeout);
00855 }
00856 catch (Exception e)
00857 {
00858 String msg = "An error occured while notifying all controllers of failure to rollback transaction "
00859 + transactionId;
00860 logger.warn(msg, e);
00861 throw new SQLException(msg + " (" + e + ")");
00862 }
00863 }
00864
00865 if (success)
00866 return;
00867 if (exception != null)
00868 throw exception;
00869
00870 String msg = "Transaction " + transactionId
00871 + " failed to rollback on all controllers";
00872 logger.warn(msg);
00873 throw new SQLException(msg);
00874 }
00875 catch (SQLException e)
00876 {
00877 String msg = "Transaction " + transactionId + " rollback failed (" + e
00878 + ")";
00879 logger.warn(msg);
00880 throw e;
00881 }
00882 }
00883
00884 }