00001
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026
00027 import java.sql.SQLException;
00028 import java.util.Vector;
00029
00030 import org.jgroups.Address;
00031 import org.jgroups.blocks.GroupRequest;
00032 import org.jgroups.util.RspList;
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00035 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00036 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00037 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00038 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00039 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00040 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00041 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00042 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00043 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00044 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CacheInvalidate;
00045
00053 public class SingleDBDistributedRequestManager
00054 extends
00055 DistributedRequestManager
00056 {
00057
00071 public SingleDBDistributedRequestManager(DistributedVirtualDatabase vdb,
00072 AbstractScheduler scheduler, AbstractResultCache cache,
00073 AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00074 long beginTimeout, long commitTimeout, long rollbackTimeout)
00075 throws SQLException
00076 {
00077 super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00078 commitTimeout, rollbackTimeout);
00079 }
00080
00081
00082
00083
00084
00088 public int execDistributedWriteRequest(AbstractWriteRequest request)
00089 throws SQLException
00090 {
00091 try
00092 {
00093 int execWriteRequestResult = -1;
00094
00095 Vector allButUs = dvdb.getChannel().getView().getMembers();
00096 allButUs.remove(dvdb.getChannel().getLocalAddress());
00097
00098
00099 if (logger.isDebugEnabled())
00100 logger
00101 .debug("Sending cache invalidation to all controllers for request "
00102 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00103
00104 RspList responses = dvdb.getDispatcher().castMessage(allButUs,
00105 CJDBCGroupMessage.getMessage(new CacheInvalidate(request)),
00106 GroupRequest.GET_ALL, request.getTimeout());
00107
00108
00109 try
00110 {
00111 scheduleExecWriteRequest((AbstractWriteRequest) request);
00112 execWriteRequestResult = loadBalanceExecWriteRequest((AbstractWriteRequest) request);
00113 }
00114 catch (SQLException e)
00115 {
00116 throw e;
00117 }
00118 catch (RuntimeException re)
00119 {
00120 logger.warn("Error while executing distributed write request", re);
00121 throw new SQLException(re.getMessage());
00122 }
00123 catch (AllBackendsFailedException abe)
00124 {
00125 throw new SQLException(abe.getMessage());
00126 }
00127 finally
00128 {
00129 updateAndNotifyExecWriteRequest((AbstractWriteRequest) request, true);
00130 }
00131 if (logger.isDebugEnabled())
00132 logger.debug("Request "
00133 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00134 + " completed.");
00135
00136 if (responses.numSuspectedMembers() > 0)
00137 {
00138 logger.warn(responses.numSuspectedMembers()
00139 + " controller(s) died during execution of request "
00140 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00141 }
00142
00143 SQLException exception = null;
00144 Vector suspectedMembers = responses.getSuspectedMembers();
00145 int size = suspectedMembers.size();
00146
00147 for (int i = 0; i < size; i++)
00148 {
00149 Address address = (Address) suspectedMembers.get(i);
00150 logger.warn("Controller " + address + " is suspected of failure.");
00151 continue;
00152 }
00153
00154 return execWriteRequestResult;
00155 }
00156 catch (SQLException e)
00157 {
00158 String msg = Translate
00159 .get("loadbalancer.request.failed", new String[]{
00160 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00161 e.getMessage()});
00162 logger.warn(msg);
00163 throw e;
00164 }
00165 }
00166
00170 public ControllerResultSet execDistributedWriteRequestWithKeys(
00171 AbstractWriteRequest request) throws SQLException
00172 {
00173 try
00174 {
00175 ControllerResultSet execWriteRequestResult = null;
00176
00177 Vector allButUs = dvdb.getChannel().getView().getMembers();
00178 allButUs.remove(dvdb.getChannel().getLocalAddress());
00179
00180
00181 if (logger.isDebugEnabled())
00182 logger
00183 .debug("Sending cache invalidation to all controllers for request "
00184 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00185
00186 RspList responses = dvdb.getDispatcher().castMessage(allButUs,
00187 CJDBCGroupMessage.getMessage(new CacheInvalidate(request)),
00188 GroupRequest.GET_ALL, request.getTimeout());
00189
00190
00191 try
00192 {
00193 scheduleExecWriteRequest((AbstractWriteRequest) request);
00194 execWriteRequestResult = loadBalanceExecWriteRequestWithKeys((AbstractWriteRequest) request);
00195 }
00196 catch (SQLException e)
00197 {
00198 throw e;
00199 }
00200 catch (RuntimeException re)
00201 {
00202 logger.warn("Error while executing distributed write request", re);
00203 throw new SQLException(re.getMessage());
00204 }
00205 catch (AllBackendsFailedException abe)
00206 {
00207 throw new SQLException(abe.getMessage());
00208 }
00209 finally
00210 {
00211 updateAndNotifyExecWriteRequest((AbstractWriteRequest) request, true);
00212 }
00213 if (logger.isDebugEnabled())
00214 logger.debug("Request "
00215 + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00216 + " completed.");
00217
00218 if (responses.numSuspectedMembers() > 0)
00219 {
00220 logger.warn(responses.numSuspectedMembers()
00221 + " controller(s) died during execution of request "
00222 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00223 }
00224
00225 SQLException exception = null;
00226 Vector suspectedMembers = responses.getSuspectedMembers();
00227 int size = suspectedMembers.size();
00228
00229 for (int i = 0; i < size; i++)
00230 {
00231 Address address = (Address) suspectedMembers.get(i);
00232 logger.warn("Controller " + address + " is suspected of failure.");
00233 continue;
00234 }
00235
00236 return execWriteRequestResult;
00237 }
00238 catch (SQLException e)
00239 {
00240 String msg = "Request '" + request + "' failed (" + e + ")";
00241 logger.warn(msg);
00242 throw e;
00243 }
00244 }
00245
00249 public int execDistributedWriteStoredProcedure(StoredProcedure proc)
00250 throws SQLException
00251 {
00252
00253 return 0;
00254 }
00255
00259 public void distributedCommit(long transactionId) throws SQLException
00260 {
00261
00262
00263
00264
00265 super.commit(transactionId);
00266 }
00267
00271 public void distributedRollback(long transactionId) throws SQLException
00272 {
00273
00274
00275
00276
00277 super.rollback(transactionId);
00278 }
00279 }