Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread Class Reference

Collaboration diagram for org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 BackendWorkerThread (DatabaseBackend backend, AbstractLoadBalancer loadBalancer) throws SQLException
 BackendWorkerThread (String name, DatabaseBackend backend, AbstractLoadBalancer loadBalancer) throws SQLException
void addTask (AbstractTask task)
void addTask (AbstractTask task, long transactionId)
void insertTaskAfterLastWriteForTransaction (AbstractTask task, Long transactionId)
void addPriorityTask (AbstractTask task)
void addPriorityTask (AbstractTask task, long transactionId)
boolean hasTaskForTransaction (Long tid)
void waitForAllTasksToComplete (long transactionId)
void waitForAllTasksToComplete ()
synchronized void kill ()
void run ()
DatabaseBackend getBackend ()
Trace getLogger ()

Detailed Description

Process sequentially a set of tasks and send them to a backend.

Author:
Emmanuel Cecchet
Version:
1.0

Definition at line 41 of file BackendWorkerThread.java.


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.BackendWorkerThread DatabaseBackend  backend,
AbstractLoadBalancer  loadBalancer
throws SQLException
 

Creates a new BackendWorkerThread.

Parameters:
backend the backend this thread is associated to.
loadBalancer the load balancer instanciating this thread
Exceptions:
SQLException if an error occurs

Definition at line 75 of file BackendWorkerThread.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel().

00077   {
00078     this("BackendWorkerThread for backend:" + backend.getName()
00079         + " and a loadBalancer level:" + loadBalancer.getRAIDbLevel(), backend,
00080         loadBalancer);
00081   }

org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.BackendWorkerThread String  name,
DatabaseBackend  backend,
AbstractLoadBalancer  loadBalancer
throws SQLException
 

Creates a new BackendWorkerThread.

Parameters:
name the name to give to the thread
backend the backend this thread is associated to.
loadBalancer the load balancer instanciating this thread
Exceptions:
SQLException if an error occurs

Definition at line 91 of file BackendWorkerThread.java.

00093   {
00094     super(name);
00095     // Sanity checks
00096     if (backend == null)
00097     {
00098       String msg = Translate.get("backendworkerthread.null.backend");
00099       logger = Trace
00100           .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend");
00101       logger.error(msg);
00102       throw new SQLException(msg);
00103     }
00104 
00105     backend.checkDriverCompliance();
00106 
00107     logger = Trace
00108         .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend."
00109             + backend.getName());
00110 
00111     if (loadBalancer == null)
00112     {
00113       String msg = Translate.get("backendworkerthread.null.loadbalancer");
00114       logger.error(msg);
00115       throw new SQLException(msg);
00116     }
00117 
00118     this.backend = backend;
00119     this.loadBalancer = loadBalancer;
00120     taskList = new ArrayList();
00121     tidList = new ArrayList();
00122   }


Member Function Documentation

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask AbstractTask  task,
long  transactionId
 

Adds a task upfront to the task list so that this task will be the next executed task. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method

Parameters:
task the task to add
transactionId transaction id in which this task execute

Definition at line 244 of file BackendWorkerThread.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask().

00245   {
00246     if (!isKilled)
00247     {
00248       task.setHasTid(true);
00249       addPriorityTask(task);
00250       tidList.add(0, new Long(transactionId));
00251     }
00252     else
00253       task.notifyCompletion();
00254   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask AbstractTask  task  ) 
 

Adds a task upfront to the task list so that this task will be the next executed task. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.

Parameters:
task the task to add

Definition at line 224 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.disableBackend(), and org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.disableBackend().

