00001
00025
package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
00030
import org.objectweb.cjdbc.common.i18n.Translate;
00031
import org.objectweb.cjdbc.common.log.Trace;
00032
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034
import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035
import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036
import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00037
import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00038
import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00039
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00040
00051 public abstract class RAIDb2ec extends RAIDb2
00052 {
00053
00054
00055
00056
00057
00058 protected ArrayList
backendReadThreads;
00059 protected int nbOfConcurrentReads;
00060 protected ErrorCheckingPolicy
errorCheckingPolicy;
00061
00062 protected static Trace
logger = Trace
00063 .getLogger(
"org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec");
00064
00065
00066
00067
00068
00082 public RAIDb2ec(
VirtualDatabase vdb,
00083
WaitForCompletionPolicy waitForCompletionPolicy,
00084
CreateTablePolicy createTablePolicy,
00085 ErrorCheckingPolicy errorCheckingPolicy,
int nbOfConcurrentReads)
00086
throws SQLException
00087 {
00088 super(vdb, waitForCompletionPolicy, createTablePolicy);
00089
backendReadThreads =
new ArrayList();
00090
this.errorCheckingPolicy =
errorCheckingPolicy;
00091
this.nbOfConcurrentReads =
nbOfConcurrentReads;
00092 }
00093
00094
00095
00096
00097
00110 public void enableBackend(
DatabaseBackend db,
boolean writeEnabled)
00111
throws SQLException
00112 {
00113
00114
BackendWorkerThread blockingThread =
new BackendWorkerThread(db,
this);
00115 BackendWorkerThread nonBlockingThread =
new BackendWorkerThread(db,
this);
00116
00117
00118
try
00119 {
00120 backendBlockingThreadsRWLock.
acquireWrite();
00121 }
00122
catch (InterruptedException e)
00123 {
00124 String msg =
Translate.get(
00125
"loadbalancer.backendlist.acquire.writelock.failed", e);
00126
logger.error(msg);
00127
throw new SQLException(msg);
00128 }
00129 backendBlockingThreads.add(blockingThread);
00130 backendBlockingThreadsRWLock.
releaseWrite();
00131 blockingThread.start();
00132
logger.info(
Translate.get(
"loadbalancer.backend.workerthread.blocking.add",
00133 db.getName()));
00134
00135
00136
try
00137 {
00138 backendNonBlockingThreadsRWLock.
acquireWrite();
00139 }
00140
catch (InterruptedException e)
00141 {
00142 String msg =
Translate.get(
00143
"loadbalancer.backendlist.acquire.writelock.failed", e);
00144
logger.error(msg);
00145
throw new SQLException(msg);
00146 }
00147 backendNonBlockingThreads.add(nonBlockingThread);
00148 backendNonBlockingThreadsRWLock.
releaseWrite();
00149 nonBlockingThread.start();
00150
logger.info(
Translate.get(
00151
"loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00152
00153
if (!db.isInitialized())
00154 db.initializeConnections();
00155 db.enableRead();
00156
if (writeEnabled)
00157 db.enableWrite();
00158 }
00159
00171 public synchronized void disableBackend(
DatabaseBackend db)
00172
throws SQLException
00173 {
00174
int nbOfThreads = backendBlockingThreads.size();
00175
00176
00177
for (
int i = 0; i < nbOfThreads; i++)
00178 {
00179
BackendWorkerThread thread = (
BackendWorkerThread) backendBlockingThreads
00180 .get(i);
00181
if (thread.
getBackend().
equals(db))
00182 {
00183
logger.info(
Translate.get(
00184
"loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00185
00186
00187
try
00188 {
00189 backendBlockingThreadsRWLock.
acquireWrite();
00190 }
00191
catch (InterruptedException e)
00192 {
00193 String msg =
Translate.get(
00194
"loadbalancer.backendlist.acquire.writelock.failed", e);
00195
logger.error(msg);
00196
throw new SQLException(msg);
00197 }
00198 backendBlockingThreads.remove(thread);
00199 backendBlockingThreadsRWLock.
releaseWrite();
00200
00201
synchronized (thread)
00202 {
00203
00204 thread.
addPriorityTask(
new KillThreadTask(1, 1));
00205 thread.notify();
00206 }
00207
break;
00208 }
00209 }
00210
00211
00212 nbOfThreads = backendNonBlockingThreads.size();
00213
for (
int i = 0; i < nbOfThreads; i++)
00214 {
00215
BackendWorkerThread thread = (
BackendWorkerThread) backendNonBlockingThreads
00216 .get(i);
00217
if (thread.
getBackend().
equals(db))
00218 {
00219
logger.info(
Translate.get(
00220
"loadbalancer.backend.workerthread.non.blocking.remove", db
00221 .getName()));
00222
00223
00224
try
00225 {
00226 backendNonBlockingThreadsRWLock.
acquireWrite();
00227 }
00228
catch (InterruptedException e)
00229 {
00230 String msg =
Translate.get(
00231
"loadbalancer.backendlist.acquire.writelock.failed", e);
00232
logger.error(msg);
00233
throw new SQLException(msg);
00234 }
00235 backendNonBlockingThreads.remove(thread);
00236 backendNonBlockingThreadsRWLock.
releaseWrite();
00237
00238
synchronized (thread)
00239 {
00240
00241 thread.
addPriorityTask(
new KillThreadTask(1, 1));
00242 thread.notify();
00243 }
00244
break;
00245 }
00246 }
00247
00248 db.disable();
00249
if (db.isInitialized())
00250 db.finalizeConnections();
00251 }
00252
00256 public String
getXmlImpl()
00257 {
00258 StringBuffer info =
new StringBuffer();
00259 info.append(
"<" +
DatabasesXmlTags.ELT_RAIDb_2ec +
" "
00260 +
DatabasesXmlTags.ATT_nbOfConcurrentReads +
"=\""
00261 +
this.nbOfConcurrentReads +
"\">");
00262
this.getRaidb2Xml();
00263
if (waitForCompletionPolicy != null)
00264 info.append(waitForCompletionPolicy.
getXml());
00265 info.append(
"</" +
DatabasesXmlTags.ELT_RAIDb_2ec +
">");
00266
return info.toString();
00267 }
00268 }