00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.tasks;
00026
00027 import java.sql.Connection;
00028 import java.sql.SQLException;
00029
00030 import org.objectweb.cjdbc.common.exceptions.NoTransactionStartWhenDisablingException;
00031 import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00032 import org.objectweb.cjdbc.common.log.Trace;
00033 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00034 import org.objectweb.cjdbc.common.sql.CreateRequest;
00035 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00036 import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
00037 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00038 import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00039 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00040 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 public class WriteRequestTask extends AbstractTask
00051 {
00052 private AbstractWriteRequest request;
00053 private int result;
00054
00055
00056
00057
00058
00059
00060
00061
00062 public WriteRequestTask(int nbToComplete, int totalNb,
00063 AbstractWriteRequest request)
00064 {
00065 super(nbToComplete, totalNb);
00066 this.request = request;
00067 }
00068
00069
00070
00071
00072
00073
00074
00075 public void executeTask(BackendWorkerThread backendThread)
00076 throws SQLException
00077 {
00078 DatabaseBackend backend = backendThread.getBackend();
00079
00080 AbstractConnectionManager cm = backend.getConnectionManager(request
00081 .getLogin());
00082 if (cm == null)
00083 {
00084 SQLException se = new SQLException(
00085 "No Connection Manager for Virtual Login:" + request.getLogin());
00086 try
00087 {
00088 notifyFailure(backendThread, 1, se);
00089 }
00090 catch (SQLException ignore)
00091 {
00092 }
00093 throw se;
00094 }
00095
00096 Trace logger = backendThread.getLogger();
00097 if (request.isAutoCommit())
00098 {
00099 if (backend.isDisabling())
00100 {
00101
00102
00103
00104 notifyCompletion();
00105 return;
00106 }
00107
00108
00109 Connection c = null;
00110 try
00111 {
00112 c = cm.getConnection();
00113 }
00114 catch (UnreachableBackendException e1)
00115 {
00116 SQLException se = new SQLException("Backend " + backend.getName()
00117 + " is no more reachable.");
00118 try
00119 {
00120 notifyFailure(backendThread, 1, se);
00121 }
00122 catch (SQLException ignore)
00123 {
00124 }
00125
00126
00127 backendThread.kill();
00128 logger.error("Disabling backend " + backend.getName()
00129 + " because it is no more reachable.");
00130 throw se;
00131 }
00132
00133
00134 if (c == null)
00135 {
00136 SQLException se = new SQLException("No more connections");
00137 try
00138 {
00139 if (!notifyFailure(backendThread, (long) request.getTimeout() * 1000,
00140 se))
00141 {
00142 return;
00143 }
00144 }
00145 catch (SQLException ignore)
00146 {
00147 }
00148
00149
00150 backendThread.kill();
00151 String msg = "Request '"
00152 + request.getSQLShortForm(backend.getSQLShortFormLength())
00153 + "' failed on backend " + backend.getName() + " but "
00154 + getSuccess() + " succeeded (" + se + ")";
00155 logger.error(msg);
00156 throw new SQLException(msg);
00157 }
00158
00159
00160 try
00161 {
00162 result = AbstractLoadBalancer.executeUpdateRequestOnBackend(request,
00163 backend, c);
00164
00165
00166 if (request.isCreate())
00167 {
00168 DatabaseSchema dbs = backend.getDatabaseSchema();
00169 if (dbs != null)
00170 {
00171 DatabaseTable t = ((CreateRequest) request).getDatabaseTable();
00172 if (t != null)
00173 {
00174 dbs.addTable(t);
00175 if (logger.isDebugEnabled())
00176 logger.debug("Added table '" + request.getTableName()
00177 + "' to backend database schema");
00178 }
00179 }
00180 }
00181 else if (request.isDrop())
00182 {
00183 DatabaseSchema dbs = backend.getDatabaseSchema();
00184 if (dbs != null)
00185 {
00186 DatabaseTable t = dbs.getTable(request.getTableName());
00187 if (t != null)
00188 {
00189 dbs.removeTable(t);
00190 if (logger.isDebugEnabled())
00191 logger.debug("Removed table '" + request.getTableName()
00192 + "' from backend database schema");
00193 }
00194 }
00195 }
00196 }
00197 catch (Exception e)
00198 {
00199 try
00200 {
00201 if (!notifyFailure(backendThread, (long) request.getTimeout() * 1000,
00202 e))
00203 return;
00204 }
00205 catch (SQLException ignore)
00206 {
00207 }
00208
00209
00210 backendThread.kill();
00211 String msg = "Request '"
00212 + request.getSQLShortForm(backend.getSQLShortFormLength())
00213 + "' failed on backend " + backend.getName() + " but "
00214 + getSuccess() + " succeeded (" + e + ")";
00215
00216 if (logger.isDebugEnabled())
00217 logger.debug(msg, e);
00218 else
00219 logger.error(msg);
00220 throw new SQLException(msg);
00221 }
00222 finally
00223 {
00224 cm.releaseConnection(c);
00225 }
00226 }
00227 else
00228 {
00229 Connection c;
00230 long tid = request.getTransactionId();
00231 Long lTid = new Long(tid);
00232
00233 try
00234 {
00235 c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00236 }
00237 catch (UnreachableBackendException ube)
00238 {
00239 SQLException se = new SQLException("Backend " + backend.getName()
00240 + " is no more reachable.");
00241 try
00242 {
00243 notifyFailure(backendThread, 1, se);
00244 }
00245 catch (SQLException ignore)
00246 {
00247 }
00248
00249
00250 backendThread.kill();
00251 logger.error("Disabling backend " + backend.getName()
00252 + " because it is no more reachable.");
00253 throw se;
00254 }
00255 catch (NoTransactionStartWhenDisablingException e)
00256 {
00257
00258
00259
00260 notifyCompletion();
00261 return;
00262 }
00263 catch (SQLException e1)
00264 {
00265 SQLException se = new SQLException(
00266 "Unable to get connection for transaction " + tid);
00267 try
00268 {
00269 if (!notifyFailure(backendThread, (long) request.getTimeout() * 1000,
00270 se))
00271 return;
00272 }
00273 catch (SQLException ignore)
00274 {
00275 }
00276
00277
00278 backendThread.kill();
00279 String msg = "Request '"
00280 + request.getSQLShortForm(backend.getSQLShortFormLength())
00281 + "' failed on backend " + backend.getName() + " but "
00282 + getSuccess() + " succeeded (" + se + ")";
00283 logger.error(msg);
00284 throw new SQLException(msg);
00285 }
00286
00287
00288 if (c == null)
00289 {
00290 SQLException se = new SQLException(
00291 "Unable to retrieve connection for transaction " + tid);
00292 try
00293 {
00294 if (!notifyFailure(backendThread, (long) request.getTimeout() * 1000,
00295 se))
00296 return;
00297 }
00298 catch (SQLException ignore)
00299 {
00300 }
00301
00302
00303 backendThread.kill();
00304 String msg = "Request '"
00305 + request.getSQLShortForm(backend.getSQLShortFormLength())
00306 + "' failed on backend " + backend.getName() + " but "
00307 + getSuccess() + " succeeded (" + se + ")";
00308 logger.error(msg);
00309 throw new SQLException(msg);
00310 }
00311
00312
00313 try
00314 {
00315 result = AbstractLoadBalancer.executeUpdateRequestOnBackend(request,
00316 backend, c);
00317
00318
00319 if (request.isCreate())
00320 {
00321 DatabaseSchema dbs = backend.getDatabaseSchema();
00322 if (dbs != null)
00323 {
00324 DatabaseTable t = ((CreateRequest) request).getDatabaseTable();
00325 if (t != null)
00326 {
00327 dbs.addTable(t);
00328 if (logger.isDebugEnabled())
00329 logger.debug("Added table '" + request.getTableName()
00330 + "' to backend database schema");
00331 }
00332 }
00333 }
00334 else if (request.isDrop())
00335 {
00336 DatabaseSchema dbs = backend.getDatabaseSchema();
00337 if (dbs != null)
00338 {
00339 DatabaseTable t = dbs.getTable(request.getTableName());
00340 if (t != null)
00341 {
00342 dbs.removeTable(t);
00343 if (logger.isDebugEnabled())
00344 logger.debug("Removed table '" + request.getTableName()
00345 + "' from backend database schema");
00346 }
00347 }
00348 }
00349 }
00350 catch (Exception e)
00351 {
00352 try
00353 {
00354 if (!notifyFailure(backendThread, (long) request.getTimeout() * 1000,
00355 e))
00356 return;
00357 }
00358 catch (SQLException ignore)
00359 {
00360 }
00361
00362
00363 backendThread.kill();
00364 String msg = "Request '"
00365 + request.getSQLShortForm(backend.getSQLShortFormLength())
00366 + "' failed on backend " + backend.getName() + " but "
00367 + getSuccess() + " succeeded (" + e + ")";
00368 if (logger.isDebugEnabled())
00369 logger.debug(msg, e);
00370 else
00371 logger.error(msg);
00372 throw new SQLException(msg);
00373 }
00374 }
00375 notifySuccess();
00376 }
00377
00378
00379
00380
00381
00382
00383 public int getResult()
00384 {
00385 return result;
00386 }
00387
00388
00389
00390
00391 public String toString()
00392 {
00393 if (request.isAutoCommit())
00394 return "WriteAutocommit Task (" + request.getSQL() + ")";
00395 else
00396 return "Write Task from transaction:" + request.getTransactionId() + "("
00397 + request.getSQL() + ")";
00398 }
00399
00400 }