SingleDBDistributedRequestManager.java

説明を見る。
00001 
00025 package org.objectweb.cjdbc.controller.requestmanager.distributed;
00026 
00027 import java.sql.SQLException;
00028 import java.util.Vector;
00029 
00030 import org.jgroups.Address;
00031 import org.jgroups.blocks.GroupRequest;
00032 import org.jgroups.util.RspList;
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00035 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00036 import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00037 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00038 import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00039 import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00040 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00041 import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00042 import org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase;
00043 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00044 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CacheInvalidate;
00045 
00053 public class SingleDBDistributedRequestManager
00054     extends
00055       DistributedRequestManager
00056 {
00057 
00071   public SingleDBDistributedRequestManager(DistributedVirtualDatabase vdb,
00072       AbstractScheduler scheduler, AbstractResultCache cache,
00073       AbstractLoadBalancer loadBalancer, AbstractRecoveryLog recoveryLog,
00074       long beginTimeout, long commitTimeout, long rollbackTimeout)
00075       throws SQLException
00076   {
00077     super(vdb, scheduler, cache, loadBalancer, recoveryLog, beginTimeout,
00078         commitTimeout, rollbackTimeout);
00079   }
00080 
00081   //
00082   // Request handling
00083   //
00084 
00088   public int execDistributedWriteRequest(AbstractWriteRequest request)
00089       throws SQLException
00090   {
00091     try
00092     {
00093       int execWriteRequestResult = -1;
00094 
00095       Vector allButUs = dvdb.getChannel().getView().getMembers();
00096       allButUs.remove(dvdb.getChannel().getLocalAddress());
00097       // TODO: Exclude controllers without cache from the invalidation list
00098 
00099       if (logger.isDebugEnabled())
00100         logger
00101             .debug("Sending cache invalidation to all controllers for request "
00102                 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00103       // Send the query to everybody except us
00104       RspList responses = dvdb.getDispatcher().castMessage(allButUs,
00105           CJDBCGroupMessage.getMessage(new CacheInvalidate(request)),
00106           GroupRequest.GET_ALL, request.getTimeout());
00107 
00108       // Execute locally
00109       try
00110       {
00111         scheduleExecWriteRequest((AbstractWriteRequest) request);
00112         execWriteRequestResult = loadBalanceExecWriteRequest((AbstractWriteRequest) request);
00113       }
00114       catch (SQLException e)
00115       {
00116         throw e;
00117       }
00118       catch (RuntimeException re)
00119       {
00120         logger.warn("Error while executing distributed write request", re);
00121         throw new SQLException(re.getMessage());
00122       }
00123       catch (AllBackendsFailedException abe)
00124       {
00125         throw new SQLException(abe.getMessage());
00126       }
00127       finally
00128       {
00129         updateAndNotifyExecWriteRequest((AbstractWriteRequest) request, true);
00130       }
00131       if (logger.isDebugEnabled())
00132         logger.debug("Request "
00133             + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00134             + " completed.");
00135 
00136       if (responses.numSuspectedMembers() > 0)
00137       { // Some controllers failed ... too bad !
00138         logger.warn(responses.numSuspectedMembers()
00139             + " controller(s) died during execution of request "
00140             + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00141       }
00142 
00143       SQLException exception = null;
00144       Vector suspectedMembers = responses.getSuspectedMembers();
00145       int size = suspectedMembers.size();
00146       // Get the result of each controller
00147       for (int i = 0; i < size; i++)
00148       {
00149         Address address = (Address) suspectedMembers.get(i);
00150         logger.warn("Controller " + address + " is suspected of failure.");
00151         continue;
00152       }
00153 
00154       return execWriteRequestResult;
00155     }
00156     catch (SQLException e)
00157     {
00158       String msg = Translate
00159           .get("loadbalancer.request.failed", new String[]{
00160               request.getSQLShortForm(vdb.getSQLShortFormLength()),
00161               e.getMessage()});
00162       logger.warn(msg);
00163       throw e;
00164     }
00165   }
00166 
00170   public ControllerResultSet execDistributedWriteRequestWithKeys(
00171       AbstractWriteRequest request) throws SQLException
00172   {
00173     try
00174     {
00175       ControllerResultSet execWriteRequestResult = null;
00176 
00177       Vector allButUs = dvdb.getChannel().getView().getMembers();
00178       allButUs.remove(dvdb.getChannel().getLocalAddress());
00179       // TODO: Exclude controllers without cache from the invalidation list
00180 
00181       if (logger.isDebugEnabled())
00182         logger
00183             .debug("Sending cache invalidation to all controllers for request "
00184                 + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00185       // Send the query to everybody except us
00186       RspList responses = dvdb.getDispatcher().castMessage(allButUs,
00187           CJDBCGroupMessage.getMessage(new CacheInvalidate(request)),
00188           GroupRequest.GET_ALL, request.getTimeout());
00189 
00190       // Execute locally
00191       try
00192       {
00193         scheduleExecWriteRequest((AbstractWriteRequest) request);
00194         execWriteRequestResult = loadBalanceExecWriteRequestWithKeys((AbstractWriteRequest) request);
00195       }
00196       catch (SQLException e)
00197       {
00198         throw e;
00199       }
00200       catch (RuntimeException re)
00201       {
00202         logger.warn("Error while executing distributed write request", re);
00203         throw new SQLException(re.getMessage());
00204       }
00205       catch (AllBackendsFailedException abe)
00206       {
00207         throw new SQLException(abe.getMessage());
00208       }
00209       finally
00210       {
00211         updateAndNotifyExecWriteRequest((AbstractWriteRequest) request, true);
00212       }
00213       if (logger.isDebugEnabled())
00214         logger.debug("Request "
00215             + request.getSQLShortForm(dvdb.getSQLShortFormLength())
00216             + " completed.");
00217 
00218       if (responses.numSuspectedMembers() > 0)
00219       { // Some controllers failed ... too bad !
00220         logger.warn(responses.numSuspectedMembers()
00221             + " controller(s) died during execution of request "
00222             + request.getSQLShortForm(dvdb.getSQLShortFormLength()));
00223       }
00224 
00225       SQLException exception = null;
00226       Vector suspectedMembers = responses.getSuspectedMembers();
00227       int size = suspectedMembers.size();
00228       // Get the result of each controller
00229       for (int i = 0; i < size; i++)
00230       {
00231         Address address = (Address) suspectedMembers.get(i);
00232         logger.warn("Controller " + address + " is suspected of failure.");
00233         continue;
00234       }
00235 
00236       return execWriteRequestResult;
00237     }
00238     catch (SQLException e)
00239     {
00240       String msg = "Request '" + request + "' failed (" + e + ")";
00241       logger.warn(msg);
00242       throw e;
00243     }
00244   }
00245 
00249   public int execDistributedWriteStoredProcedure(StoredProcedure proc)
00250       throws SQLException
00251   {
00252     // TODO Auto-generated method stub
00253     return 0;
00254   }
00255 
00259   public void distributedCommit(long transactionId) throws SQLException
00260   {
00261     // There is a little subtlety here. We call
00262     // DistributedRequestManager.commit() but as the transaction has already
00263     // been removed from the writeTransaction list, it will execute as a local
00264     // commit.
00265     super.commit(transactionId);
00266   }
00267 
00271   public void distributedRollback(long transactionId) throws SQLException
00272   {
00273     // There is a little subtlety here. We call
00274     // DistributedRequestManager.rollback() but as the transaction has already
00275     // been removed from the writeTransaction list, it will execute as a local
00276     // rollback.
00277     super.rollback(transactionId);
00278   }
00279 }

CJDBCversion1.0rcfに対してWed Jun 23 16:01:25 2004に生成されました。 doxygen 1.3.6