00001
00025
package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
00030
import org.objectweb.cjdbc.common.i18n.Translate;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035
import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036
import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00037
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00038
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00039
00050 public abstract class RAIDb1ec extends RAIDb1
00051 {
00052
00053
00054
00055
00056
00057 protected ArrayList
backendReadThreads;
00058 protected int nbOfConcurrentReads;
00059 protected ErrorCheckingPolicy
errorCheckingPolicy;
00060
00061 protected static Trace
logger = Trace
00062 .getLogger(
"org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec");
00063
00064
00065
00066
00067
00079 public RAIDb1ec(
VirtualDatabase vdb,
00080
WaitForCompletionPolicy waitForCompletionPolicy,
00081 ErrorCheckingPolicy errorCheckingPolicy,
int nbOfConcurrentReads)
00082
throws SQLException
00083 {
00084 super(vdb, waitForCompletionPolicy);
00085
backendReadThreads =
new ArrayList();
00086
this.errorCheckingPolicy =
errorCheckingPolicy;
00087
this.nbOfConcurrentReads =
nbOfConcurrentReads;
00088 }
00089
00090
00091
00092
00093
00106 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
00107
throws SQLException
00108 {
00109
00110
BackendWorkerThread blockingThread =
new BackendWorkerThread(db,
this);
00111 BackendWorkerThread nonBlockingThread =
new BackendWorkerThread(db,
this);
00112
00113
00114
try
00115 {
00116 backendBlockingThreadsRWLock.
acquireWrite();
00117 }
00118
catch (InterruptedException e)
00119 {
00120 String msg =
Translate.get(
00121
"loadbalancer.backendlist.acquire.writelock.failed", e);
00122
logger.error(msg);
00123
throw new SQLException(msg);
00124 }
00125 backendBlockingThreads.add(blockingThread);
00126 backendBlockingThreadsRWLock.
releaseWrite();
00127 blockingThread.start();
00128
logger.info(
Translate.get(
"loadbalancer.backend.workerthread.blocking.add",
00129 db.getName()));
00130
00131
00132
try
00133 {
00134 backendNonBlockingThreadsRWLock.
acquireWrite();
00135 }
00136
catch (InterruptedException e)
00137 {
00138 String msg =
Translate.get(
00139
"loadbalancer.backendlist.acquire.writelock.failed", e);
00140
logger.error(msg);
00141
throw new SQLException(msg);
00142 }
00143 backendNonBlockingThreads.add(nonBlockingThread);
00144 backendNonBlockingThreadsRWLock.
releaseWrite();
00145 nonBlockingThread.start();
00146
logger.info(
Translate.get(
00147
"loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00148
00149
if (!db.isInitialized())
00150 db.initializeConnections();
00151 db.enableRead();
00152
if (writeEnabled)
00153 db.enableWrite();
00154 }
00155
00167 public synchronized void disableBackend(
DatabaseBackend db)
00168
throws SQLException
00169 {
00170
int nbOfThreads = backendBlockingThreads.size();
00171
00172
00173
for (
int i = 0; i < nbOfThreads; i++)
00174 {
00175
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
00176 .get(i);
00177
if (thread.
getBackend().
equals(db))
00178 {
00179
logger.info(
Translate.get(
00180
"loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00181
00182
00183
try
00184 {
00185 backendBlockingThreadsRWLock.
acquireWrite();
00186 }
00187
catch (InterruptedException e)
00188 {
00189 String msg =
Translate.get(
00190
"loadbalancer.backendlist.acquire.writelock.failed", e);
00191
logger.error(msg);
00192
throw new SQLException(msg);
00193 }
00194 backendBlockingThreads.remove(thread);
00195 backendBlockingThreadsRWLock.
releaseWrite();
00196
00197
synchronized (thread)
00198 {
00199
00200 thread.
addPriorityTask(
new KillThreadTask(1, 1));
00201 thread.notify();
00202 }
00203
break;
00204 }
00205 }
00206
00207
00208 nbOfThreads = backendNonBlockingThreads.size();
00209
for (
int i = 0; i < nbOfThreads; i++)
00210 {
00211
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
00212 .get(i);
00213
if (thread.
getBackend().
equals(db))
00214 {
00215
logger.info(
Translate.get(
00216
"loadbalancer.backend.workerthread.non.blocking.remove", db
00217 .getName()));
00218
00219
00220
try
00221 {
00222 backendNonBlockingThreadsRWLock.
acquireWrite();
00223 }
00224
catch (InterruptedException e)
00225 {
00226 String msg =
Translate.get(
00227
"loadbalancer.backendlist.acquire.writelock.failed", e);
00228
logger.error(msg);
00229
throw new SQLException(msg);
00230 }
00231 backendNonBlockingThreads.remove(thread);
00232 backendNonBlockingThreadsRWLock.
releaseWrite();
00233
00234
synchronized (thread)
00235 {
00236
00237 thread.
addPriorityTask(
new KillThreadTask(1, 1));
00238 thread.notify();
00239 }
00240
break;
00241 }
00242 }
00243
00244 db.disable();
00245
if (db.isInitialized())
00246 db.finalizeConnections();
00247 }
00248
00252 public String
getXmlImpl()
00253 {
00254 StringBuffer info =
new StringBuffer();
00255 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb_1ec +
" "
00256 +
DatabasesXmlTags.ATT_nbOfConcurrentReads +
"=\""
00257 +
this.nbOfConcurrentReads +
"\">");
00258
this.getRaidb1Xml();
00259
if (waitForCompletionPolicy != null)
00260 info.append(waitForCompletionPolicy.
getXml());
00261 info.append(
"</" +
DatabasesXmlTags.ELT_RAIDb_1ec +
">");
00262
return info.toString();
00263 }
00264 }