00001
00025
package org.objectweb.cjdbc.controller.recoverylog;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
00030
import org.objectweb.cjdbc.common.i18n.Translate;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00033
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00034
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035
import org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask;
00036
import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask;
00037
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00038
00045 public class JDBCRecoverThread implements Runnable
00046 {
00047
00048 static Trace
logger = Trace.getLogger(
JDBCRecoverThread.class
00049 .getName());
00050
00051 AbstractRecoveryLog recoveryLog;
00052 DatabaseBackend
backend;
00053 long logIdx;
00054 AbstractLoadBalancer
loadBalancer;
00055 SQLException
exception;
00056
00057 BackendWorkerThread
bwt;
00058 ArrayList
tids;
00059
00068 public JDBCRecoverThread(
AbstractRecoveryLog recoveryLog,
00069 DatabaseBackend backend,
long recoveryTransactionId,
00070 AbstractLoadBalancer loadBalancer)
00071 {
00072
this.recoveryLog = recoveryLog;
00073
this.backend = backend;
00074
this.logIdx = recoveryTransactionId;
00075
this.loadBalancer = loadBalancer;
00076
tids =
new ArrayList();
00077 }
00078
00084 public long getLogIdx()
00085 {
00086
return logIdx;
00087 }
00088
00094 public void setLogIdx(
long logIdx)
00095 {
00096
this.logIdx = logIdx;
00097 }
00098
00104 public SQLException
getException()
00105 {
00106
return exception;
00107 }
00108
00112 public void run()
00113 {
00114
try
00115 {
00116
bwt =
new BackendWorkerThread(
"Worker thread for recovery on backend:"
00117 +
backend.getName(),
backend,
loadBalancer);
00118
bwt.start();
00119
RecoveryTask recoveryTask = null;
00120
AbstractTask abstractTask = null;
00121
00122
logger.info(
Translate.get(
"recovery.start.process"));
00123
00124
long tid;
00125
00126
while (
logIdx != -1)
00127 {
00128
try
00129 {
00130 recoveryTask =
recoveryLog.
recoverNextRequest(
logIdx);
00131 }
00132
catch (SQLException e)
00133 {
00134
00135
recoveryLog.
endRecovery();
00136
addWorkerTask(
bwt,
new KillThreadTask(1, 1));
00137 String msg =
Translate.get(
"recovery.cannot.recover.from.index", e);
00138
logger.error(msg, e);
00139
throw new SQLException(msg);
00140 }
00141
if (recoveryTask == null)
00142
break;
00143 tid = recoveryTask.
getTid();
00144
if (tid == 0)
00145
00146 ;
00147
else
00148 {
00149
if (recoveryTask.
getTask() instanceof
BeginTask)
00150
tids.add(
new Long(tid));
00151
else if (!
tids.contains(
new Long(tid)))
00152 {
00153
00154
00155
00156
00157
00158
logIdx++;
00159
continue;
00160 }
00161 }
00162 abstractTask = recoveryTask.
getTask();
00163
logIdx = recoveryTask.
getId();
00164
synchronized (abstractTask)
00165 {
00166
try
00167 {
00168
addWorkerTask(
bwt, abstractTask);
00169 abstractTask.wait();
00170
if (abstractTask.
getFailed() > 0)
00171 {
00172
00173
recoveryLog.
endRecovery();
00174
addWorkerTask(
bwt,
new KillThreadTask(1, 1));
00175 String msg =
Translate.get(
"recovery.failed.with.error",
00176
new String[]{
00177 abstractTask.toString(),
00178 ((Exception) abstractTask.
getExceptions().get(0))
00179 .getMessage()});
00180
logger.error(msg);
00181
throw new SQLException(msg);
00182 }
00183 }
00184
catch (InterruptedException e1)
00185 {
00186
00187
recoveryLog.
endRecovery();
00188
addWorkerTask(
bwt,
new KillThreadTask(1, 1));
00189
throw new SQLException(
Translate.get(
00190
"recovery.interrupted.with.request", recoveryTask.
getTask()
00191 .toString()));
00192 }
00193 }
00194 }
00195 }
00196
catch (SQLException e)
00197 {
00198 exception = e;
00199 }
00200 }
00201
00208 private void addWorkerTask(BackendWorkerThread bwt,
AbstractTask task)
00209 {
00210
synchronized (bwt)
00211 {
00212 bwt.addTask(task);
00213 bwt.notify();
00214 }
00215 }
00216
00221 public void endRecovery()
00222 {
00223
00224 logger.info(
Translate.get(
"recovery.process.complete"));
00225
if (bwt != null)
00226 {
00227 addWorkerTask(bwt,
new KillThreadTask(1, 1));
00228
try
00229 {
00230 bwt.join();
00231 }
00232
catch (InterruptedException e)
00233 {
00234 recoveryLog.endRecovery();
00235 String msg =
Translate.get(
"recovery.join.failed", e);
00236 logger.error(msg, e);
00237 exception =
new SQLException(msg);
00238 }
00239 }
00240
00241 recoveryLog.endRecovery();
00242 }
00243
00244 }