00225   {
00226     if (!isKilled)
00227     {
00228       taskList.add(0, task);
00229       // We assume that all requests here are writes
00230       backend.addPendingWriteRequest(task);
00231     }
00232     else
00233       task.notifyCompletion();
00234   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask AbstractTask  task,
long  transactionId
 

Adds a task at the end of the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.

Parameters:
task the task to add
transactionId transaction id in which this task execute

Definition at line 155 of file BackendWorkerThread.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask().

00156   {
00157     if (!isKilled)
00158     {
00159       tidList.add(new Long(transactionId));
00160       task.setHasTid(true);
00161       addTask(task);
00162     }
00163     else
00164       task.notifyCompletion();
00165   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask AbstractTask  task  ) 
 

Adds a task at the end of the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.

Parameters:
task the task to add

Definition at line 135 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.rollback(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback(), and org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.rollback().

00136   {
00137     if (!isKilled)
00138     {
00139       taskList.add(task);
00140       // We assume that all requests here are writes
00141       backend.addPendingWriteRequest(task);
00142     }
00143     else
00144       task.notifyCompletion();
00145   }

DatabaseBackend org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend  ) 
 

Returns the backend.

Returns:
a DatabaseBackend instance

Definition at line 523 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.commit(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.disableBackend(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.rollback(), org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback(), org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0.rollback(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete().

00524   {
00525     return backend;
00526   }

Trace org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getLogger  ) 
 

Returns the logger for tracing.

Returns:
a Trace instance

Definition at line 533 of file BackendWorkerThread.java.

00534   {
00535     return logger;
00536   }

boolean org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.hasTaskForTransaction Long  tid  ) 
 

Returns true if the thread has pending tasks for the given transaction.

Parameters:
tid the transaction identifier
Returns:
true if the task list contains task(s) for transaction tid.

Definition at line 262 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback().

00263   {
00264     synchronized (this)
00265     {
00266       if ((currentTaskTid != null) && (currentTaskTid.equals(tid)))
00267         // Currently executing task belong to this transaction
00268         return true;
00269       else
00270         return tidList.contains(tid);
00271     }
00272   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.insertTaskAfterLastWriteForTransaction AbstractTask  task,
Long  transactionId
 

Adds a task just after the last write task for the given transaction in the task list. Warning! This method is not synchronized and the caller must synchronize on the thread before calling this method.

This method is usually used to insert a commit/rollback task when asynchrony is allowed between backends.

Parameters:
task the task to add
transactionId transaction id in which this task execute

Definition at line 178 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback().

00180   {
00181     if (!isKilled)
00182     {
00183       task.setHasTid(true);
00184 
00185       // Find the last task index in the tid queue
00186       int lastTidIndex = tidList.lastIndexOf(transactionId);
00187       if (lastTidIndex == -1)
00188       { // Not found, add in last position
00189         taskList.add(task);
00190         tidList.add(transactionId);
00191         backend.addPendingWriteRequest(task);
00192         return;
00193       }
00194 
00195       // Find the corresponding task in the task list (we have to skip
00196       // autocommit tasks)
00197       int lastRequestIndex = 0;
00198       while (lastTidIndex >= 0)
00199       {
00200         AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex);
00201         if (t.hasTid())
00202           lastTidIndex--;
00203         lastRequestIndex++;
00204       }
00205 
00206       // Add the task after the last write task and the tid in the tid list.
00207       taskList.add(lastRequestIndex, task);
00208       tidList.add(lastTidIndex + 1, transactionId);
00209       // Warning, the task is added in queue (not sorted) in the backend pending
00210       // request list.
00211       backend.addPendingWriteRequest(task);
00212     }
00213     else
00214       task.notifyCompletion();
00215   }

synchronized void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.kill  ) 
 

Kills this thread after the next task processing. It also marks all remaining tasks in the task list as failed.

Definition at line 378 of file BackendWorkerThread.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.disableBackend().

00379   {
00380     if (backend.isKilled())
00381       return;
00382 
00383     String msg = "Backend " + backend.getName() + " is shutting down";
00384     // Remove all tasks
00385     while (!taskList.isEmpty())
00386     {
00387       AbstractTask task = (AbstractTask) taskList.remove(0);
00388       try
00389       {
00390         task.notifyFailure(this, 1, new SQLException(msg));
00391       }
00392       catch (SQLException ignore)
00393       {
00394       }
00395     }
00396     isKilled = true;
00397     notify(); // Wake up thread
00398     logger.info(msg);
00399     try
00400     {
00401       // This ensure that all worker threads get removed from the load balancer
00402       // list and that the backend state is set to disable.
00403       loadBalancer.disableBackend(backend);
00404     }
00405     catch (SQLException ignore)
00406     {
00407     }
00408   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.run  ) 
 

Process the tasklist and call wait() (on itself) when the tasklist becomes empty.

Definition at line 414 of file BackendWorkerThread.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.disableBackend().

00415   {
00416     currentlyProcessingTask = null;
00417 
00418     while (!isKilled)
00419     {
00420       synchronized (this)
00421       {
00422         if (taskList.isEmpty())
00423         { // Nothing to do, go to bed!
00424           try
00425           {
00426             wait();
00427           }
00428           catch (InterruptedException e)
00429           {
00430             logger.warn(Translate.get("backendworkerthread.wait.interrupted"));
00431           }
00432         }
00433         try
00434         { // Take the 1st task from the list
00435           currentlyProcessingTask = (AbstractTask) taskList.remove(0);
00436           if (currentlyProcessingTask.hasTid())
00437             currentTaskTid = (Long) tidList.remove(0);
00438           else
00439             currentTaskTid = null;
00440         }
00441         catch (IndexOutOfBoundsException oob)
00442         {
00443           logger.warn(Translate.get("backendworkerthread.no.task"), oob);
00444         }
00445       }
00446       // Execute the task out of the sync block
00447       try
00448       {
00449         if (logger.isDebugEnabled())
00450           logger.debug(Translate.get("backendworkerthread.execute.task",
00451               currentlyProcessingTask.toString()));
00452         currentlyProcessingTask.execute(this);
00453       }
00454       catch (SQLException e)
00455       {
00456         // Task should have notified of failure
00457         logger.warn(Translate.get("backendworkerthread.task.failed", e));
00458       }
00459       catch (RuntimeException re)
00460       {
00461         // We can't know for sure if the task has notified the failure or not.
00462         // To prevent a deadlock, we force the failure notification here.
00463         try
00464         {
00465           currentlyProcessingTask.notifyFailure(this, 1, new SQLException(re
00466               .getMessage()));
00467         }
00468         catch (SQLException e1)
00469         {
00470           // just notify
00471         }
00472         logger.fatal(Translate.get(
00473             "backendworkerthread.task.runtime.exception",
00474             currentlyProcessingTask.toString()), re);
00475       }
00476       finally
00477       {
00478         try
00479         {
00480           backend.removePendingRequest(currentlyProcessingTask);
00481         }
00482         catch (RuntimeException e)
00483         {
00484           logger.warn(
00485               Translate.get("backendworkerthread.remove.task.error", e), e);
00486         }
00487       }
00488 
00489       // Notify the completion of the task if someone is waiting for
00490       // the completion of this transaction.
00491       // @see #waitForAllTasksToComplete()
00492       // @see #waitForAllTasksToComplete(long)
00493       synchronized (this)
00494       {
00495         notifyAll();
00496         currentlyProcessingTask = null;
00497         currentTaskTid = null;
00498       }
00499     }
00500 
00501     // Automatically disable the backend when the thread dies
00502     try
00503     {
00504       if (backend.isReadEnabled() || backend.isWriteEnabled())
00505         loadBalancer.disableBackend(backend);
00506     }
00507     catch (SQLException e)
00508     {
00509       logger.error(Translate.get("backendworkerthread.backend.disable.failed",
00510           new String[]{backend.getName(), e.getMessage()}));
00511     }
00512   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete  ) 
 

Waits for all current tasks to complete.

Definition at line 327 of file BackendWorkerThread.java.

00328   {
00329     synchronized (this)
00330     {
00331       Object current;
00332       if (taskList.size() == 0)
00333       {
00334         if (currentlyProcessingTask != null)
00335         {
00336           try
00337           {
00338             if (logger.isDebugEnabled())
00339               logger.debug(Translate.get("backendworkerthread.waiting.task"));
00340             wait();
00341           }
00342           catch (InterruptedException ignore)
00343           {
00344             logger.warn(Translate
00345                 .get("backendworkerthread.no.full.task.synchronization"));
00346           }
00347           return;
00348         }
00349         else
00350         { // No task currently executing
00351           return;
00352         }
00353       }
00354       else
00355         current = taskList.get(taskList.size() - 1);
00356 
00357       if (logger.isDebugEnabled())
00358         logger.debug(Translate.get("backendworkerthread.waiting.request",
00359             current.toString()));
00360 
00361       while (taskList.contains(current))
00362       {
00363         try
00364         {
00365           wait();
00366         }
00367         catch (InterruptedException ignore)
00368         {
00369         }
00370       }
00371     }
00372   }

void org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete long  transactionId  ) 
 

