src/org/objectweb/cjdbc/controller/loadbalancer/BackendWorkerThread.java

説明を見る。
package org.objectweb.cjdbc.controller.loadbalancer;

import java.sql.SQLException;
import java.util.ArrayList;

import org.objectweb.cjdbc.common.i18n.Translate;
import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;

/**
 * Thread that executes, one at a time and in queue order, the
 * <code>AbstractTask</code> objects queued for a single
 * <code>DatabaseBackend</code>.
 * <p>
 * The thread waits on its own monitor while its task list is empty. The
 * <code>addTask</code>/<code>addPriorityTask</code> methods neither
 * synchronize nor notify.
 * NOTE(review): callers presumably hold this thread's monitor and call
 * <code>notify()</code> after queuing a task -- confirm against the load
 * balancer implementations.
 */
public class BackendWorkerThread extends Thread
{
  //
  // How the code is organized ?
  // 1. Member variables
  // 2. Constructor(s)
  // 3. Task management
  // 4. Getter/Setters
  //

  private AbstractLoadBalancer loadBalancer;
  private DatabaseBackend      backend;
  /** Pending tasks, executed from index 0. */
  private ArrayList            taskList;
  /** Transaction ids (<code>Long</code>) of queued tasks; created lazily. */
  private ArrayList            tidList;
  private boolean              isKilled            = false;

  // true if we are currently processing a task
  private boolean              currentlyProcessing = false;

  private Trace                logger              = null;

  /*
   * Constructor
   */

  /**
   * Creates a worker thread with a default name built from the backend name
   * and the load balancer RAIDb level.
   * 
   * @param backend the backend this thread executes tasks for
   * @param loadBalancer the load balancer owning this thread
   * @throws SQLException if <code>backend</code> or <code>loadBalancer</code>
   *           is <code>null</code>, or if the backend driver compliance check
   *           fails
   */
  public BackendWorkerThread(DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer) throws SQLException
  {
    this(buildDefaultName(backend, loadBalancer), backend, loadBalancer);
  }

  /**
   * Creates a named worker thread.
   * 
   * @param name the thread name
   * @param backend the backend this thread executes tasks for
   * @param loadBalancer the load balancer owning this thread
   * @throws SQLException if <code>backend</code> or <code>loadBalancer</code>
   *           is <code>null</code>, or if the backend driver compliance check
   *           fails
   */
  public BackendWorkerThread(String name, DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer) throws SQLException
  {
    super(name);
    // Sanity checks
    if (backend == null)
    {
      String msg = Translate.get("backendworkerthread.null.backend");
      logger = Trace
          .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend");
      logger.error(msg);
      throw new SQLException(msg);
    }

    backend.checkDriverCompliance();

    logger = Trace
        .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend."
            + backend.getName());

    if (loadBalancer == null)
    {
      String msg = Translate.get("backendworkerthread.null.loadbalancer");
      logger.error(msg);
      throw new SQLException(msg);
    }

