Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

BackendWorkerThread.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2004 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): 
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.loadbalancer;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00033 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00034 
00035 /**
00036  * Processes a set of tasks sequentially and sends them to a backend.
00037  * 
00038  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00039  * @version 1.0
00040  */
00041 public class BackendWorkerThread extends Thread
00042 {
00043   //
00044   // How is the code organized?
00045   // 1. Member variables
00046   // 2. Constructor(s)
00047   // 3. Task management
00048   // 4. Getter/Setters
00049   //
00050 
00051   private AbstractLoadBalancer loadBalancer;
00052   private DatabaseBackend      backend;
00053   private ArrayList            taskList;
00054   private ArrayList            tidList;
00055   private boolean              isKilled = false;
00056 
00057   // not null if we are currently processing a task
00058   private AbstractTask         currentlyProcessingTask;
00059   // Tid of the current task if currentlyProcessingTask.hasTid()==true
00060   private Long                 currentTaskTid;
00061 
00062   private Trace                logger   = null;
00063 
00064   /*
00065    * Constructor
00066    */
00067 
00068   /**
00069    * Creates a new <code>BackendWorkerThread</code>.
00070    * 
00071    * @param backend the backend this thread is associated to.
00072    * @param loadBalancer the load balancer instanciating this thread
00073    * @throws SQLException if an error occurs
00074    */
00075   public BackendWorkerThread(DatabaseBackend backend,
00076       AbstractLoadBalancer loadBalancer) throws SQLException
00077   {
00078     this("BackendWorkerThread for backend:" + backend.getName()
00079         + " and a loadBalancer level:" + loadBalancer.getRAIDbLevel(), backend,
00080         loadBalancer);
00081   }
00082 
00083   /**
00084    * Creates a new <code>BackendWorkerThread</code>.
00085    * 
00086    * @param name the name to give to the thread
00087    * @param backend the backend this thread is associated to.
00088    * @param loadBalancer the load balancer instanciating this thread
00089    * @throws SQLException if an error occurs
00090    */
00091   public BackendWorkerThread(String name, DatabaseBackend backend,
00092       AbstractLoadBalancer loadBalancer) throws SQLException
00093   {
00094     super(name);
00095     // Sanity checks
00096     if (backend == null)
00097     {
00098       String msg = Translate.get("backendworkerthread.null.backend");
00099       logger = Trace
00100           .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend");
00101       logger.error(msg);
00102       throw new SQLException(msg);
00103     }
00104 
00105     backend.checkDriverCompliance();
00106 
00107     logger = Trace
00108         .getLogger("org.objectweb.cjdbc.controller.backend.DatabaseBackend."
00109             + backend.getName());
00110 
00111     if (loadBalancer == null)
00112     {
00113       String msg = Translate.get("backendworkerthread.null.loadbalancer");
00114       logger.error(msg);
00115       throw new SQLException(msg);
00116     }
00117 
00118     this.backend = backend;
00119     this.loadBalancer = loadBalancer;
00120     taskList = new ArrayList();
00121     tidList = new ArrayList();
00122   }
00123 
00124   /*
00125    * Task Management
00126    */
00127 
00128   /**
00129    * Adds a task at the end of the task list. Warning! This method is not
00130    * synchronized and the caller must synchronize on the thread before calling
00131    * this method.
00132    * 
00133    * @param task the task to add
00134    */
00135   public void addTask(AbstractTask task)
00136   {
00137     if (!isKilled)
00138     {
00139       taskList.add(task);
00140       // We assume that all requests here are writes
00141       backend.addPendingWriteRequest(task);
00142     }
00143     else
00144       task.notifyCompletion();
00145   }
00146 
00147   /**
00148    * Adds a task at the end of the task list. Warning! This method is not
00149    * synchronized and the caller must synchronize on the thread before calling
00150    * this method.
00151    * 
00152    * @param task the task to add
00153    * @param transactionId transaction id in which this task execute
00154    */
00155   public void addTask(AbstractTask task, long transactionId)
00156   {
00157     if (!isKilled)
00158     {
00159       tidList.add(new Long(transactionId));
00160       task.setHasTid(true);
00161       addTask(task);
00162     }
00163     else
00164       task.notifyCompletion();
00165   }
00166 
00167   /**
00168    * Adds a task just after the last write task for the given transaction in the
00169    * task list. Warning! This method is not synchronized and the caller must
00170    * synchronize on the thread before calling this method.
00171    * <p>
00172    * This method is usually used to insert a commit/rollback task when
00173    * asynchrony is allowed between backends.
00174    * 
00175    * @param task the task to add
00176    * @param transactionId transaction id in which this task execute
00177    */
00178   public void insertTaskAfterLastWriteForTransaction(AbstractTask task,
00179       Long transactionId)
00180   {
00181     if (!isKilled)
00182     {
00183       task.setHasTid(true);
00184 
00185       // Find the last task index in the tid queue
00186       int lastTidIndex = tidList.lastIndexOf(transactionId);
00187       if (lastTidIndex == -1)
00188       { // Not found, add in last position
00189         taskList.add(task);
00190         tidList.add(transactionId);
00191         backend.addPendingWriteRequest(task);
00192         return;
00193       }
00194 
00195       // Find the corresponding task in the task list (we have to skip
00196       // autocommit tasks)
00197       int lastRequestIndex = 0;
00198       while (lastTidIndex >= 0)
00199       {
00200         AbstractTask t = (AbstractTask) taskList.get(lastRequestIndex);
00201         if (t.hasTid())
00202           lastTidIndex--;
00203         lastRequestIndex++;
00204       }
00205 
00206       // Add the task after the last write task and the tid in the tid list.
00207       taskList.add(lastRequestIndex, task);
00208       tidList.add(lastTidIndex + 1, transactionId);
00209       // Warning, the task is added in queue (not sorted) in the backend pending
00210       // request list.
00211       backend.addPendingWriteRequest(task);
00212     }
00213     else
00214       task.notifyCompletion();
00215   }
00216 
00217   /**
00218    * Adds a task upfront to the task list so that this task will be the next
00219    * executed task. Warning! This method is not synchronized and the caller must
00220    * synchronize on the thread before calling this method.
00221    * 
00222    * @param task the task to add
00223    */
00224   public void addPriorityTask(AbstractTask task)
00225   {
00226     if (!isKilled)
00227     {
00228       taskList.add(0, task);
00229       // We assume that all requests here are writes
00230       backend.addPendingWriteRequest(task);
00231     }
00232     else
00233       task.notifyCompletion();
00234   }
00235 
00236   /**
00237    * Adds a task upfront to the task list so that this task will be the next
00238    * executed task. Warning! This method is not synchronized and the caller must
00239    * synchronize on the thread before calling this method
00240    * 
00241    * @param task the task to add
00242    * @param transactionId transaction id in which this task execute
00243    */
00244   public void addPriorityTask(AbstractTask task, long transactionId)
00245   {
00246     if (!isKilled)
00247     {
00248       task.setHasTid(true);
00249       addPriorityTask(task);
00250       tidList.add(0, new Long(transactionId));
00251     }
00252     else
00253       task.notifyCompletion();
00254   }
00255 
00256   /**
00257    * Returns true if the thread has pending tasks for the given transaction.
00258    * 
00259    * @param tid the transaction identifier
00260    * @return true if the task list contains task(s) for transaction tid.
00261    */
00262   public boolean hasTaskForTransaction(Long tid)
00263   {
00264     synchronized (this)
00265     {
00266       if ((currentTaskTid != null) && (currentTaskTid.equals(tid)))
00267         // Currently executing task belong to this transaction
00268         return true;
00269       else
00270         return tidList.contains(tid);
00271     }
00272   }
00273 
00274   /**
00275    * Waits for all tasks of the specified transaction to complete.
00276    * 
00277    * @param transactionId the transaction identifier
00278    */
00279   public void waitForAllTasksToComplete(long transactionId)
00280   {
00281     if ((transactionId == 0) || (tidList == null))
00282       return;
00283 
00284     Long tid = new Long(transactionId);
00285     synchronized (this)
00286     {
00287       if (!tidList.contains(tid))
00288       {
00289         if ((currentTaskTid != null)
00290             && (currentTaskTid.longValue() == transactionId))
00291         {
00292           try
00293           {
00294             if (logger.isDebugEnabled())
00295               logger.debug(Translate.get("backendworkerthread.waiting.task"));
00296             wait();
00297           }
00298           catch (InterruptedException ignore)
00299           {
00300           }
00301           return;
00302         }
00303         else
00304           return;
00305       }
00306 
00307       while (tidList.contains(tid))
00308       {
00309         if (logger.isDebugEnabled())
00310           logger.debug(Translate.get("backendworkerthread.waiting.transaction",
00311               String.valueOf(tid)));
00312 
00313         try
00314         {
00315           wait();
00316         }
00317         catch (InterruptedException ignore)
00318         {
00319         }
00320       }
00321     }
00322   }
00323 
00324   /**
00325    * Waits for all current tasks to complete.
00326    */
00327   public void waitForAllTasksToComplete()
00328   {
00329     synchronized (this)
00330     {
00331       Object current;
00332       if (taskList.size() == 0)
00333       {
00334         if (currentlyProcessingTask != null)
00335         {
00336           try
00337           {
00338             if (logger.isDebugEnabled())
00339               logger.debug(Translate.get("backendworkerthread.waiting.task"));
00340             wait();
00341           }
00342           catch (InterruptedException ignore)
00343           {
00344             logger.warn(Translate
00345                 .get("backendworkerthread.no.full.task.synchronization"));
00346           }
00347           return;
00348         }
00349         else
00350         { // No task currently executing
00351           return;
00352         }
00353       }
00354       else
00355         current = taskList.get(taskList.size() - 1);
00356 
00357       if (logger.isDebugEnabled())
00358         logger.debug(Translate.get("backendworkerthread.waiting.request",
00359             current.toString()));
00360 
00361       while (taskList.contains(current))
00362       {
00363         try
00364         {
00365           wait();
00366         }
00367         catch (InterruptedException ignore)
00368         {
00369         }
00370       }
00371     }
00372   }
00373 
00374   /**
00375    * Kills this thread after the next task processing. It also marks all
00376    * remaining tasks in the task list as failed.
00377    */
00378   public synchronized void kill()
00379   {
00380     if (backend.isKilled())
00381       return;
00382 
00383     String msg = "Backend " + backend.getName() + " is shutting down";
00384     // Remove all tasks
00385     while (!taskList.isEmpty())
00386     {
00387       AbstractTask task = (AbstractTask) taskList.remove(0);
00388       try
00389       {
00390         task.notifyFailure(this, 1, new SQLException(msg));
00391       }
00392       catch (SQLException ignore)
00393       {
00394       }
00395     }
00396     isKilled = true;
00397     notify(); // Wake up thread
00398     logger.info(msg);
00399     try
00400     {
00401       // This ensure that all worker threads get removed from the load balancer
00402       // list and that the backend state is set to disable.
00403       loadBalancer.disableBackend(backend);
00404     }
00405     catch (SQLException ignore)
00406     {
00407     }
00408   }
00409 
00410   /**
00411    * Process the tasklist and call <code>wait()</code> (on itself) when the
00412    * tasklist becomes empty.
00413    */
00414   public void run()
00415   {
00416     currentlyProcessingTask = null;
00417 
00418     while (!isKilled)
00419     {
00420       synchronized (this)
00421       {
00422         if (taskList.isEmpty())
00423         { // Nothing to do, go to bed!
00424           try
00425           {
00426             wait();
00427           }
00428           catch (InterruptedException e)
00429           {
00430             logger.warn(Translate.get("backendworkerthread.wait.interrupted"));
00431           }
00432         }
00433         try
00434         { // Take the 1st task from the list
00435           currentlyProcessingTask = (AbstractTask) taskList.remove(0);
00436           if (currentlyProcessingTask.hasTid())
00437             currentTaskTid = (Long) tidList.remove(0);
00438           else
00439             currentTaskTid = null;
00440         }
00441         catch (IndexOutOfBoundsException oob)
00442         {
00443           logger.warn(Translate.get("backendworkerthread.no.task"), oob);
00444         }
00445       }
00446       // Execute the task out of the sync block
00447       try
00448       {
00449         if (logger.isDebugEnabled())
00450           logger.debug(Translate.get("backendworkerthread.execute.task",
00451               currentlyProcessingTask.toString()));
00452         currentlyProcessingTask.execute(this);
00453       }
00454       catch (SQLException e)
00455       {
00456         // Task should have notified of failure
00457         logger.warn(Translate.get("backendworkerthread.task.failed", e));
00458       }
00459       catch (RuntimeException re)
00460       {
00461         // We can't know for sure if the task has notified the failure or not.
00462         // To prevent a deadlock, we force the failure notification here.
00463         try
00464         {
00465           currentlyProcessingTask.notifyFailure(this, 1, new SQLException(re
00466               .getMessage()));
00467         }
00468         catch (SQLException e1)
00469         {
00470           // just notify
00471         }
00472         logger.fatal(Translate.get(
00473             "backendworkerthread.task.runtime.exception",
00474             currentlyProcessingTask.toString()), re);
00475       }
00476       finally
00477       {
00478         try
00479         {
00480           backend.removePendingRequest(currentlyProcessingTask);
00481         }
00482         catch (RuntimeException e)
00483         {
00484           logger.warn(
00485               Translate.get("backendworkerthread.remove.task.error", e), e);
00486         }
00487       }
00488 
00489       // Notify the completion of the task if someone is waiting for
00490       // the completion of this transaction.
00491       // @see #waitForAllTasksToComplete()
00492       // @see #waitForAllTasksToComplete(long)
00493       synchronized (this)
00494       {
00495         notifyAll();
00496         currentlyProcessingTask = null;
00497         currentTaskTid = null;
00498       }
00499     }
00500 
00501     // Automatically disable the backend when the thread dies
00502     try
00503     {
00504       if (backend.isReadEnabled() || backend.isWriteEnabled())
00505         loadBalancer.disableBackend(backend);
00506     }
00507     catch (SQLException e)
00508     {
00509       logger.error(Translate.get("backendworkerthread.backend.disable.failed",
00510           new String[]{backend.getName(), e.getMessage()}));
00511     }
00512   }
00513 
00514   /*
00515    * Getter/Setter
00516    */
00517 
00518   /**
00519    * Returns the backend.
00520    * 
00521    * @return a <code>DatabaseBackend</code> instance
00522    */
00523   public DatabaseBackend getBackend()
00524   {
00525     return backend;
00526   }
00527 
00528   /**
00529    * Returns the logger for tracing.
00530    * 
00531    * @return a <code>Trace</code> instance
00532    */
00533   public Trace getLogger()
00534   {
00535     return logger;
00536   }
00537 
00538 }

Generated on Mon Apr 11 22:01:29 2005 for C-JDBC by  doxygen 1.3.9.1