Waits for all tasks of the specified transaction to complete.

Parameters:
transactionId the transaction identifier

Definition at line 279 of file BackendWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete(), and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete().

00280   {
00281     if ((transactionId == 0) || (tidList == null))
00282       return;
00283 
00284     Long tid = new Long(transactionId);
00285     synchronized (this)
00286     {
00287       if (!tidList.contains(tid))
00288       {
00289         if ((currentTaskTid != null)
00290             && (currentTaskTid.longValue() == transactionId))
00291         {
00292           try
00293           {
00294             if (logger.isDebugEnabled())
00295               logger.debug(Translate.get("backendworkerthread.waiting.task"));
00296             wait();
00297           }
00298           catch (InterruptedException ignore)
00299           {
00300           }
00301           return;
00302         }
00303         else
00304           return;
00305       }
00306 
00307       while (tidList.contains(tid))
00308       {
00309         if (logger.isDebugEnabled())
00310           logger.debug(Translate.get("backendworkerthread.waiting.transaction",
00311               String.valueOf(tid)));
00312 
00313         try
00314         {
00315           wait();
00316         }
00317         catch (InterruptedException ignore)
00318         {
00319         }
00320       }
00321     }
00322   }


The documentation for this class was generated from the following file:
Generated on Mon Apr 11 22:03:54 2005 for C-JDBC by  doxygen 1.3.9.1