Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

JDBCRecoverThread.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Nicolas Modrzyk.
00022  * Contributor(s): Emmanuel Cecchet.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.recoverylog;
00026 
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029 
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
00032 import org.objectweb.cjdbc.common.log.Trace;
00033 import org.objectweb.cjdbc.common.shared.BackendState;
00034 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00035 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00036 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00037 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00038 import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask;
00039 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00040 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00041 
00042 /**
00043  * This class defines a JDBCRecoverThread
00044  * 
00045  * @author <a href="mailto:Nicolas.Modrzyk@inrialpes.fr">Nicolas Modrzyk </a>
00046  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00047  * @version 1.0
00048  */
00049 public class JDBCRecoverThread extends Thread
00050 {
00051   static Trace                 logger = Trace.getLogger(JDBCRecoverThread.class
00052                                           .getName());
00053 
00054   private AbstractRecoveryLog  recoveryLog;
00055   private DatabaseBackend      backend;
00056   private AbstractLoadBalancer loadBalancer;
00057   private SQLException         exception;
00058 
00059   private BackendWorkerThread  bwt;
00060   private ArrayList            tids;
00061 
00062   private AbstractScheduler    scheduler;
00063 
00064   private String               checkpointName;
00065 
00066   /**
00067    * Creates a new <code>JDBCRecoverThread</code> object
00068    * 
00069    * @param scheduler the currently used scheduler
00070    * @param recoveryLog Recovery log that creates this thread
00071    * @param backend database backend for logging
00072    * @param loadBalancer index to start from for recovery
00073    * @param checkpointName load balancer to use to create a BackendWorkerThread
00074    */
00075   public JDBCRecoverThread(AbstractScheduler scheduler,
00076       AbstractRecoveryLog recoveryLog, DatabaseBackend backend,
00077       AbstractLoadBalancer loadBalancer, String checkpointName)
00078   {
00079     this.scheduler = scheduler;
00080     this.recoveryLog = recoveryLog;
00081     this.backend = backend;
00082     this.loadBalancer = loadBalancer;
00083     this.checkpointName = checkpointName;
00084     tids = new ArrayList();
00085   }
00086 
00087   /**
00088    * Returns the exception value.
00089    * 
00090    * @return Returns the exception.
00091    */
00092   public SQLException getException()
00093   {
00094     return exception;
00095   }
00096 
00097   /**
00098    * @see java.lang.Runnable#run()
00099    */
00100   public void run()
00101   {
00102     backend.setState(BackendState.REPLAYING);
00103     try
00104     {
00105       backend.initializeConnections();
00106     }
00107     catch (SQLException e)
00108     {
00109       recoveryFailed(e);
00110       return;
00111     }
00112     recoveryLog.beginRecovery();
00113 
00114     // Get the checkpoint from the recovery log
00115     long logIdx;
00116     try
00117     {
00118       logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
00119     }
00120     catch (SQLException e)
00121     {
00122       recoveryLog.endRecovery();
00123       String msg = Translate.get("recovery.cannot.get.checkpoint", e);
00124       logger.error(msg);
00125       recoveryFailed(new SQLException(msg));
00126       return;
00127     }
00128 
00129     try
00130     {
00131       // Play writes from the recovery log until the last possible transaction
00132       // without blocking the scheduler
00133       logIdx = recover(logIdx);
00134     }
00135     catch (SQLException e)
00136     {
00137       recoveryFailed(e);
00138       return;
00139     }
00140 
00141     try
00142     {
00143       // Suspend the writes
00144       scheduler.suspendWrites();
00145     }
00146     catch (SQLException e1)
00147     {
00148       recoveryFailed(e1);
00149       return;
00150     }
00151 
00152     try
00153     {
00154       // Play the remaining writes that were pending
00155       logIdx = recover(logIdx + 1);
00156     }
00157     catch (SQLException e2)
00158     {
00159       recoveryFailed(e2);
00160       return;
00161     }
00162 
00163     try
00164     {
00165       // Now enable it
00166       loadBalancer.enableBackend(backend, true);
00167     }
00168     catch (SQLException e3)
00169     {
00170       recoveryFailed(e3);
00171       return;
00172     }
00173     scheduler.resumeWrites();
00174     logger.info(Translate.get("backend.state.enabled", backend.getName()));
00175   }
00176 
00177   /**
00178    * Unset the last known checkpoint and set the backend to disabled state. This
00179    * should be called when the recovery has failed.
00180    * 
00181    * @param e cause of the recovery failure
00182    */
00183   private void recoveryFailed(SQLException e)
00184   {
00185     this.exception = e;
00186 
00187     if (scheduler.isSuspendedWrites())
00188       scheduler.resumeWrites();
00189 
00190     backend.setLastKnownCheckpoint(null);
00191     backend.setState(BackendState.DISABLED);
00192     backend.notifyJmxError(
00193         CjdbcNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
00194   }
00195 
00196   /**
00197    * Replay the recovery log from the given logIdx index
00198    * 
00199    * @param logIdx logIdx used to start the recovery
00200    * @return last logIdx that was replayed.
00201    * @throws SQLException if fails
00202    */
00203   private long recover(long logIdx) throws SQLException
00204   {
00205     bwt = new BackendWorkerThread("Worker thread for recovery on backend:"
00206         + backend.getName(), backend, loadBalancer);
00207     bwt.start();
00208     RecoveryTask recoveryTask = null;
00209     AbstractTask abstractTask = null;
00210 
00211     logger.info(Translate.get("recovery.start.process"));
00212 
00213     long tid;
00214     // Replay the whole log
00215     while (logIdx != -1)
00216     {
00217       try
00218       {
00219         recoveryTask = recoveryLog.recoverNextRequest(logIdx);
00220       }
00221       catch (SQLException e)
00222       {
00223         // Signal end of recovery and kill worker thread
00224         recoveryLog.endRecovery();
00225         addWorkerTask(bwt, new KillThreadTask(1, 1));
00226         String msg = Translate.get("recovery.cannot.recover.from.index", e);
00227         logger.error(msg, e);
00228         throw new SQLException(msg);
00229       }
00230       if (recoveryTask == null)
00231         break;
00232       tid = recoveryTask.getTid();
00233       if (tid != 0)
00234       {
00235         if (recoveryTask.getTask() instanceof BeginTask)
00236           tids.add(new Long(tid));
00237         else if (!tids.contains(new Long(tid)))
00238         {
00239           /*
00240            * if the task transaction id does not have a corresponding begin (it
00241            * is not in the tids arraylist), then this task has already been
00242            * played when the backend was disabled. So we can skip it.
00243            */
00244           logIdx++;
00245           continue;
00246         }
00247       } // else autocommit ok
00248 
00249       abstractTask = recoveryTask.getTask();
00250       logIdx = recoveryTask.getId();
00251       synchronized (abstractTask)
00252       {
00253         try
00254         {
00255           addWorkerTask(bwt, abstractTask);
00256           abstractTask.wait();
00257           if (abstractTask.getFailed() > 0)
00258           {
00259             // Signal end of recovery and kill worker thread
00260             recoveryLog.endRecovery();
00261             addWorkerTask(bwt, new KillThreadTask(1, 1));
00262             String msg = Translate.get("recovery.failed.with.error",
00263                 new String[]{
00264                     abstractTask.toString(),
00265                     ((Exception) abstractTask.getExceptions().get(0))
00266                         .getMessage()});
00267             logger.error(msg);
00268             throw new SQLException(msg);
00269           }
00270         }
00271         catch (InterruptedException e1)
00272         {
00273           // Signal end of recovery and kill worker thread
00274           recoveryLog.endRecovery();
00275           addWorkerTask(bwt, new KillThreadTask(1, 1));
00276           throw new SQLException(Translate.get(
00277               "recovery.interrupted.with.request", recoveryTask.getTask()
00278                   .toString()));
00279         }
00280       }
00281     }
00282     return logIdx;
00283   }
00284 
00285   /**
00286    * Add a task to a BackendWorkerThread using the proper synchronization.
00287    * 
00288    * @param bwt BackendWorkerThread to synchronize on
00289    * @param task the task to add to the thread queue
00290    */
00291   private void addWorkerTask(BackendWorkerThread bwt, AbstractTask task)
00292   {
00293     synchronized (bwt)
00294     {
00295       bwt.addTask(task);
00296       bwt.notify();
00297     }
00298   }
00299 
00300   /**
00301    * Properly end the recovery and kill the worker thread used for recovery if
00302    * it exists
00303    */
00304   public void endRecovery()
00305   {
00306     // We are done with the recovery
00307     logger.info(Translate.get("recovery.process.complete"));
00308     if (bwt != null)
00309     {
00310       addWorkerTask(bwt, new KillThreadTask(1, 1));
00311       try
00312       {
00313         bwt.join();
00314       }
00315       catch (InterruptedException e)
00316       {
00317         recoveryLog.endRecovery();
00318         String msg = Translate.get("recovery.join.failed", e);
00319         logger.error(msg, e);
00320         exception = new SQLException(msg);
00321       }
00322     }
00323 
00324     recoveryLog.endRecovery();
00325   }
00326 
00327 }

Generated on Mon Apr 11 22:01:32 2005 for C-JDBC by  doxygen 1.3.9.1