00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027 import java.sql.ResultSet;
00028 import java.sql.SQLException;
00029 import java.util.ArrayList;
00030
00031 import javax.management.NotCompliantMBeanException;
00032
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00035 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00036 import org.objectweb.cjdbc.common.sql.SelectRequest;
00037 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00038 import org.objectweb.cjdbc.common.sql.UnknownRequest;
00039 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00040 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00041 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00042 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00043 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00044 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00045 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00046 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
00047 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadRequest;
00048 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecReadStoredProcedure;
00049 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequest;
00050 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteRequestWithKeys;
00051 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ExecWriteStoredProcedure;
00052 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.NotifyCompletion;
00053 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
00054 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
00055 import org.objectweb.tribe.adapters.MulticastResponse;
00056 import org.objectweb.tribe.common.Member;
00057
00058
00059
00060
00061
00062
00063
00064 public class RAIDb1DistributedRequestManager extends DistributedRequestManager
00065 {
00066
00067 private static final int NO_RESULT = -5;
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083 public RAIDb1DistributedRequestManager(DistributedVirtualDatabase vdb,
00084 AbstractScheduler scheduler, AbstractResultCache cache,
00085 AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00086 long beginTimeout, long commitTimeout, long rollbackTimeout)
00087 throws SQLException, NotCompliantMBeanException
00088 {
00089 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00090 commitTimeout, rollbackTimeout);
00091 }
00092
00093
00094
00095
00096
00097 public ControllerResultSet execRemoteReadRequest(long controllerUniqueId,
00098 SelectRequest request) throws SQLException
00099 {
00100 try
00101 {
00102
00103
00104 Member nextController = (Member) dvdb.getAllMemberButUs().get(0);
00105 ArrayList groupMembers = new ArrayList();
00106 groupMembers.add(nextController);
00107
00108 if (logger.isDebugEnabled())
00109 logger.debug("Sending request "
00110 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00111 + (request.isAutoCommit() ? "" : " transaction "
00112 + request.getTransactionId()) + " to next controller ("
00113 + groupMembers.get(0) + ")");
00114
00115
00116 MulticastResponse responses;
00117 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00118 groupMembers, new ExecReadRequest(getControllerId(), request),
00119 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00120
00121 if (logger.isDebugEnabled())
00122 logger.debug("Request "
00123 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00124 + " completed.");
00125
00126 return (ControllerResultSet) responses.getResult(nextController);
00127 }
00128 catch (Exception e)
00129 {
00130 String msg = "An error occured while executing remote select request "
00131 + request.getId();
00132 logger.warn(msg, e);
00133 throw new SQLException(msg + " (" + e + ")");
00134 }
00135 }
00136
00137
00138
00139
00140 public int execDistributedWriteRequest(AbstractWriteRequest request)
00141 throws SQLException
00142 {
00143 try
00144 {
00145 int execWriteRequestResult = -1;
00146
00147 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00148
00149 if (logger.isDebugEnabled())
00150 logger.debug("Broadcasting request "
00151 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00152 + (request.isAutoCommit() ? "" : " transaction "
00153 + request.getTransactionId()) + " to all controllers ("
00154 + dvdb.getChannel().getLocalMembership() + "->"
00155 + groupMembers.toString() + ")");
00156
00157
00158 MulticastResponse responses;
00159 try
00160 {
00161 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00162 groupMembers, new ExecWriteRequest(request),
00163 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00164 }
00165 catch (Exception e)
00166 {
00167 String msg = "An error occured while executing distributed write request "
00168 + request.getId();
00169 logger.warn(msg, e);
00170 throw new SQLException(msg + " (" + e + ")");
00171 }
00172
00173 if (logger.isDebugEnabled())
00174 logger.debug("Request "
00175 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00176 + " completed.");
00177
00178 if (responses.getFailedMembers() != null)
00179 {
00180 logger.warn(responses.getFailedMembers().size()
00181 + " controller(s) died during execution of request "
00182 + request.getId());
00183 }
00184
00185
00186 ArrayList failedOnAllBackends = null;
00187 SQLException exception = null;
00188 int size = groupMembers.size();
00189
00190 for (int i = 0; i < size; i++)
00191 {
00192 Member member = (Member) groupMembers.get(i);
00193 if ((responses.getFailedMembers() != null)
00194 && responses.getFailedMembers().contains(member))
00195 {
00196 logger.warn("Controller " + member + " is suspected of failure.");
00197 continue;
00198 }
00199 Object r = responses.getResult(member);
00200 if (r instanceof Integer)
00201 {
00202 if (execWriteRequestResult == -1)
00203 execWriteRequestResult = ((Integer) r).intValue();
00204 else if (execWriteRequestResult != ((Integer) r).intValue())
00205 logger.error("Controllers have different results for request "
00206 + request.getId());
00207 }
00208 else if (r instanceof AllBackendsFailedException)
00209 {
00210 if (failedOnAllBackends == null)
00211 failedOnAllBackends = new ArrayList();
00212 failedOnAllBackends.add(member);
00213 if (logger.isDebugEnabled())
00214 logger.debug("Request failed on all backends of controller "
00215 + member + " (" + r + ")");
00216 }
00217 else if (r instanceof SQLException)
00218 {
00219 String msg = "Request " + request.getId() + " failed on controller "
00220 + member + " (" + r + ")";
00221 logger.warn(msg);
00222 exception = (SQLException) r;
00223 }
00224 }
00225
00226 if (failedOnAllBackends != null)
00227 {
00228 try
00229 {
00230 dvdb.getMulticastRequestAdapter().multicastMessage(
00231 failedOnAllBackends,
00232 new NotifyCompletion(request, execWriteRequestResult != -1),
00233 MulticastRequestAdapter.WAIT_NONE, request.getTimeout());
00234 }
00235 catch (Exception e)
00236 {
00237 String msg = "An error occured while notifying all controllers of failure of distributed write request "
00238 + request.getId();
00239 logger.warn(msg, e);
00240 throw new SQLException(msg + " (" + e + ")");
00241 }
00242 }
00243
00244 if (execWriteRequestResult != -1)
00245 {
00246 if (logger.isDebugEnabled())
00247 logger.debug("Request " + request.getId()
00248 + " completed successfully.");
00249 return execWriteRequestResult;
00250 }
00251 else if (exception != null)
00252 throw exception;
00253
00254
00255 String msg = "Request '" + request + "' failed on all controllers";
00256 logger.warn(msg);
00257 throw new SQLException(msg);
00258 }
00259 catch (SQLException e)
00260 {
00261 String msg = Translate
00262 .get("loadbalancer.request.failed", new String[]{
00263 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00264 e.getMessage()});
00265 logger.warn(msg);
00266 throw e;
00267 }
00268 }
00269
00270
00271
00272
00273 public ControllerResultSet execDistributedWriteRequestWithKeys(
00274 AbstractWriteRequest request) throws SQLException
00275 {
00276 try
00277 {
00278 ControllerResultSet execWriteRequestResult = null;
00279
00280 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00281
00282 if (logger.isDebugEnabled())
00283 logger.debug("Broadcasting request "
00284 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00285 + (request.isAutoCommit() ? "" : " transaction "
00286 + request.getTransactionId()) + ") to all controllers ("
00287 + dvdb.getChannel().getLocalMembership() + "->"
00288 + groupMembers.toString() + ")");
00289
00290
00291 MulticastResponse responses;
00292 try
00293 {
00294 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00295 groupMembers, new ExecWriteRequestWithKeys(request),
00296 MulticastRequestAdapter.WAIT_ALL, request.getTimeout());
00297 }
00298 catch (Exception e)
00299 {
00300 String msg = "An error occured while executing distributed write request with keys "
00301 + request.getId();
00302 logger.warn(msg, e);
00303 throw new SQLException(msg + " (" + e + ")");
00304 }
00305
00306 if (logger.isDebugEnabled())
00307 logger.debug("Request "
00308 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00309 + " completed.");
00310
00311 if (responses.getFailedMembers() != null)
00312 {
00313 logger.warn(responses.getFailedMembers().size()
00314 + " controller(s) died during execution of request "
00315 + request.getId());
00316 }
00317
00318
00319 ArrayList failedOnAllBackends = null;
00320 SQLException exception = null;
00321 int size = groupMembers.size();
00322
00323 for (int i = 0; i < size; i++)
00324 {
00325 Member member = (Member) groupMembers.get(i);
00326 if ((responses.getFailedMembers() != null)
00327 && responses.getFailedMembers().contains(member))
00328 {
00329 logger.warn("Controller " + member + " is suspected of failure.");
00330 continue;
00331 }
00332 Object r = responses.getResult(member);
00333 if (r instanceof ResultSet)
00334 {
00335 if (execWriteRequestResult == null)
00336 execWriteRequestResult = (ControllerResultSet) r;
00337 }
00338 else if (r instanceof AllBackendsFailedException)
00339 {
00340 if (failedOnAllBackends == null)
00341 failedOnAllBackends = new ArrayList();
00342 failedOnAllBackends.add(member);
00343 if (logger.isDebugEnabled())
00344 logger.debug("Request failed on all backends of controller "
00345 + member + " (" + r + ")");
00346 }
00347 else if (r instanceof SQLException)
00348 {
00349 String msg = "Request " + request.getId() + " failed on controller "
00350 + member + " (" + r + ")";
00351 logger.warn(msg);
00352 exception = (SQLException) r;
00353 }
00354 }
00355
00356 if (failedOnAllBackends != null)
00357 {
00358 try
00359 {
00360 dvdb.getMulticastRequestAdapter().multicastMessage(
00361 failedOnAllBackends,
00362 new NotifyCompletion(request, execWriteRequestResult != null),
00363 MulticastRequestAdapter.WAIT_NONE, request.getTimeout());
00364 }
00365 catch (Exception e)
00366 {
00367 String msg = "An error occured while notifying all controllers of failure of distributed write request with keys "
00368 + request.getId();
00369 logger.warn(msg, e);
00370 throw new SQLException(msg + " (" + e + ")");
00371 }
00372 }
00373
00374 if (execWriteRequestResult != null)
00375 {
00376 if (logger.isDebugEnabled())
00377 logger.debug("Request " + request.getId()
00378 + " completed successfully.");
00379 return execWriteRequestResult;
00380 }
00381 else if (exception != null)
00382 throw exception;
00383
00384 String msg = "Request '" + request + "' failed on all controllers";
00385 logger.warn(msg);
00386 throw new SQLException(msg);
00387 }
00388 catch (SQLException e)
00389 {
00390 String msg = Translate
00391 .get("loadbalancer.request.failed", new String[]{
00392 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00393 e.getMessage()});
00394 logger.warn(msg);
00395 throw e;
00396 }
00397 }
00398
00399
00400
00401
00402 public ControllerResultSet execDistributedReadStoredProcedure(
00403 StoredProcedure proc) throws SQLException
00404 {
00405 try
00406 {
00407 ControllerResultSet result = null;
00408
00409 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00410
00411 if (logger.isDebugEnabled())
00412 logger.debug("Broadcasting read stored procedure "
00413 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00414 + (proc.isAutoCommit() ? "" : " transaction "
00415 + proc.getTransactionId()) + ") to all controllers ("
00416 + dvdb.getChannel().getLocalMembership() + "->"
00417 + groupMembers.toString() + ")");
00418
00419
00420 MulticastResponse responses;
00421 try
00422 {
00423 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00424 groupMembers, new ExecReadStoredProcedure(proc),
00425 MulticastRequestAdapter.WAIT_ALL, proc.getTimeout());
00426 }
00427 catch (Exception e)
00428 {
00429 String msg = "An error occured while executing distributed read stored procedure "
00430 + proc.getId();
00431 logger.warn(msg, e);
00432 throw new SQLException(msg + " (" + e + ")");
00433 }
00434
00435 if (logger.isDebugEnabled())
00436 logger.debug("Stored procedure "
00437 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00438 + " completed.");
00439
00440 if (responses.getFailedMembers() != null)
00441 {
00442 logger.warn(responses.getFailedMembers().size()
00443 + " controller(s) died during execution of stored procedure "
00444 + proc.getId());
00445 }
00446
00447
00448 ArrayList failedOnAllBackends = null;
00449 SQLException exception = null;
00450 int size = groupMembers.size();
00451
00452 for (int i = 0; i < size; i++)
00453 {
00454 Member member = (Member) groupMembers.get(i);
00455 if ((responses.getFailedMembers() != null)
00456 && responses.getFailedMembers().contains(member))
00457 {
00458 logger.warn("Controller " + member + " is suspected of failure.");
00459 continue;
00460 }
00461 Object r = responses.getResult(member);
00462 if (r instanceof ControllerResultSet)
00463 {
00464 if (result == null)
00465 result = (ControllerResultSet) r;
00466 }
00467 else if (r instanceof AllBackendsFailedException)
00468 {
00469 if (failedOnAllBackends == null)
00470 failedOnAllBackends = new ArrayList();
00471 failedOnAllBackends.add(member);
00472 }
00473 else if (r instanceof SQLException)
00474 {
00475 String msg = "Stored procedure " + proc.getId()
00476 + " failed on controller " + member + " (" + r + ")";
00477 logger.warn(msg);
00478 exception = (SQLException) r;
00479 }
00480 }
00481
00482 if (failedOnAllBackends != null)
00483 {
00484 try
00485 {
00486 dvdb.getMulticastRequestAdapter().multicastMessage(
00487 failedOnAllBackends, new NotifyCompletion(proc, result != null),
00488 MulticastRequestAdapter.WAIT_NONE, proc.getTimeout());
00489 }
00490 catch (Exception e)
00491 {
00492 String msg = "An error occured while notifying all controllers of failure of read stored procedure "
00493 + proc.getId();
00494 logger.warn(msg, e);
00495 throw new SQLException(msg + " (" + e + ")");
00496 }
00497 }
00498
00499 if (result != null)
00500 {
00501 if (logger.isDebugEnabled())
00502 logger.debug("Stored procedure " + proc.getId()
00503 + " completed successfully.");
00504 return result;
00505 }
00506 else if (exception != null)
00507 throw exception;
00508
00509
00510 String msg = "Stored procedure '" + proc + "' failed on all controllers";
00511 logger.warn(msg);
00512 throw new SQLException(msg);
00513 }
00514 catch (SQLException e)
00515 {
00516 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00517 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00518 logger.warn(msg);
00519 throw e;
00520 }
00521 }
00522
00523
00524
00525
00526 public int execDistributedWriteStoredProcedure(StoredProcedure proc)
00527 throws SQLException
00528 {
00529 try
00530 {
00531 int execWriteStoredProcedureResult = NO_RESULT;
00532
00533 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00534
00535 if (logger.isDebugEnabled())
00536 logger.debug("Broadcasting write store procedure "
00537 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00538 + (proc.isAutoCommit() ? "" : " transaction "
00539 + proc.getTransactionId()) + ") to all controllers ("
00540 + dvdb.getChannel().getLocalMembership() + "->"
00541 + groupMembers.toString() + ")");
00542
00543
00544 MulticastResponse responses;
00545 try
00546 {
00547 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00548 groupMembers, new ExecWriteStoredProcedure(proc),
00549 MulticastRequestAdapter.WAIT_ALL, proc.getTimeout());
00550 }
00551 catch (Exception e)
00552 {
00553 String msg = "An error occured while executing distributed write stored procedure "
00554 + proc.getId();
00555 logger.warn(msg, e);
00556 throw new SQLException(msg + " (" + e + ")");
00557 }
00558
00559 if (logger.isDebugEnabled())
00560 logger.debug("Stored procedure "
00561 + proc.getSQLShortForm(dvdb.getSQLShortFormLength())
00562 + " completed.");
00563
00564 if (responses.getFailedMembers() != null)
00565 {
00566 logger.warn(responses.getFailedMembers().size()
00567 + " controller(s) died during execution of stored procedure "
00568 + proc.getId());
00569 }
00570
00571
00572 ArrayList failedOnAllBackends = null;
00573 SQLException exception = null;
00574 int size = groupMembers.size();
00575
00576 for (int i = 0; i < size; i++)
00577 {
00578 Member member = (Member) groupMembers.get(i);
00579 if ((responses.getFailedMembers() != null)
00580 && responses.getFailedMembers().contains(member))
00581 {
00582 logger.warn("Controller " + member + " is suspected of failure.");
00583 continue;
00584 }
00585 Object r = responses.getResult(member);
00586 if (r instanceof Integer)
00587 {
00588 if (execWriteStoredProcedureResult == NO_RESULT)
00589 execWriteStoredProcedureResult = ((Integer) r).intValue();
00590 }
00591 else if (r instanceof AllBackendsFailedException)
00592 {
00593 if (failedOnAllBackends == null)
00594 failedOnAllBackends = new ArrayList();
00595 failedOnAllBackends.add(member);
00596 }
00597 else if (r instanceof SQLException)
00598 {
00599 String msg = "Stored procedure " + proc.getId()
00600 + " failed on controller " + member + " (" + r + ")";
00601 logger.warn(msg);
00602 exception = (SQLException) r;
00603 }
00604 }
00605
00606 if (failedOnAllBackends != null)
00607 {
00608 try
00609 {
00610 dvdb.getMulticastRequestAdapter().multicastMessage(
00611 failedOnAllBackends,
00612 new NotifyCompletion(proc,
00613 execWriteStoredProcedureResult != NO_RESULT),
00614 MulticastRequestAdapter.WAIT_NONE, proc.getTimeout());
00615 }
00616 catch (Exception e)
00617 {
00618 String msg = "An error occured while notifying all controllers of failure of write stored procedure "
00619 + proc.getId();
00620 logger.warn(msg, e);
00621 throw new SQLException(msg + " (" + e + ")");
00622 }
00623 }
00624
00625 if (execWriteStoredProcedureResult != NO_RESULT)
00626 {
00627 if (logger.isDebugEnabled())
00628 logger.debug("Stored procedure " + proc.getId()
00629 + " completed successfully.");
00630 return execWriteStoredProcedureResult;
00631 }
00632 else if (exception != null)
00633 throw exception;
00634
00635
00636 String msg = "Stored procedure '" + proc + "' failed on all controllers";
00637 logger.warn(msg);
00638 throw new SQLException(msg);
00639 }
00640 catch (SQLException e)
00641 {
00642 String msg = Translate.get("loadbalancer.request.failed", new String[]{
00643 proc.getSQLShortForm(vdb.getSQLShortFormLength()), e.getMessage()});
00644 logger.warn(msg);
00645 throw e;
00646 }
00647 }
00648
00649
00650
00651
00652 public void distributedCommit(long transactionId) throws SQLException
00653 {
00654 try
00655 {
00656 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00657 if (logger.isDebugEnabled())
00658 logger.debug("Broadcasting transaction " + transactionId
00659 + " commit to all controllers ("
00660 + dvdb.getChannel().getLocalMembership() + "->"
00661 + groupMembers.toString() + ")");
00662
00663
00664 MulticastResponse responses;
00665 try
00666 {
00667 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00668 groupMembers, new Commit(transactionId),
00669 MulticastRequestAdapter.WAIT_ALL, this.commitTimeout);
00670 }
00671 catch (Exception e)
00672 {
00673 String msg = "An error occured while executing distributed rollback for transaction "
00674 + transactionId;
00675 logger.warn(msg, e);
00676 throw new SQLException(msg + "(" + e + ")");
00677 }
00678
00679 if (logger.isDebugEnabled())
00680 logger.debug("Commit of transaction " + transactionId + " completed.");
00681
00682 if (responses.getFailedMembers() != null)
00683 {
00684 logger.warn(responses.getFailedMembers().size()
00685 + " controller(s) died during execution of commit for transaction "
00686 + transactionId);
00687 }
00688
00689
00690 ArrayList failedOnAllBackends = null;
00691 SQLException exception = null;
00692 int size = groupMembers.size();
00693 boolean success = false;
00694
00695 for (int i = 0; i < size; i++)
00696 {
00697 Member member = (Member) groupMembers.get(i);
00698 if ((responses.getFailedMembers() != null)
00699 && responses.getFailedMembers().contains(member))
00700 {
00701 logger.warn("Controller " + member + " is suspected of failure.");
00702 continue;
00703 }
00704 Object r = responses.getResult(member);
00705 if (r instanceof Boolean)
00706 {
00707 if (((Boolean) r).booleanValue())
00708 success = true;
00709 else
00710 logger.error("Unexpected result for controller " + member);
00711 }
00712 else if (r instanceof AllBackendsFailedException)
00713 {
00714 if (failedOnAllBackends == null)
00715 failedOnAllBackends = new ArrayList();
00716 failedOnAllBackends.add(member);
00717 if (logger.isDebugEnabled())
00718 logger.debug("Commit failed on all backends of controller "
00719 + member + " (" + r + ")");
00720 }
00721 else if (r instanceof SQLException)
00722 {
00723 String msg = "Commit of transaction " + transactionId
00724 + " failed on controller " + member + " (" + r + ")";
00725 logger.warn(msg);
00726 exception = (SQLException) r;
00727 }
00728 }
00729
00730 if (failedOnAllBackends != null)
00731 {
00732
00733 AbstractRequest request = new UnknownRequest("commit", false, 0, "\n");
00734 request.setTransactionId(transactionId);
00735 try
00736 {
00737 dvdb.getMulticastRequestAdapter().multicastMessage(
00738 failedOnAllBackends, new NotifyCompletion(request, success),
00739 MulticastRequestAdapter.WAIT_NONE, commitTimeout);
00740 }
00741 catch (Exception e)
00742 {
00743 String msg = "An error occured while notifying all controllers of failure to commit transaction "
00744 + transactionId;
00745 logger.warn(msg, e);
00746 throw new SQLException(msg + " (" + e + ")");
00747 }
00748 }
00749
00750 if (success)
00751 return;
00752
00753 if (exception != null)
00754 throw exception;
00755
00756 String msg = "Transaction " + transactionId
00757 + " failed to commit on all controllers";
00758 logger.warn(msg);
00759 throw new SQLException(msg);
00760 }
00761 catch (SQLException e)
00762 {
00763 String msg = "Transaction " + transactionId + " commit failed (" + e
00764 + ")";
00765 logger.warn(msg);
00766 throw e;
00767 }
00768 }
00769
00770
00771
00772
00773 public void distributedRollback(long transactionId) throws SQLException
00774 {
00775 try
00776 {
00777 ArrayList groupMembers = dvdb.getCurrentGroup().getMembers();
00778 if (logger.isDebugEnabled())
00779 logger.debug("Broadcasting transaction " + transactionId
00780 + " rollback to all controllers ("
00781 + dvdb.getChannel().getLocalMembership() + "->"
00782 + groupMembers.toString() + ")");
00783
00784
00785 MulticastResponse responses;
00786 try
00787 {
00788 responses = dvdb.getMulticastRequestAdapter().multicastMessage(
00789 groupMembers, new Rollback(transactionId),
00790 MulticastRequestAdapter.WAIT_ALL, this.rollbackTimeout);
00791 }
00792 catch (Exception e)
00793 {
00794 String msg = "An error occured while executing distributed rollback for transaction "
00795 + transactionId;
00796 logger.warn(msg, e);
00797 throw new SQLException(msg + "(" + e + ")");
00798 }
00799
00800 if (logger.isDebugEnabled())
00801 logger
00802 .debug("rollback of transaction " + transactionId + " completed.");
00803
00804 if (responses.getFailedMembers() != null)
00805 {
00806 logger
00807 .warn(responses.getFailedMembers().size()
00808 + " controller(s) died during execution of rollback for transaction "
00809 + transactionId);
00810 }
00811
00812
00813 ArrayList failedOnAllBackends = null;
00814 SQLException exception = null;
00815 int size = groupMembers.size();
00816 boolean success = false;
00817
00818 for (int i = 0; i < size; i++)
00819 {
00820 Member member = (Member) groupMembers.get(i);
00821 if ((responses.getFailedMembers() != null)
00822 && responses.getFailedMembers().contains(member))
00823 {
00824 logger.warn("Controller " + member + " is suspected of failure.");
00825 continue;
00826 }
00827 Object r = responses.getResult(member);
00828 if (r instanceof Boolean)
00829 {
00830 if (((Boolean) r).booleanValue())
00831 success = true;
00832 else
00833 logger.error("Unexpected result for controller " + member);
00834 }
00835 else if (r instanceof AllBackendsFailedException)
00836 {
00837 if (failedOnAllBackends == null)
00838 failedOnAllBackends = new ArrayList();
00839 failedOnAllBackends.add(member);
00840 if (logger.isDebugEnabled())
00841 logger.debug("rollback failed on all backends of controller "
00842 + member + " (" + r + ")");
00843 }
00844 else if (r instanceof SQLException)
00845 {
00846 String msg = "rollback of transaction " + transactionId
00847 + " failed on controller " + member + " (" + r + ")";
00848 logger.warn(msg);
00849 exception = (SQLException) r;
00850 }
00851 }
00852
00853 if (failedOnAllBackends != null)
00854 {
00855
00856 AbstractRequest request = new UnknownRequest("rollback", false, 0, "\n");
00857 request.setTransactionId(transactionId);
00858 try
00859 {
00860 dvdb.getMulticastRequestAdapter().multicastMessage(
00861 failedOnAllBackends, new NotifyCompletion(request, success),
00862 MulticastRequestAdapter.WAIT_NONE, rollbackTimeout);
00863 }
00864 catch (Exception e)
00865 {
00866 String msg = "An error occured while notifying all controllers of failure to rollback transaction "
00867 + transactionId;
00868 logger.warn(msg, e);
00869 throw new SQLException(msg + " (" + e + ")");
00870 }
00871 }
00872
00873 if (success)
00874 return;
00875
00876 if (exception != null)
00877 throw exception;
00878
00879
00880 String msg = "Transaction " + transactionId
00881 + " failed to rollback on all controllers";
00882 logger.warn(msg);
00883 throw new SQLException(msg);
00884 }
00885 catch (SQLException e)
00886 {
00887 String msg = "Transaction " + transactionId + " rollback failed (" + e
00888 + ")";
00889 logger.warn(msg);
00890 throw e;
00891 }
00892 }
00893
00894 }