00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb1;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00037 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00038 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050 public abstract class RAIDb1ec extends RAIDb1
00051 {
00052
00053
00054
00055
00056
00057 protected ArrayList backendReadThreads;
00058 protected int nbOfConcurrentReads;
00059 protected ErrorCheckingPolicy errorCheckingPolicy;
00060
00061 protected static Trace logger = Trace
00062 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1ec");
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079 public RAIDb1ec(VirtualDatabase vdb,
00080 WaitForCompletionPolicy waitForCompletionPolicy,
00081 ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
00082 throws Exception
00083 {
00084 super(vdb, waitForCompletionPolicy);
00085 backendReadThreads = new ArrayList();
00086 this.errorCheckingPolicy = errorCheckingPolicy;
00087 this.nbOfConcurrentReads = nbOfConcurrentReads;
00088 }
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
00107 throws SQLException
00108 {
00109
00110 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
00111 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
00112
00113
00114 try
00115 {
00116 backendBlockingThreadsRWLock.acquireWrite();
00117 }
00118 catch (InterruptedException e)
00119 {
00120 String msg = Translate.get(
00121 "loadbalancer.backendlist.acquire.writelock.failed", e);
00122 logger.error(msg);
00123 throw new SQLException(msg);
00124 }
00125 backendBlockingThreads.add(blockingThread);
00126 backendBlockingThreadsRWLock.releaseWrite();
00127 blockingThread.start();
00128 logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
00129 db.getName()));
00130
00131
00132 try
00133 {
00134 backendNonBlockingThreadsRWLock.acquireWrite();
00135 }
00136 catch (InterruptedException e)
00137 {
00138 String msg = Translate.get(
00139 "loadbalancer.backendlist.acquire.writelock.failed", e);
00140 logger.error(msg);
00141 throw new SQLException(msg);
00142 }
00143 backendNonBlockingThreads.add(nonBlockingThread);
00144 backendNonBlockingThreadsRWLock.releaseWrite();
00145 nonBlockingThread.start();
00146 logger.info(Translate.get(
00147 "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00148
00149 if (!db.isInitialized())
00150 db.initializeConnections();
00151 db.enableRead();
00152 if (writeEnabled)
00153 db.enableWrite();
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 public synchronized void disableBackend(DatabaseBackend db)
00168 throws SQLException
00169 {
00170 int nbOfThreads = backendBlockingThreads.size();
00171
00172
00173 for (int i = 0; i < nbOfThreads; i++)
00174 {
00175 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00176 .get(i);
00177 if (thread.getBackend().equals(db))
00178 {
00179 logger.info(Translate.get(
00180 "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00181
00182
00183 try
00184 {
00185 backendBlockingThreadsRWLock.acquireWrite();
00186 }
00187 catch (InterruptedException e)
00188 {
00189 String msg = Translate.get(
00190 "loadbalancer.backendlist.acquire.writelock.failed", e);
00191 logger.error(msg);
00192 throw new SQLException(msg);
00193 }
00194 backendBlockingThreads.remove(thread);
00195 backendBlockingThreadsRWLock.releaseWrite();
00196
00197 synchronized (thread)
00198 {
00199
00200 thread.addPriorityTask(new KillThreadTask(1, 1));
00201 thread.notify();
00202 }
00203 break;
00204 }
00205 }
00206
00207
00208 nbOfThreads = backendNonBlockingThreads.size();
00209 for (int i = 0; i < nbOfThreads; i++)
00210 {
00211 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00212 .get(i);
00213 if (thread.getBackend().equals(db))
00214 {
00215 logger.info(Translate.get(
00216 "loadbalancer.backend.workerthread.non.blocking.remove", db
00217 .getName()));
00218
00219
00220 try
00221 {
00222 backendNonBlockingThreadsRWLock.acquireWrite();
00223 }
00224 catch (InterruptedException e)
00225 {
00226 String msg = Translate.get(
00227 "loadbalancer.backendlist.acquire.writelock.failed", e);
00228 logger.error(msg);
00229 throw new SQLException(msg);
00230 }
00231 backendNonBlockingThreads.remove(thread);
00232 backendNonBlockingThreadsRWLock.releaseWrite();
00233
00234 synchronized (thread)
00235 {
00236
00237 thread.addPriorityTask(new KillThreadTask(1, 1));
00238 thread.notify();
00239 }
00240 break;
00241 }
00242 }
00243
00244 db.disable();
00245 if (db.isInitialized())
00246 db.finalizeConnections();
00247 }
00248
00249
00250
00251
00252 public String getXmlImpl()
00253 {
00254 StringBuffer info = new StringBuffer();
00255 info.append("<" + DatabasesXmlTags.ELT_RAIDb_1ec + " "
00256 + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
00257 + this.nbOfConcurrentReads + "\">");
00258 this.getRaidb1Xml();
00259 if (waitForCompletionPolicy != null)
00260 info.append(waitForCompletionPolicy.getXml());
00261 info.append("</" + DatabasesXmlTags.ELT_RAIDb_1ec + ">");
00262 return info.toString();
00263 }
00264 }