Public Member Functions | |
BackendWorkerThread (DatabaseBackend backend, AbstractLoadBalancer loadBalancer) throws SQLException | |
BackendWorkerThread (String name, DatabaseBackend backend, AbstractLoadBalancer loadBalancer) throws SQLException | |
void | addTask (AbstractTask task) |
void | addTask (AbstractTask task, long transactionId) |
void | insertTaskAfterLastWriteForTransaction (AbstractTask task, Long transactionId) |
void | addPriorityTask (AbstractTask task) |
void | addPriorityTask (AbstractTask task, long transactionId) |
boolean | hasTaskForTransaction (Long tid) |
void | waitForAllTasksToComplete (long transactionId) |
void | waitForAllTasksToComplete () |
synchronized void | kill () |
void | run () |
DatabaseBackend | getBackend () |
Trace | getLogger () |
Definition at line 41 of file BackendWorkerThread.java.
|
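Creates a new BackendWorkerThread for the given backend and load balancer, using a default name built from the backend name and the load balancer's RAIDb level.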
Definition at line 75 of file BackendWorkerThread.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel(). 00077 { 00078 this("BackendWorkerThread for backend:" + backend.getName() 00079 + " and a loadBalancer level:" + loadBalancer.getRAIDbLevel(), backend, 00080 loadBalancer); 00081 }
|
|
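Creates a new BackendWorkerThread with the given name. An SQLException is thrown if the backend or the load balancer is null.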
Definition at line 91 of file BackendWorkerThread.java. 00093 { 00094 super(name); 00095 // Sanity checks 00096 if (backend == null) 00097 { 00098 String msg = Translate.get("backendworkerthread.null.backend"); 00099 logger = Trace 00100 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend"); 00101 logger.error(msg); 00102 throw new SQLException(msg); 00103 } 00104 00105 backend.checkDriverCompliance(); 00106 00107 logger = Trace 00108 .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend." 00109 + backend.getName()); 00110 00111 if (loadBalancer == null) 00112 { 00113 String msg = Translate.get("backendworkerthread.null.loadbalancer"); 00114 logger.error(msg); 00115 throw new SQLException(msg); 00116 } 00117 00118 this.backend = backend; 00119 this.loadBalancer = loadBalancer; 00120 taskList = new ArrayList(); 00121 tidList = new ArrayList(); 00122 }
|
|
Adds a task upfront to the task list so that this task will be the next executed task. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.
Definition at line 244 of file BackendWorkerThread.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(). 00245 { 00246 if (!isKilled) 00247 { 00248 task.setHasTid(true); 00249 addPriorityTask(task); 00250 tidList.add(0, new Long(transactionId)); 00251 } 00252 else 00253 task.notifyCompletion(); 00254 }
|
|
Adds a task upfront to the task list so that this task will be the next executed task. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.
Definition at line 224 of file BackendWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.disableBackend(), and org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.disableBackend(). 00225 { 00226 if (!isKilled) 00227 { 00228 taskList.add(0, task); 00229 // We assume that all requests here are writes 00230 backend.addPendingWriteRequest(task); 00231 } 00232 else 00233 task.notifyCompletion(); 00234 }
|
|
Adds a task at the end of the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.
Definition at line 155 of file BackendWorkerThread.java. References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(). 00156 { 00157 if (!isKilled) 00158 { 00159 tidList.add(new Long(transactionId)); 00160 task.setHasTid(true); 00161 addTask(task); 00162 } 00163 else 00164 task.notifyCompletion(); 00165 }
|
|
Adds a task at the end of the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.
Definition at line 135 of file BackendWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.rollback(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback(), and org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.rollback(). 00136 { 00137 if (!isKilled) 00138 { 00139 taskList.add(task); 00140 // We assume that all requests here are writes 00141 backend.addPendingWriteRequest(task); 00142 } 00143 else 00144 task.notifyCompletion(); 00145 }
|
|
|
Returns the logger for tracing.
Definition at line 533 of file BackendWorkerThread.java. 00534 {
00535 return logger;
00536 }
|
|
Returns true if the thread has pending tasks for the given transaction.
Definition at line 262 of file BackendWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback(). 00263 { 00264 synchronized (this) 00265 { 00266 if ((currentTaskTid != null) && (currentTaskTid.equals(tid))) 00267 // Currently executing task belong to this transaction 00268 return true; 00269 else 00270 return tidList.contains(tid); 00271 } 00272 }
|
|
Adds a task just after the last write task for the given transaction in the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method. This method is usually used to insert a commit/rollback task when asynchrony is allowed between backends.
Definition at line 178 of file BackendWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback(). 00180 { 00181 if (!isKilled) 00182 { 00183 task.setHasTid(true); 00184 00185 // Find the last task index in the tid queue 00186 int lastTidIndex = tidList.lastIndexOf(transactionId); 00187 if (lastTidIndex == -1) 00188 { // Not found, add in last position 00189 taskList.add(task); 00190 tidList.add(transactionId); 00191 backend.addPendingWriteRequest(task); 00192 return; 00193 } 00194 00195 // Find the corresponding task in the task list (we have to skip 00196 // autocommit tasks) 00197 int lastRequestIndex = 0; 00198 while (lastTidIndex >= 0) 00199 { 00200 AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex); 00201 if (t.hasTid()) 00202 lastTidIndex--; 00203 lastRequestIndex++; 00204 } 00205 00206 // Add the task after the last write task and the tid in the tid list. 00207 taskList.add(lastRequestIndex, task); 00208 tidList.add(lastTidIndex + 1, transactionId); 00209 // Warning, the task is added in queue (not sorted) in the backend pending 00210 // request list. 00211 backend.addPendingWriteRequest(task); 00212 } 00213 else 00214 task.notifyCompletion(); 00215 }
|
|
Kills this thread after the next task processing. It also marks all remaining tasks in the task list as failed. Definition at line 378 of file BackendWorkerThread.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.disableBackend(). 00379 { 00380 if (backend.isKilled()) 00381 return; 00382 00383 String msg = "Backend " + backend.getName() + " is shutting down"; 00384 // Remove all tasks 00385 while (!taskList.isEmpty()) 00386 { 00387 AbstractTask task = (AbstractTask) taskList.remove(0); 00388 try 00389 { 00390 task.notifyFailure(this, 1, new SQLException(msg)); 00391 } 00392 catch (SQLException ignore) 00393 { 00394 } 00395 } 00396 isKilled = true; 00397 notify(); // Wake up thread 00398 logger.info(msg); 00399 try 00400 { 00401 // This ensure that all worker threads get removed from the load balancer 00402 // list and that the backend state is set to disable. 00403 loadBalancer.disableBackend(backend); 00404 } 00405 catch (SQLException ignore) 00406 { 00407 } 00408 }
|
|
Process the tasklist and call wait() (on itself) when the tasklist becomes empty. Definition at line 414 of file BackendWorkerThread.java. References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.disableBackend(). 00415 { 00416 currentlyProcessingTask = null; 00417 00418 while (!isKilled) 00419 { 00420 synchronized (this) 00421 { 00422 if (taskList.isEmpty()) 00423 { // Nothing to do, go to bed! 00424 try 00425 { 00426 wait(); 00427 } 00428 catch (InterruptedException e) 00429 { 00430 logger.warn(Translate.get("backendworkerthread.wait.interrupted")); 00431 } 00432 } 00433 try 00434 { // Take the 1st task from the list 00435 currentlyProcessingTask = (AbstractTask) taskList.remove(0); 00436 if (currentlyProcessingTask.hasTid()) 00437 currentTaskTid = (Long) tidList.remove(0); 00438 else 00439 currentTaskTid = null; 00440 } 00441 catch (IndexOutOfBoundsException oob) 00442 { 00443 logger.warn(Translate.get("backendworkerthread.no.task"), oob); 00444 } 00445 } 00446 // Execute the task out of the sync block 00447 try 00448 { 00449 if (logger.isDebugEnabled()) 00450 logger.debug(Translate.get("backendworkerthread.execute.task", 00451 currentlyProcessingTask.toString())); 00452 currentlyProcessingTask.execute(this); 00453 } 00454 catch (SQLException e) 00455 { 00456 // Task should have notified of failure 00457 logger.warn(Translate.get("backendworkerthread.task.failed", e)); 00458 } 00459 catch (RuntimeException re) 00460 { 00461 // We can't know for sure if the task has notified the failure or not. 00462 // To prevent a deadlock, we force the failure notification here. 
00463 try 00464 { 00465 currentlyProcessingTask.notifyFailure(this, 1, new SQLException(re 00466 .getMessage())); 00467 } 00468 catch (SQLException e1) 00469 { 00470 // just notify 00471 } 00472 logger.fatal(Translate.get( 00473 "backendworkerthread.task.runtime.exception", 00474 currentlyProcessingTask.toString()), re); 00475 } 00476 finally 00477 { 00478 try 00479 { 00480 backend.removePendingRequest(currentlyProcessingTask); 00481 } 00482 catch (RuntimeException e) 00483 { 00484 logger.warn( 00485 Translate.get("backendworkerthread.remove.task.error", e), e); 00486 } 00487 } 00488 00489 // Notify the completion of the task if someone is waiting for 00490 // the completion of this transaction. 00491 // @see #waitForAllTasksToComplete() 00492 // @see #waitForAllTasksToComplete(long) 00493 synchronized (this) 00494 { 00495 notifyAll(); 00496 currentlyProcessingTask = null; 00497 currentTaskTid = null; 00498 } 00499 } 00500 00501 // Automatically disable the backend when the thread dies 00502 try 00503 { 00504 if (backend.isReadEnabled() || backend.isWriteEnabled()) 00505 loadBalancer.disableBackend(backend); 00506 } 00507 catch (SQLException e) 00508 { 00509 logger.error(Translate.get("backendworkerthread.backend.disable.failed", 00510 new String[]{backend.getName(), e.getMessage()})); 00511 } 00512 }
|
|
Waits for all current tasks to complete. Definition at line 327 of file BackendWorkerThread.java. 00328 { 00329 synchronized (this) 00330 { 00331 Object current; 00332 if (taskList.size() == 0) 00333 { 00334 if (currentlyProcessingTask != null) 00335 { 00336 try 00337 { 00338 if (logger.isDebugEnabled()) 00339 logger.debug(Translate.get("backendworkerthread.waiting.task")); 00340 wait(); 00341 } 00342 catch (InterruptedException ignore) 00343 { 00344 logger.warn(Translate 00345 .get("backendworkerthread.no.full.task.synchronization")); 00346 } 00347 return; 00348 } 00349 else 00350 { // No task currently executing 00351 return; 00352 } 00353 } 00354 else 00355 current = taskList.get(taskList.size() - 1); 00356 00357 if (logger.isDebugEnabled()) 00358 logger.debug(Translate.get("backendworkerthread.waiting.request", 00359 current.toString())); 00360 00361 while (taskList.contains(current)) 00362 { 00363 try 00364 { 00365 wait(); 00366 } 00367 catch (InterruptedException ignore) 00368 { 00369 } 00370 } 00371 } 00372 }
|
|
Waits for all tasks of the specified transaction to complete.
Definition at line 279 of file BackendWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete(). 00280 { 00281 if ((transactionId == 0) || (tidList == null)) 00282 return; 00283 00284 Long tid = new Long(transactionId); 00285 synchronized (this) 00286 { 00287 if (!tidList.contains(tid)) 00288 { 00289 if ((currentTaskTid != null) 00290 && (currentTaskTid.longValue() == transactionId)) 00291 { 00292 try 00293 { 00294 if (logger.isDebugEnabled()) 00295 logger.debug(Translate.get("backendworkerthread.waiting.task")); 00296 wait(); 00297 } 00298 catch (InterruptedException ignore) 00299 { 00300 } 00301 return; 00302 } 00303 else 00304 return; 00305 } 00306 00307 while (tidList.contains(tid)) 00308 { 00309 if (logger.isDebugEnabled()) 00310 logger.debug(Translate.get("backendworkerthread.waiting.transaction", 00311 String.valueOf(tid))); 00312 00313 try 00314 { 00315 wait(); 00316 } 00317 catch (InterruptedException ignore) 00318 { 00319 } 00320 } 00321 } 00322 }
|