00001
00025
package org.objectweb.cjdbc.controller.loadbalancer.tasks;
00026
00027
import java.sql.Connection;
00028
import java.sql.SQLException;
00029
00030
import org.objectweb.cjdbc.common.exceptions.UnreachableBackendException;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00033
import org.objectweb.cjdbc.common.sql.CreateRequest;
00034
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00035
import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
00036
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00037
import org.objectweb.cjdbc.controller.connection.AbstractConnectionManager;
00038
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00039
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00040
00049 public class WriteRequestTask extends AbstractTask
00050 {
00051 private AbstractWriteRequest
request;
00052 private int result;
00053
00061 public WriteRequestTask(
int nbToComplete,
int totalNb,
00062 AbstractWriteRequest request)
00063 {
00064 super(nbToComplete, totalNb);
00065
this.request = request;
00066 }
00067
00074 public void execute(
BackendWorkerThread backendThread)
throws SQLException
00075 {
00076
DatabaseBackend backend = backendThread.getBackend();
00077
00078
AbstractConnectionManager cm = backend.
getConnectionManager(
request
00079 .getLogin());
00080
if (cm == null)
00081 {
00082 SQLException se =
new SQLException(
00083
"No Connection Manager for Virtual Login:" +
request.getLogin());
00084
try
00085 {
00086
notifyFailure(backendThread, 1, se);
00087 }
00088
catch (SQLException ignore)
00089 {
00090 }
00091
throw se;
00092 }
00093
00094
Trace logger = backendThread.getLogger();
00095
if (
request.isAutoCommit())
00096 {
00097
if (backend.
isDisabling())
00098 {
00099
00100
00101
00102
notifyCompletion();
00103
return;
00104 }
00105
00106
00107 Connection c = null;
00108
try
00109 {
00110 c = cm.
getConnection();
00111 }
00112
catch (
UnreachableBackendException e1)
00113 {
00114 SQLException se =
new SQLException(
"Backend " + backend.
getName()
00115 +
" is no more reachable.");
00116
try
00117 {
00118
notifyFailure(backendThread, 1, se);
00119 }
00120
catch (SQLException ignore)
00121 {
00122 }
00123 logger.
error(
"Disabling backend " + backend.
getName()
00124 +
" because it is no more reachable.");
00125 backend.
disable();
00126
throw se;
00127 }
00128
00129
00130
if (c == null)
00131 {
00132 SQLException se =
new SQLException(
"No more connections");
00133
try
00134 {
00135
if (!
notifyFailure(backendThread, (
long)
request.getTimeout() * 1000,
00136 se))
00137 {
00138
return;
00139 }
00140 }
00141
catch (SQLException ignore)
00142 {
00143 }
00144
00145
00146 backendThread.kill();
00147 String msg =
"Request '"
00148 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00149 +
"' failed on backend " + backend.
getName() +
" but "
00150 +
getSuccess() +
" succeeded (" + se +
")";
00151 logger.
error(msg);
00152
throw new SQLException(msg);
00153 }
00154
00155
00156
try
00157 {
00158
result =
AbstractLoadBalancer.executeUpdateRequestOnBackend(
request,
00159 backend, c);
00160
00161
00162
if (
request.isCreate())
00163 {
00164
DatabaseSchema dbs = backend.
getDatabaseSchema();
00165
if (dbs != null)
00166 {
00167
DatabaseTable t = ((
CreateRequest)
request).getDatabaseTable();
00168
if (t != null)
00169 {
00170 dbs.
addTable(t);
00171
if (logger.
isDebugEnabled())
00172 logger.
debug(
"Added table '" +
request.getTableName()
00173 +
"' to backend database schema");
00174 }
00175 }
00176 }
00177
else if (
request.isDrop())
00178 {
00179
DatabaseSchema dbs = backend.
getDatabaseSchema();
00180
if (dbs != null)
00181 {
00182
DatabaseTable t = dbs.
getTable(
request.getTableName());
00183
if (t != null)
00184 {
00185 dbs.
removeTable(t);
00186
if (logger.
isDebugEnabled())
00187 logger.
debug(
"Removed table '" +
request.getTableName()
00188 +
"' from backend database schema");
00189 }
00190 }
00191 }
00192 }
00193
catch (Exception e)
00194 {
00195
try
00196 {
00197
if (!
notifyFailure(backendThread, (
long)
request.getTimeout() * 1000,
00198 e))
00199
return;
00200 }
00201
catch (SQLException ignore)
00202 {
00203 }
00204
00205
00206 backendThread.kill();
00207 String msg =
"Request '"
00208 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00209 +
"' failed on backend " + backend.
getName() +
" but "
00210 +
getSuccess() +
" succeeded (" + e +
")";
00211
00212
if (logger.
isDebugEnabled())
00213 logger.
debug(msg, e);
00214
else
00215 logger.
error(msg);
00216
throw new SQLException(msg);
00217 }
00218 finally
00219 {
00220 cm.
releaseConnection(c);
00221 }
00222 }
00223
else
00224 {
00225 Connection c;
00226
long tid =
request.getTransactionId();
00227 Long lTid =
new Long(tid);
00228
00229
if (!backend.
isStartedTransaction(lTid))
00230 {
00231
if (backend.
isDisabling())
00232 {
00233
00234
00235
notifyCompletion();
00236
return;
00237 }
00238
00239
00240
try
00241 {
00242 c = cm.
getConnection(tid);
00243 }
00244
catch (
UnreachableBackendException e1)
00245 {
00246 SQLException se =
new SQLException(
"Backend " + backend.
getName()
00247 +
" is no more reachable.");
00248
try
00249 {
00250
notifyFailure(backendThread, 1, se);
00251 }
00252
catch (SQLException ignore)
00253 {
00254 }
00255 logger.
error(
"Disabling backend " + backend.
getName()
00256 +
" because it is no more reachable.");
00257 backend.
disable();
00258
throw se;
00259 }
00260
00261
00262
if (c == null)
00263 {
00264 SQLException se =
new SQLException(
00265
"Unable to get connection for transaction " + tid);
00266
try
00267 {
00268
if (!
notifyFailure(backendThread,
00269 (
long)
request.getTimeout() * 1000, se))
00270
return;
00271 }
00272
catch (SQLException ignore)
00273 {
00274 }
00275
00276
00277 backendThread.kill();
00278 String msg =
"Request '"
00279 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00280 +
"' failed on backend " + backend.
getName() +
" but "
00281 +
getSuccess() +
" succeeded (" + se +
")";
00282 logger.
error(msg);
00283
throw new SQLException(msg);
00284 }
00285
00286
00287 backend.
startTransaction(lTid);
00288
try
00289 {
00290 c.setAutoCommit(
false);
00291 }
00292
catch (SQLException e2)
00293 {
00294 SQLException se =
new SQLException(
00295
"Unable to set autocommit value for transaction " + tid);
00296
try
00297 {
00298
if (!
notifyFailure(backendThread,
00299 (
long)
request.getTimeout() * 1000, se))
00300
return;
00301 }
00302
catch (SQLException ignore)
00303 {
00304 }
00305
00306
00307 backendThread.kill();
00308 String msg =
"Request '"
00309 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00310 +
"' failed on backend " + backend.
getName() +
" but "
00311 +
getSuccess() +
" succeeded (" + se +
")";
00312 logger.
error(msg);
00313
throw new SQLException(msg);
00314 }
00315 }
00316
else
00317 {
00318 c = cm.
retrieveConnection(tid);
00319
00320
00321
if (c == null)
00322 {
00323 SQLException se =
new SQLException(
00324
"Unable to retrieve connection for transaction " + tid);
00325
try
00326 {
00327
if (!
notifyFailure(backendThread,
00328 (
long)
request.getTimeout() * 1000, se))
00329
return;
00330 }
00331
catch (SQLException ignore)
00332 {
00333 }
00334
00335
00336 backendThread.kill();
00337 String msg =
"Request '"
00338 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00339 +
"' failed on backend " + backend.
getName() +
" but "
00340 +
getSuccess() +
" succeeded (" + se +
")";
00341 logger.
error(msg);
00342
throw new SQLException(msg);
00343 }
00344 }
00345
00346
try
00347 {
00348
result =
AbstractLoadBalancer.executeUpdateRequestOnBackend(
request,
00349 backend, c);
00350
00351
00352
if (
request.isCreate())
00353 {
00354
DatabaseSchema dbs = backend.
getDatabaseSchema();
00355
if (dbs != null)
00356 {
00357
DatabaseTable t = ((
CreateRequest)
request).getDatabaseTable();
00358
if (t != null)
00359 {
00360 dbs.
addTable(t);
00361
if (logger.
isDebugEnabled())
00362 logger.
debug(
"Added table '" +
request.getTableName()
00363 +
"' to backend database schema");
00364 }
00365 }
00366 }
00367
else if (
request.isDrop())
00368 {
00369
DatabaseSchema dbs = backend.
getDatabaseSchema();
00370
if (dbs != null)
00371 {
00372
DatabaseTable t = dbs.
getTable(
request.getTableName());
00373
if (t != null)
00374 {
00375 dbs.
removeTable(t);
00376
if (logger.
isDebugEnabled())
00377 logger.
debug(
"Removed table '" +
request.getTableName()
00378 +
"' from backend database schema");
00379 }
00380 }
00381 }
00382 }
00383
catch (Exception e)
00384 {
00385
try
00386 {
00387
if (!
notifyFailure(backendThread, (
long)
request.getTimeout() * 1000,
00388 e))
00389
return;
00390 }
00391
catch (SQLException ignore)
00392 {
00393 }
00394
00395
00396 backendThread.kill();
00397 String msg =
"Request '"
00398 +
request.getSQLShortForm(backend.
getSQLShortFormLength())
00399 +
"' failed on backend " + backend.
getName() +
" but "
00400 +
getSuccess() +
" succeeded (" + e +
")";
00401
if (logger.
isDebugEnabled())
00402 logger.
debug(msg, e);
00403
else
00404 logger.
error(msg);
00405
throw new SQLException(msg);
00406 }
00407 }
00408
notifySuccess();
00409 }
00410
00416 public int getResult()
00417 {
00418
return result;
00419 }
00420
00424 public String
toString()
00425 {
00426
if (
request.isAutoCommit())
00427
return "WriteAutocommit Task (" +
request.getSQL() +
")";
00428
else
00429
return "Write Task from transaction:" +
request.getTransactionId() +
"("
00430 +
request.getSQL() +
")";
00431 }
00432
00433 }