    this.backend = backend;
    this.loadBalancer = loadBalancer;
    taskList = new ArrayList();
  }

  /**
   * Builds the default thread name. A placeholder is returned when an argument
   * is <code>null</code> so that the delegated constructor can reject the
   * argument with a <code>SQLException</code> instead of failing here with a
   * <code>NullPointerException</code>.
   */
  private static String buildDefaultName(DatabaseBackend backend,
      AbstractLoadBalancer loadBalancer)
  {
    if (backend == null || loadBalancer == null)
      return "BackendWorkerThread";
    return "BackendWorkerThread for backend:" + backend.getName()
        + " and a loadBalancer level:" + loadBalancer.getRAIDbLevel();
  }

  /*
   * Task Management
   */

  /**
   * Appends a task to the end of the task list and registers it as a pending
   * write request on the backend. If this thread has been killed, the task is
   * not queued and <code>task.notifyCompletion()</code> is called instead.
   * 
   * @param task the task to queue
   */
  public void addTask(AbstractTask task)
  {
    if (!isKilled)
    {
      taskList.add(task);
      // We assume that all requests here are writes
      backend.addPendingWriteRequest(task);
    }
    else
      task.notifyCompletion();
  }

  /**
   * Same as {@link #addTask(AbstractTask)} but also flags the task as having a
   * transaction id and records that id.
   * 
   * @param task the task to queue
   * @param transactionId the transaction the task belongs to
   */
  public void addTask(AbstractTask task, long transactionId)
  {
    if (!isKilled)
    {
      task.setHasTid(true);
      addTask(task);
      if (tidList == null) // Instantiate the list if needed
        tidList = new ArrayList();
      tidList.add(new Long(transactionId));
    }
    else
      task.notifyCompletion();
  }

  /**
   * Inserts a task at the head of the task list and registers it as a pending
   * write request on the backend. If this thread has been killed, the task is
   * not queued and <code>task.notifyCompletion()</code> is called instead.
   * 
   * @param task the task to queue first
   */
  public void addPriorityTask(AbstractTask task)
  {
    if (!isKilled)
    {
      taskList.add(0, task);
      // We assume that all requests here are writes
      backend.addPendingWriteRequest(task);
    }
    else
      task.notifyCompletion();
  }

  /**
   * Same as {@link #addPriorityTask(AbstractTask)} but also flags the task as
   * having a transaction id and records that id at the head of the id list.
   * 
   * @param task the task to queue first
   * @param transactionId the transaction the task belongs to
   */
  public void addPriorityTask(AbstractTask task, long transactionId)
  {
    if (!isKilled)
    {
      task.setHasTid(true);
      addPriorityTask(task);
      // The list is created lazily; it is still null if no transactional
      // task has been queued before this one.
      if (tidList == null)
        tidList = new ArrayList();
      tidList.add(0, new Long(transactionId));
    }
    else
      task.notifyCompletion();
  }

  /**
   * Removes a task from the task list and from the backend pending requests.
   * <p>
   * NOTE(review): when the transaction id list exists, the entry at the same
   * index as the task is removed from it. Tasks queued without a transaction
   * id add no entry to that list, so the two lists only line up if every
   * queued task carries a transaction id -- confirm callers never mix both
   * kinds.
   * 
   * @param task the task to remove
   * @return <code>true</code> if the task was found in the task list
   */
  public boolean removeTask(AbstractTask task)
  {
    backend.removePendingRequest(task);
    if (tidList == null)
      return taskList.remove(task);

    int idx = taskList.indexOf(task);
    if (idx == -1)
      return false;

    taskList.remove(idx);
    tidList.remove(idx);
    return true;
  }

  /**
   * Blocks until no queued task is recorded for the given transaction. If no
   * task is recorded for it but a task is currently executing, waits for one
   * notification on this thread's monitor before returning. Returns
   * immediately when <code>transactionId</code> is 0 or no transaction id has
   * ever been recorded.
   * 
   * @param transactionId the transaction to wait for
   */
  public void waitForAllTasksToComplete(long transactionId)
  {
    if ((transactionId == 0) || (tidList == null))
      return;

    Long tid = new Long(transactionId);
    synchronized (this)
    {
      if (!tidList.contains(tid))
      {
        if (currentlyProcessing)
        { // We are not sure that the task currently executing belongs to the
          // transaction we are interested in or not. So we wait for the
          // completion of the current task, it is safer.
          try
          {
            if (logger.isDebugEnabled())
              logger.debug(Translate.get("backendworkerthread.waiting.task"));
            wait();
          }
          catch (InterruptedException ignore)
          {
            // Best effort: give up waiting
          }
        }
        return;
      }

      while (tidList.contains(tid))
      {
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("backendworkerthread.waiting.transaction",
              String.valueOf(tid)));

        try
        {
          wait();
        }
        catch (InterruptedException ignore)
        {
          // Keep waiting: the loop condition decides when we are done
        }
      }
    }
  }

  /**
   * Blocks until the task that is last in the list at call time has left the
   * list. If the list is empty but a task is currently executing, waits for
   * one notification on this thread's monitor before returning.
   */
  public void waitForAllTasksToComplete()
  {
    synchronized (this)
    {
      if (taskList.size() == 0)
      {
        if (currentlyProcessing)
        {
          try
          {
            if (logger.isDebugEnabled())
              logger.debug(Translate.get("backendworkerthread.waiting.task"));
            wait();
          }
          catch (InterruptedException ignore)
          {
            logger.warn(Translate
                .get("backendworkerthread.no.full.task.synchronization"));
          }
        }
        // Otherwise no task is currently executing
        return;
      }

      Object current = taskList.get(taskList.size() - 1);

      if (logger.isDebugEnabled())
        logger.debug(Translate.get("backendworkerthread.waiting.request",
            current.toString()));

      while (taskList.contains(current))
      {
        try
        {
          wait();
        }
        catch (InterruptedException ignore)
        {
          // Keep waiting: the loop condition decides when we are done
        }
      }
    }
  }

  /**
   * Stops this worker: fails every queued task with a <code>SQLException</code>,
   * marks the thread as killed, wakes up every thread waiting on this monitor
   * and asks the load balancer to disable the backend. Does nothing if
   * <code>backend.isKilled()</code> is already true.
   */
  public synchronized void kill()
  {
    if (backend.isKilled())
      return;

    String msg = "Backend " + backend.getName() + " is shutting down";
    // Remove all tasks
    while (!taskList.isEmpty())
    {
      AbstractTask task = (AbstractTask) taskList.remove(0);
      try
      {
        task.notifyFailure(this, 1, new SQLException(msg));
      }
      catch (SQLException ignore)
      {
        // The task is being failed on purpose; nothing more to report
      }
    }
    // The drained tasks will never run, so their transaction ids must not
    // keep waitForAllTasksToComplete(long) callers blocked.
    if (tidList != null)
      tidList.clear();
    isKilled = true;
    // The worker and the waitForAllTasksToComplete callers wait on the same
    // monitor: wake them all so the worker cannot miss the wakeup.
    notifyAll();
    logger.info(msg);
    try
    {
      // This ensure that all worker threads get removed from the load balancer
      // list and that the backend state is set to disable.
      loadBalancer.disableBackend(backend);
    }
    catch (SQLException ignore)
    {
      // Best effort: the thread is shutting down anyway
    }
  }

  /**
   * Main loop: takes tasks from the head of the list and executes them until
   * the thread is killed, then disables the backend if it is still enabled.
   */
  public void run()
  {
    while (!isKilled)
    {
      AbstractTask task = nextTask();
      if (task == null)
        continue; // Woken up without work (kill or spurious wakeup)

      // Execute the task out of the sync block
      try
      {
        if (logger.isDebugEnabled())
          logger.debug(Translate.get("backendworkerthread.execute.task", task
              .toString()));
        task.execute(this);
      }
      catch (SQLException e)
      {
        // Task should have notified of failure
        logger.warn(Translate.get("backendworkerthread.task.failed", e));
      }
      catch (RuntimeException re)
      {
        // We can't know for sure if the task has notified the failure or not.
        // To prevent a deadlock, we force the failure notification here.
        try
        {
          task.notifyFailure(this, 1, new SQLException(re.getMessage()));
        }
        catch (SQLException e1)
        {
          // just notify
        }
        logger.fatal(Translate.get(
            "backendworkerthread.task.runtime.exception", task.toString()), re);
      }
      finally
      {
        try
        {
          backend.removePendingRequest(task);
        }
        catch (RuntimeException e)
        {
          logger.warn(
              Translate.get("backendworkerthread.remove.task.error", e), e);
        }
      }

      // Signal the end of the task to anyone blocked in
      // waitForAllTasksToComplete, whether or not the task had a
      // transaction id.
      synchronized (this)
      {
        // The id list may have been emptied by kill() in the meantime
        if (task.hasTid() && (tidList != null) && !tidList.isEmpty())
          tidList.remove(0);
        currentlyProcessing = false;
        notifyAll();
      }
    }

    // Automatically disable the backend when the thread dies
    try
    {
      if (backend.isReadEnabled() || backend.isWriteEnabled())
        loadBalancer.disableBackend(backend);
    }
    catch (SQLException e)
    {
      logger.error(Translate.get("backendworkerthread.backend.disable.failed",
          new String[]{backend.getName(), e.getMessage()}));
    }
  }

  /**
   * Waits for work if the list is empty, then takes the first task and marks
   * the thread as processing.
   * 
   * @return the task to execute, or <code>null</code> if the thread was woken
   *         up while the list was still empty
   */
  private synchronized AbstractTask nextTask()
  {
    if (taskList.isEmpty())
    { // Nothing to do, go to bed!
      try
      {
        wait();
      }
      catch (InterruptedException e)
      {
        logger.warn(Translate.get("backendworkerthread.wait.interrupted"));
      }
    }

    if (taskList.isEmpty())
    {
      logger.warn(Translate.get("backendworkerthread.no.task"));
      return null;
    }

    // Take the 1st task from the list
    AbstractTask task = (AbstractTask) taskList.remove(0);
    currentlyProcessing = true;
    return task;
  }

  /*
   * Getter/Setter
   */

  /**
   * Returns the backend this thread executes tasks for.
   * 
   * @return the backend passed to the constructor
   */
  public DatabaseBackend getBackend()
  {
    return backend;
  }

  /**
   * Returns the logger used by this thread.
   * 
   * @return the <code>Trace</code> logger obtained in the constructor
   */
  public Trace getLogger()
  {
    return logger;
  }
}

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:01 2004 に生成されました。 doxygen 1.3.8