00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.recoverylog;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
00032 import org.objectweb.cjdbc.common.log.Trace;
00033 import org.objectweb.cjdbc.common.shared.BackendState;
00034 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00035 import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00036 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00037 import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00038 import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask;
00039 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00040 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00041
00042
00043
00044
00045
00046
00047
00048
00049 public class JDBCRecoverThread extends Thread
00050 {
00051 static Trace logger = Trace.getLogger(JDBCRecoverThread.class
00052 .getName());
00053
00054 private AbstractRecoveryLog recoveryLog;
00055 private DatabaseBackend backend;
00056 private AbstractLoadBalancer loadBalancer;
00057 private SQLException exception;
00058
00059 private BackendWorkerThread bwt;
00060 private ArrayList tids;
00061
00062 private AbstractScheduler scheduler;
00063
00064 private String checkpointName;
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075 public JDBCRecoverThread(AbstractScheduler scheduler,
00076 AbstractRecoveryLog recoveryLog, DatabaseBackend backend,
00077 AbstractLoadBalancer loadBalancer, String checkpointName)
00078 {
00079 this.scheduler = scheduler;
00080 this.recoveryLog = recoveryLog;
00081 this.backend = backend;
00082 this.loadBalancer = loadBalancer;
00083 this.checkpointName = checkpointName;
00084 tids = new ArrayList();
00085 }
00086
00087
00088
00089
00090
00091
00092 public SQLException getException()
00093 {
00094 return exception;
00095 }
00096
00097
00098
00099
00100 public void run()
00101 {
00102 backend.setState(BackendState.REPLAYING);
00103 try
00104 {
00105 backend.initializeConnections();
00106 }
00107 catch (SQLException e)
00108 {
00109 recoveryFailed(e);
00110 return;
00111 }
00112 recoveryLog.beginRecovery();
00113
00114
00115 long logIdx;
00116 try
00117 {
00118 logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
00119 }
00120 catch (SQLException e)
00121 {
00122 recoveryLog.endRecovery();
00123 String msg = Translate.get("recovery.cannot.get.checkpoint", e);
00124 logger.error(msg);
00125 recoveryFailed(new SQLException(msg));
00126 return;
00127 }
00128
00129 try
00130 {
00131
00132
00133 logIdx = recover(logIdx);
00134 }
00135 catch (SQLException e)
00136 {
00137 recoveryFailed(e);
00138 return;
00139 }
00140
00141 try
00142 {
00143
00144 scheduler.suspendWrites();
00145 }
00146 catch (SQLException e1)
00147 {
00148 recoveryFailed(e1);
00149 return;
00150 }
00151
00152 try
00153 {
00154
00155 logIdx = recover(logIdx + 1);
00156 }
00157 catch (SQLException e2)
00158 {
00159 recoveryFailed(e2);
00160 return;
00161 }
00162
00163 try
00164 {
00165
00166 loadBalancer.enableBackend(backend, true);
00167 }
00168 catch (SQLException e3)
00169 {
00170 recoveryFailed(e3);
00171 return;
00172 }
00173 scheduler.resumeWrites();
00174 logger.info(Translate.get("backend.state.enabled", backend.getName()));
00175 }
00176
00177
00178
00179
00180
00181
00182
00183 private void recoveryFailed(SQLException e)
00184 {
00185 this.exception = e;
00186
00187 if (scheduler.isSuspendedWrites())
00188 scheduler.resumeWrites();
00189
00190 backend.setLastKnownCheckpoint(null);
00191 backend.setState(BackendState.DISABLED);
00192 backend.notifyJmxError(
00193 CjdbcNotificationList.VIRTUALDATABASE_BACKEND_REPLAYING_FAILED, e);
00194 }
00195
00196
00197
00198
00199
00200
00201
00202
00203 private long recover(long logIdx) throws SQLException
00204 {
00205 bwt = new BackendWorkerThread("Worker thread for recovery on backend:"
00206 + backend.getName(), backend, loadBalancer);
00207 bwt.start();
00208 RecoveryTask recoveryTask = null;
00209 AbstractTask abstractTask = null;
00210
00211 logger.info(Translate.get("recovery.start.process"));
00212
00213 long tid;
00214
00215 while (logIdx != -1)
00216 {
00217 try
00218 {
00219 recoveryTask = recoveryLog.recoverNextRequest(logIdx);
00220 }
00221 catch (SQLException e)
00222 {
00223
00224 recoveryLog.endRecovery();
00225 addWorkerTask(bwt, new KillThreadTask(1, 1));
00226 String msg = Translate.get("recovery.cannot.recover.from.index", e);
00227 logger.error(msg, e);
00228 throw new SQLException(msg);
00229 }
00230 if (recoveryTask == null)
00231 break;
00232 tid = recoveryTask.getTid();
00233 if (tid != 0)
00234 {
00235 if (recoveryTask.getTask() instanceof BeginTask)
00236 tids.add(new Long(tid));
00237 else if (!tids.contains(new Long(tid)))
00238 {
00239
00240
00241
00242
00243
00244 logIdx++;
00245 continue;
00246 }
00247 }
00248
00249 abstractTask = recoveryTask.getTask();
00250 logIdx = recoveryTask.getId();
00251 synchronized (abstractTask)
00252 {
00253 try
00254 {
00255 addWorkerTask(bwt, abstractTask);
00256 abstractTask.wait();
00257 if (abstractTask.getFailed() > 0)
00258 {
00259
00260 recoveryLog.endRecovery();
00261 addWorkerTask(bwt, new KillThreadTask(1, 1));
00262 String msg = Translate.get("recovery.failed.with.error",
00263 new String[]{
00264 abstractTask.toString(),
00265 ((Exception) abstractTask.getExceptions().get(0))
00266 .getMessage()});
00267 logger.error(msg);
00268 throw new SQLException(msg);
00269 }
00270 }
00271 catch (InterruptedException e1)
00272 {
00273
00274 recoveryLog.endRecovery();
00275 addWorkerTask(bwt, new KillThreadTask(1, 1));
00276 throw new SQLException(Translate.get(
00277 "recovery.interrupted.with.request", recoveryTask.getTask()
00278 .toString()));
00279 }
00280 }
00281 }
00282 return logIdx;
00283 }
00284
00285
00286
00287
00288
00289
00290
00291 private void addWorkerTask(BackendWorkerThread bwt, AbstractTask task)
00292 {
00293 synchronized (bwt)
00294 {
00295 bwt.addTask(task);
00296 bwt.notify();
00297 }
00298 }
00299
00300
00301
00302
00303
00304 public void endRecovery()
00305 {
00306
00307 logger.info(Translate.get("recovery.process.complete"));
00308 if (bwt != null)
00309 {
00310 addWorkerTask(bwt, new KillThreadTask(1, 1));
00311 try
00312 {
00313 bwt.join();
00314 }
00315 catch (InterruptedException e)
00316 {
00317 recoveryLog.endRecovery();
00318 String msg = Translate.get("recovery.join.failed", e);
00319 logger.error(msg, e);
00320 exception = new SQLException(msg);
00321 }
00322 }
00323
00324 recoveryLog.endRecovery();
00325 }
00326
00327 }