00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00033 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00034
00035
00036
00037
00038
00039
00040
00041 public class BackendWorkerThread extends Thread
00042 {
00043
00044
00045
00046
00047
00048
00049
00050
00051 private AbstractLoadBalancer loadBalancer;
00052 private DatabaseBackend backend;
00053 private ArrayList taskList;
00054 private ArrayList tidList;
00055 private boolean isKilled = false;
00056
00057
00058 private AbstractTask currentlyProcessingTask;
00059
00060 private Long currentTaskTid;
00061
00062 private Trace logger = null;
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075 public BackendWorkerThread(DatabaseBackend backend,
00076 AbstractLoadBalancer loadBalancer) throws SQLException
00077 {
00078 this("BackendWorkerThread for backend:" + backend.getName()
00079 + " and a loadBalancer level:" + loadBalancer.getRAIDbLevel(), backend,
00080 loadBalancer);
00081 }
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091 public BackendWorkerThread(String name, DatabaseBackend backend,
00092 AbstractLoadBalancer loadBalancer) throws SQLException
00093 {
00094 super(name);
00095
00096 if (backend == null)
00097 {
00098 String msg = Translate.get("backendworkerthread.null.backend");
00099 logger = Trace
00100 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend");
00101 logger.error(msg);
00102 throw new SQLException(msg);
00103 }
00104
00105 backend.checkDriverCompliance();
00106
00107 logger = Trace
00108 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend."
00109 + backend.getName());
00110
00111 if (loadBalancer == null)
00112 {
00113 String msg = Translate.get("backendworkerthread.null.loadbalancer");
00114 logger.error(msg);
00115 throw new SQLException(msg);
00116 }
00117
00118 this.backend = backend;
00119 this.loadBalancer = loadBalancer;
00120 taskList = new ArrayList();
00121 tidList = new ArrayList();
00122 }
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 public void addTask(AbstractTask task)
00136 {
00137 if (!isKilled)
00138 {
00139 taskList.add(task);
00140
00141 backend.addPendingWriteRequest(task);
00142 }
00143 else
00144 task.notifyCompletion();
00145 }
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155 public void addTask(AbstractTask task, long transactionId)
00156 {
00157 if (!isKilled)
00158 {
00159 tidList.add(new Long(transactionId));
00160 task.setHasTid(true);
00161 addTask(task);
00162 }
00163 else
00164 task.notifyCompletion();
00165 }
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 public void insertTaskAfterLastWriteForTransaction(AbstractTask task,
00179 Long transactionId)
00180 {
00181 if (!isKilled)
00182 {
00183 task.setHasTid(true);
00184
00185
00186 int lastTidIndex = tidList.lastIndexOf(transactionId);
00187 if (lastTidIndex == -1)
00188 {
00189 taskList.add(task);
00190 tidList.add(transactionId);
00191 backend.addPendingWriteRequest(task);
00192 return;
00193 }
00194
00195
00196
00197 int lastRequestIndex = 0;
00198 while (lastTidIndex >= 0)
00199 {
00200 AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex);
00201 if (t.hasTid())
00202 lastTidIndex--;
00203 lastRequestIndex++;
00204 }
00205
00206
00207 taskList.add(lastRequestIndex, task);
00208 tidList.add(lastTidIndex + 1, transactionId);
00209
00210
00211 backend.addPendingWriteRequest(task);
00212 }
00213 else
00214 task.notifyCompletion();
00215 }
00216
00217
00218
00219
00220
00221
00222
00223
00224 public void addPriorityTask(AbstractTask task)
00225 {
00226 if (!isKilled)
00227 {
00228 taskList.add(0, task);
00229
00230 backend.addPendingWriteRequest(task);
00231 }
00232 else
00233 task.notifyCompletion();
00234 }
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244 public void addPriorityTask(AbstractTask task, long transactionId)
00245 {
00246 if (!isKilled)
00247 {
00248 task.setHasTid(true);
00249 addPriorityTask(task);
00250 tidList.add(0, new Long(transactionId));
00251 }
00252 else
00253 task.notifyCompletion();
00254 }
00255
00256
00257
00258
00259
00260
00261
00262 public boolean hasTaskForTransaction(Long tid)
00263 {
00264 synchronized (this)
00265 {
00266 if ((currentTaskTid != null) && (currentTaskTid.equals(tid)))
00267
00268 return true;
00269 else
00270 return tidList.contains(tid);
00271 }
00272 }
00273
00274
00275
00276
00277
00278
00279 public void waitForAllTasksToComplete(long transactionId)
00280 {
00281 if ((transactionId == 0) || (tidList == null))
00282 return;
00283
00284 Long tid = new Long(transactionId);
00285 synchronized (this)
00286 {
00287 if (!tidList.contains(tid))
00288 {
00289 if ((currentTaskTid != null)
00290 && (currentTaskTid.longValue() == transactionId))
00291 {
00292 try
00293 {
00294 if (logger.isDebugEnabled())
00295 logger.debug(Translate.get("backendworkerthread.waiting.task"));
00296 wait();
00297 }
00298 catch (InterruptedException ignore)
00299 {
00300 }
00301 return;
00302 }
00303 else
00304 return;
00305 }
00306
00307 while (tidList.contains(tid))
00308 {
00309 if (logger.isDebugEnabled())
00310 logger.debug(Translate.get("backendworkerthread.waiting.transaction",
00311 String.valueOf(tid)));
00312
00313 try
00314 {
00315 wait();
00316 }
00317 catch (InterruptedException ignore)
00318 {
00319 }
00320 }
00321 }
00322 }
00323
00324
00325
00326
00327 public void waitForAllTasksToComplete()
00328 {
00329 synchronized (this)
00330 {
00331 Object current;
00332 if (taskList.size() == 0)
00333 {
00334 if (currentlyProcessingTask != null)
00335 {
00336 try
00337 {
00338 if (logger.isDebugEnabled())
00339 logger.debug(Translate.get("backendworkerthread.waiting.task"));
00340 wait();
00341 }
00342 catch (InterruptedException ignore)
00343 {
00344 logger.warn(Translate
00345 .get("backendworkerthread.no.full.task.synchronization"));
00346 }
00347 return;
00348 }
00349 else
00350 {
00351 return;
00352 }
00353 }
00354 else
00355 current = taskList.get(taskList.size() - 1);
00356
00357 if (logger.isDebugEnabled())
00358 logger.debug(Translate.get("backendworkerthread.waiting.request",
00359 current.toString()));
00360
00361 while (taskList.contains(current))
00362 {
00363 try
00364 {
00365 wait();
00366 }
00367 catch (InterruptedException ignore)
00368 {
00369 }
00370 }
00371 }
00372 }
00373
00374
00375
00376
00377
00378 public synchronized void kill()
00379 {
00380 if (backend.isKilled())
00381 return;
00382
00383 String msg = "Backend " + backend.getName() + " is shutting down";
00384
00385 while (!taskList.isEmpty())
00386 {
00387 AbstractTask task = (AbstractTask) taskList.remove(0);
00388 try
00389 {
00390 task.notifyFailure(this, 1, new SQLException(msg));
00391 }
00392 catch (SQLException ignore)
00393 {
00394 }
00395 }
00396 isKilled = true;
00397 notify();
00398 logger.info(msg);
00399 try
00400 {
00401
00402
00403 loadBalancer.disableBackend(backend);
00404 }
00405 catch (SQLException ignore)
00406 {
00407 }
00408 }
00409
00410
00411
00412
00413
00414 public void run()
00415 {
00416 currentlyProcessingTask = null;
00417
00418 while (!isKilled)
00419 {
00420 synchronized (this)
00421 {
00422 if (taskList.isEmpty())
00423 {
00424 try
00425 {
00426 wait();
00427 }
00428 catch (InterruptedException e)
00429 {
00430 logger.warn(Translate.get("backendworkerthread.wait.interrupted"));
00431 }
00432 }
00433 try
00434 {
00435 currentlyProcessingTask = (AbstractTask) taskList.remove(0);
00436 if (currentlyProcessingTask.hasTid())
00437 currentTaskTid = (Long) tidList.remove(0);
00438 else
00439 currentTaskTid = null;
00440 }
00441 catch (IndexOutOfBoundsException oob)
00442 {
00443 logger.warn(Translate.get("backendworkerthread.no.task"), oob);
00444 }
00445 }
00446
00447 try
00448 {
00449 if (logger.isDebugEnabled())
00450 logger.debug(Translate.get("backendworkerthread.execute.task",
00451 currentlyProcessingTask.toString()));
00452 currentlyProcessingTask.execute(this);
00453 }
00454 catch (SQLException e)
00455 {
00456
00457 logger.warn(Translate.get("backendworkerthread.task.failed", e));
00458 }
00459 catch (RuntimeException re)
00460 {
00461
00462
00463 try
00464 {
00465 currentlyProcessingTask.notifyFailure(this, 1, new SQLException(re
00466 .getMessage()));
00467 }
00468 catch (SQLException e1)
00469 {
00470
00471 }
00472 logger.fatal(Translate.get(
00473 "backendworkerthread.task.runtime.exception",
00474 currentlyProcessingTask.toString()), re);
00475 }
00476 finally
00477 {
00478 try
00479 {
00480 backend.removePendingRequest(currentlyProcessingTask);
00481 }
00482 catch (RuntimeException e)
00483 {
00484 logger.warn(
00485 Translate.get("backendworkerthread.remove.task.error", e), e);
00486 }
00487 }
00488
00489
00490
00491
00492
00493 synchronized (this)
00494 {
00495 notifyAll();
00496 currentlyProcessingTask = null;
00497 currentTaskTid = null;
00498 }
00499 }
00500
00501
00502 try
00503 {
00504 if (backend.isReadEnabled() || backend.isWriteEnabled())
00505 loadBalancer.disableBackend(backend);
00506 }
00507 catch (SQLException e)
00508 {
00509 logger.error(Translate.get("backendworkerthread.backend.disable.failed",
00510 new String[]{backend.getName(), e.getMessage()}));
00511 }
00512 }
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523 public DatabaseBackend getBackend()
00524 {
00525 return backend;
00526 }
00527
00528
00529
00530
00531
00532
00533 public Trace getLogger()
00534 {
00535 return logger;
00536 }
00537
00538 }