00001
00025
package org.objectweb.cjdbc.controller.loadbalancer;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
00030
import org.objectweb.cjdbc.common.i18n.Translate;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00033
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00034
00041 public class BackendWorkerThread extends Thread
00042 {
00043
00044
00045
00046
00047
00048
00049
00050
00051 private AbstractLoadBalancer loadBalancer;
00052 private DatabaseBackend
backend;
00053 private ArrayList
taskList;
00054 private ArrayList
tidList;
00055 private boolean isKilled =
false;
00056
00057
00058 private boolean currentlyProcessing =
false;
00059
00060 private Trace
logger = null;
00061
00062
00063
00064
00065
00073 public BackendWorkerThread(DatabaseBackend backend,
00074
AbstractLoadBalancer loadBalancer)
throws SQLException
00075 {
00076
this(
"BackendWorkerThread for backend:" +
backend.getName()
00077 +
" and a loadBalancer level:" +
loadBalancer.
getRAIDbLevel(),
backend,
00078
loadBalancer);
00079 }
00080
00089 public BackendWorkerThread(String name, DatabaseBackend backend,
00090
AbstractLoadBalancer loadBalancer)
throws SQLException
00091 {
00092 super(name);
00093
00094
if (
backend == null)
00095 {
00096 String msg =
Translate.get(
"backendworkerthread.null.backend");
00097
logger = Trace
00098 .getLogger(
"org.objectweb.cjdbc.controller.backend.DatabaseBackend");
00099
logger.error(msg);
00100
throw new SQLException(msg);
00101 }
00102
00103
backend.checkDriverCompliance();
00104
00105
logger = Trace
00106 .getLogger(
"org.objectweb.cjdbc.controller.backend.DatabaseBackend."
00107 +
backend.getName());
00108
00109
if (
loadBalancer == null)
00110 {
00111 String msg =
Translate.get(
"backendworkerthread.null.loadbalancer");
00112
logger.error(msg);
00113
throw new SQLException(msg);
00114 }
00115
00116
this.backend =
backend;
00117
this.loadBalancer =
loadBalancer;
00118
taskList =
new ArrayList();
00119 }
00120
00121
00122
00123
00124
00132 public void addTask(
AbstractTask task)
00133 {
00134
if (!
isKilled)
00135 {
00136
taskList.add(task);
00137
00138
backend.addPendingWriteRequest(task);
00139 }
00140
else
00141 task.
notifyCompletion();
00142 }
00143
00152 public void addTask(
AbstractTask task,
long transactionId)
00153 {
00154
if (!
isKilled)
00155 {
00156 task.
setHasTid(
true);
00157
addTask(task);
00158
if (
tidList == null)
00159
tidList =
new ArrayList();
00160
tidList.add(
new Long(transactionId));
00161 }
00162
else
00163 task.
notifyCompletion();
00164 }
00165
00173 public void addPriorityTask(
AbstractTask task)
00174 {
00175
if (!
isKilled)
00176 {
00177
taskList.add(0, task);
00178
00179
backend.addPendingWriteRequest(task);
00180 }
00181
else
00182 task.
notifyCompletion();
00183 }
00184
00193 public void addPriorityTask(
AbstractTask task,
long transactionId)
00194 {
00195
if (!
isKilled)
00196 {
00197 task.
setHasTid(
true);
00198
addPriorityTask(task);
00199
tidList.add(0,
new Long(transactionId));
00200 }
00201
else
00202 task.
notifyCompletion();
00203 }
00204
00214 public boolean removeTask(
AbstractTask task)
00215 {
00216
backend.removePendingRequest(task);
00217
if (
tidList == null)
00218
return taskList.remove(task);
00219
else
00220 {
00221
int idx =
taskList.indexOf(task);
00222
if (idx == -1)
00223
return false;
00224
else
00225 {
00226
taskList.remove(idx);
00227
tidList.remove(idx);
00228
return true;
00229 }
00230 }
00231 }
00232
00238 public void waitForAllTasksToComplete(
long transactionId)
00239 {
00240
if ((transactionId == 0) || (
tidList == null))
00241
return;
00242
00243 Long tid =
new Long(transactionId);
00244
synchronized (
this)
00245 {
00246
if (!
tidList.contains(tid))
00247 {
00248
if (
currentlyProcessing)
00249 {
00250
00251
00252
try
00253 {
00254
if (
logger.isDebugEnabled())
00255
logger.debug(
Translate.get(
"backendworkerthread.waiting.task"));
00256 wait();
00257 }
00258
catch (InterruptedException ignore)
00259 {
00260 }
00261
return;
00262 }
00263
else
00264
return;
00265 }
00266
00267
while (
tidList.contains(tid))
00268 {
00269
if (
logger.isDebugEnabled())
00270
logger.debug(
Translate.get(
"backendworkerthread.waiting.transaction",
00271 String.valueOf(tid)));
00272
00273
try
00274 {
00275 wait();
00276 }
00277
catch (InterruptedException ignore)
00278 {
00279 }
00280 }
00281 }
00282 }
00283
00287 public void waitForAllTasksToComplete()
00288 {
00289
synchronized (
this)
00290 {
00291 Object current;
00292
if (
taskList.size() == 0)
00293 {
00294
if (
currentlyProcessing)
00295 {
00296
try
00297 {
00298
if (
logger.isDebugEnabled())
00299
logger.debug(
Translate.get(
"backendworkerthread.waiting.task"));
00300 wait();
00301 }
00302
catch (InterruptedException ignore)
00303 {
00304
logger.warn(
Translate
00305 .get(
"backendworkerthread.no.full.task.synchronization"));
00306 }
00307
return;
00308 }
00309
else
00310 {
00311
return;
00312 }
00313 }
00314
else
00315 current =
taskList.get(
taskList.size() - 1);
00316
00317
if (
logger.isDebugEnabled())
00318
logger.debug(
Translate.get(
"backendworkerthread.waiting.request",
00319 current.toString()));
00320
00321
while (
taskList.contains(current))
00322 {
00323
try
00324 {
00325 wait();
00326 }
00327
catch (InterruptedException ignore)
00328 {
00329 }
00330 }
00331 }
00332 }
00333
00338 public synchronized void kill()
00339 {
00340
if (
backend.isKilled())
00341
return;
00342
00343 String msg =
"Backend " +
backend.getName() +
" is shutting down";
00344
00345
while (!
taskList.isEmpty())
00346 {
00347
AbstractTask task = (
AbstractTask)
taskList.remove(0);
00348
try
00349 {
00350 task.
notifyFailure(
this, 1,
new SQLException(msg));
00351 }
00352
catch (SQLException ignore)
00353 {
00354 }
00355 }
00356
isKilled =
true;
00357 notify();
00358
logger.info(msg);
00359
try
00360 {
00361
00362
00363
loadBalancer.
disableBackend(
backend);
00364 }
00365
catch (SQLException ignore)
00366 {
00367 }
00368 }
00369
00374 public void run()
00375 {
00376
AbstractTask task = null;
00377
00378
while (!
isKilled)
00379 {
00380
synchronized (
this)
00381 {
00382
if (
taskList.isEmpty())
00383 {
00384
try
00385 {
00386 wait();
00387 }
00388
catch (InterruptedException e)
00389 {
00390
logger.warn(
Translate.get(
"backendworkerthread.wait.interrupted"));
00391 }
00392 }
00393
try
00394 {
00395 task = (
AbstractTask)
taskList.remove(0);
00396
currentlyProcessing =
true;
00397 }
00398
catch (IndexOutOfBoundsException oob)
00399 {
00400
logger.warn(
Translate.get(
"backendworkerthread.no.task"));
00401 }
00402 }
00403
00404
try
00405 {
00406
if (
logger.isDebugEnabled())
00407
logger.debug(
Translate.get(
"backendworkerthread.execute.task", task
00408 .toString()));
00409 task.
execute(
this);
00410 }
00411
catch (SQLException e)
00412 {
00413
00414
logger.warn(
Translate.get(
"backendworkerthread.task.failed", e));
00415 }
00416
catch (RuntimeException re)
00417 {
00418
00419
00420
try
00421 {
00422 task.
notifyFailure(
this, 1,
new SQLException(re.getMessage()));
00423 }
00424
catch (SQLException e1)
00425 {
00426
00427 }
00428
logger.fatal(
Translate.get(
00429
"backendworkerthread.task.runtime.exception", task.toString()), re);
00430 }
00431 finally
00432 {
00433
try
00434 {
00435
backend.removePendingRequest(task);
00436 }
00437
catch (RuntimeException e)
00438 {
00439
logger.warn(
00440
Translate.get(
"backendworkerthread.remove.task.error", e), e);
00441 }
00442 }
00443
00444
if (task.
hasTid())
00445 {
00446
00447
00448
synchronized (
this)
00449 {
00450
if (
tidList != null)
00451
tidList.remove(0);
00452 notifyAll();
00453
currentlyProcessing =
false;
00454 }
00455 }
00456 }
00457
00458
00459
try
00460 {
00461
if (
backend.isReadEnabled() ||
backend.isWriteEnabled())
00462
loadBalancer.
disableBackend(
backend);
00463 }
00464
catch (SQLException e)
00465 {
00466
logger.error(
Translate.get(
"backendworkerthread.backend.disable.failed",
00467
new String[]{
backend.getName(), e.getMessage()}));
00468 }
00469 }
00470
00471
00472
00473
00474
00480 public DatabaseBackend getBackend()
00481 {
00482
return backend;
00483 }
00484
00490 public Trace getLogger()
00491 {
00492
return logger;
00493 }
00494 }