00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.loadbalancer.raidb2;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.cjdbc.common.i18n.Translate;
00031 import org.objectweb.cjdbc.common.log.Trace;
00032 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00033 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00034 import org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread;
00035 import org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy;
00036 import org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy;
00037 import org.objectweb.cjdbc.controller.loadbalancer.policies.errorchecking.ErrorCheckingPolicy;
00038 import org.objectweb.cjdbc.controller.loadbalancer.tasks.KillThreadTask;
00039 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051 public abstract class RAIDb2ec extends RAIDb2
00052 {
00053
00054
00055
00056
00057
00058 protected ArrayList backendReadThreads;
00059 protected int nbOfConcurrentReads;
00060 protected ErrorCheckingPolicy errorCheckingPolicy;
00061
00062 protected static Trace logger = Trace
00063 .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb2ec");
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082 public RAIDb2ec(VirtualDatabase vdb,
00083 WaitForCompletionPolicy waitForCompletionPolicy,
00084 CreateTablePolicy createTablePolicy,
00085 ErrorCheckingPolicy errorCheckingPolicy, int nbOfConcurrentReads)
00086 throws Exception
00087 {
00088 super(vdb, waitForCompletionPolicy, createTablePolicy);
00089 backendReadThreads = new ArrayList();
00090 this.errorCheckingPolicy = errorCheckingPolicy;
00091 this.nbOfConcurrentReads = nbOfConcurrentReads;
00092 }
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110 public void enableBackend(DatabaseBackend db, boolean writeEnabled)
00111 throws SQLException
00112 {
00113
00114 BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
00115 BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
00116
00117
00118 try
00119 {
00120 backendBlockingThreadsRWLock.acquireWrite();
00121 }
00122 catch (InterruptedException e)
00123 {
00124 String msg = Translate.get(
00125 "loadbalancer.backendlist.acquire.writelock.failed", e);
00126 logger.error(msg);
00127 throw new SQLException(msg);
00128 }
00129 backendBlockingThreads.add(blockingThread);
00130 backendBlockingThreadsRWLock.releaseWrite();
00131 blockingThread.start();
00132 logger.info(Translate.get("loadbalancer.backend.workerthread.blocking.add",
00133 db.getName()));
00134
00135
00136 try
00137 {
00138 backendNonBlockingThreadsRWLock.acquireWrite();
00139 }
00140 catch (InterruptedException e)
00141 {
00142 String msg = Translate.get(
00143 "loadbalancer.backendlist.acquire.writelock.failed", e);
00144 logger.error(msg);
00145 throw new SQLException(msg);
00146 }
00147 backendNonBlockingThreads.add(nonBlockingThread);
00148 backendNonBlockingThreadsRWLock.releaseWrite();
00149 nonBlockingThread.start();
00150 logger.info(Translate.get(
00151 "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
00152
00153 if (!db.isInitialized())
00154 db.initializeConnections();
00155 db.enableRead();
00156 if (writeEnabled)
00157 db.enableWrite();
00158 }
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171 public synchronized void disableBackend(DatabaseBackend db)
00172 throws SQLException
00173 {
00174 int nbOfThreads = backendBlockingThreads.size();
00175
00176
00177 for (int i = 0; i < nbOfThreads; i++)
00178 {
00179 BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00180 .get(i);
00181 if (thread.getBackend().equals(db))
00182 {
00183 logger.info(Translate.get(
00184 "loadbalancer.backend.workerthread.blocking.remove", db.getName()));
00185
00186
00187 try
00188 {
00189 backendBlockingThreadsRWLock.acquireWrite();
00190 }
00191 catch (InterruptedException e)
00192 {
00193 String msg = Translate.get(
00194 "loadbalancer.backendlist.acquire.writelock.failed", e);
00195 logger.error(msg);
00196 throw new SQLException(msg);
00197 }
00198 backendBlockingThreads.remove(thread);
00199 backendBlockingThreadsRWLock.releaseWrite();
00200
00201 synchronized (thread)
00202 {
00203
00204 thread.addPriorityTask(new KillThreadTask(1, 1));
00205 thread.notify();
00206 }
00207 break;
00208 }
00209 }
00210
00211
00212 nbOfThreads = backendNonBlockingThreads.size();
00213 for (int i = 0; i < nbOfThreads; i++)
00214 {
00215 BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00216 .get(i);
00217 if (thread.getBackend().equals(db))
00218 {
00219 logger.info(Translate.get(
00220 "loadbalancer.backend.workerthread.non.blocking.remove", db
00221 .getName()));
00222
00223
00224 try
00225 {
00226 backendNonBlockingThreadsRWLock.acquireWrite();
00227 }
00228 catch (InterruptedException e)
00229 {
00230 String msg = Translate.get(
00231 "loadbalancer.backendlist.acquire.writelock.failed", e);
00232 logger.error(msg);
00233 throw new SQLException(msg);
00234 }
00235 backendNonBlockingThreads.remove(thread);
00236 backendNonBlockingThreadsRWLock.releaseWrite();
00237
00238 synchronized (thread)
00239 {
00240
00241 thread.addPriorityTask(new KillThreadTask(1, 1));
00242 thread.notify();
00243 }
00244 break;
00245 }
00246 }
00247
00248 db.disable();
00249 if (db.isInitialized())
00250 db.finalizeConnections();
00251 }
00252
00253
00254
00255
00256 public String getXmlImpl()
00257 {
00258 StringBuffer info = new StringBuffer();
00259 info.append("<" + DatabasesXmlTags.ELT_RAIDb_2ec + " "
00260 + DatabasesXmlTags.ATT_nbOfConcurrentReads + "=\""
00261 + this.nbOfConcurrentReads + "\">");
00262 this.getRaidb2Xml();
00263 if (waitForCompletionPolicy != null)
00264 info.append(waitForCompletionPolicy.getXml());
00265 info.append("</" + DatabasesXmlTags.ELT_RAIDb_2ec + ">");
00266 return info.toString();
00267 }
00268 }