00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.core;
00026
00027 import java.io.IOException;
00028 import java.io.OptionalDataException;
00029 import java.net.Socket;
00030 import java.sql.SQLException;
00031 import java.util.ArrayList;
00032
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.log.Trace;
00035 import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
00036 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
00037 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00038 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread;
00039 import org.objectweb.cjdbc.driver.protocol.Commands;
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 public class ControllerWorkerThread extends Thread
00050 {
00051 private ControllerServerThread serverThread;
00052 private boolean isKilled = false;
00053
00054
00055 static Trace logger = Trace
00056 .getLogger("org.objectweb.cjdbc.controller.core.Controller");
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 public ControllerWorkerThread(ControllerServerThread serverThread)
00069 {
00070 super("ControllerWorkerThread");
00071 this.serverThread = serverThread;
00072 }
00073
00074
00075
00076
00077 public void run()
00078 {
00079 Socket clientSocket;
00080
00081 if (serverThread == null)
00082 {
00083 logger.error(Translate.get("controller.workerthread.null.serverthread"));
00084 isKilled = true;
00085 }
00086 else if (serverThread.controllerServerThreadPendingQueue == null)
00087 {
00088 logger.error(Translate.get("controller.workerthread.null.pendingqueue"));
00089 isKilled = true;
00090 }
00091
00092
00093 while (!isKilled)
00094 {
00095 if (serverThread.isShuttingDown())
00096 break;
00097
00098 synchronized (serverThread.controllerServerThreadPendingQueue)
00099 {
00100 while (serverThread.controllerServerThreadPendingQueue.isEmpty())
00101 {
00102
00103 serverThread.idleWorkerThreads++;
00104 boolean timeout = false;
00105 try
00106 {
00107 long before = System.currentTimeMillis();
00108 serverThread.controllerServerThreadPendingQueue
00109 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
00110 long now = System.currentTimeMillis();
00111
00112 timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
00113 }
00114 catch (InterruptedException ignore)
00115 {
00116 }
00117 serverThread.idleWorkerThreads--;
00118
00119 if (serverThread.controllerServerThreadPendingQueue == null)
00120 {
00121 isKilled = true;
00122 break;
00123 }
00124 if (timeout
00125 && serverThread.controllerServerThreadPendingQueue.isEmpty())
00126 {
00127
00128 isKilled = true;
00129 break;
00130 }
00131 }
00132
00133 if (isKilled)
00134 break;
00135
00136
00137 clientSocket = (Socket) serverThread.controllerServerThreadPendingQueue
00138 .remove(0);
00139 }
00140
00141 if (clientSocket == null)
00142 {
00143 logger.error(Translate.get("controller.workerthread.null.socket"));
00144 continue;
00145 }
00146 else if (logger.isDebugEnabled())
00147 logger.debug(Translate.get("controller.workerthread.connection.from",
00148 new String[]{clientSocket.getInetAddress().toString(),
00149 String.valueOf(clientSocket.getPort())}));
00150
00151 try
00152 {
00153
00154
00155 clientSocket.setTcpNoDelay(true);
00156
00157
00158 CJDBCInputStream in = new CJDBCInputStream(clientSocket);
00159 CJDBCOutputStream out = new CJDBCOutputStream(clientSocket);
00160
00161
00162 int version = in.readInt();
00163 if (version != Commands.ProtocolVersion)
00164 {
00165 logger.warn(Translate.get(
00166 "controller.workerthread.protocol.incompatible", version));
00167 continue;
00168 }
00169 String virtualDatabaseName = in.readUTF();
00170
00171
00172 VirtualDatabase vdb = serverThread.controller
00173 .getVirtualDatabase(virtualDatabaseName);
00174 if (vdb == null)
00175 {
00176 String msg = Translate.get("virtualdatabase.not.found",
00177 virtualDatabaseName);
00178 logger.warn(msg);
00179 continue;
00180 }
00181 if (vdb.isShuttingDown())
00182 {
00183 String msg = Translate.get("virtualdatabase.shutting.down",
00184 virtualDatabaseName);
00185 logger.warn(msg);
00186 continue;
00187 }
00188
00189
00190
00191
00192 ArrayList vdbActiveThreads = vdb.getActiveThreads();
00193 ArrayList vdbPendingQueue = vdb.getPendingConnections();
00194
00195 if (vdbActiveThreads == null)
00196 {
00197 logger.error(Translate
00198 .get("controller.workerthread.null.active.thread"));
00199 isKilled = true;
00200 }
00201 if (vdbPendingQueue == null)
00202 {
00203 logger
00204 .error(Translate.get("controller.workerthread.null.connection"));
00205 isKilled = true;
00206 }
00207
00208
00209 boolean tooManyConnections;
00210 synchronized (vdbActiveThreads)
00211 {
00212 while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
00213 {
00214
00215 if (logger.isDebugEnabled())
00216 logger.debug(Translate
00217 .get("controller.workerthread.starting.thread.for.minimum"));
00218 VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
00219 serverThread.controller, vdb);
00220 vdbActiveThreads.add(thread);
00221 vdb.addCurrentNbOfThread();
00222 thread.start();
00223 }
00224
00225
00226
00227 tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
00228 && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
00229 .getMaxNbOfConnections();
00230 }
00231 if (tooManyConnections)
00232 {
00233 out.writeObject(new SQLException(Translate
00234 .get("controller.workerthread.too.many.connections")));
00235 out.flush();
00236 continue;
00237 }
00238
00239
00240 synchronized (vdbPendingQueue)
00241 {
00242 vdbPendingQueue.add(in);
00243 vdbPendingQueue.add(out);
00244
00245 clientSocket = null;
00246 synchronized (vdbActiveThreads)
00247 {
00248 if (vdb.getIdleThreads() == 0)
00249 {
00250 if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads())
00251 || (vdb.getMaxNbOfThreads() == 0))
00252 {
00253
00254 if (logger.isDebugEnabled())
00255 logger.debug(Translate
00256 .get("controller.workerthread.starting.thread"));
00257 VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
00258 serverThread.controller, vdb);
00259 vdbActiveThreads.add(thread);
00260 vdb.addCurrentNbOfThread();
00261 thread.start();
00262 }
00263 else if (logger.isInfoEnabled())
00264 logger.info(Translate.get(
00265 "controller.workerthread.maximum.thread", vdb
00266 .getMaxNbOfThreads()));
00267 }
00268 else
00269 {
00270 if (logger.isDebugEnabled())
00271 logger.debug(Translate
00272 .get("controller.workerthread.notify.thread"));
00273
00274
00275
00276
00277
00278 vdbPendingQueue.notifyAll();
00279 }
00280 }
00281 }
00282 }
00283
00284 catch (OptionalDataException e)
00285 {
00286 logger
00287 .error(Translate.get("controller.workerthread.protocol.error", e));
00288 }
00289 catch (IOException e)
00290 {
00291 logger.error(Translate.get("controller.workerthread.io.error", e));
00292 }
00293 finally
00294 {
00295 try
00296 {
00297 if (clientSocket != null)
00298 {
00299 if (logger.isDebugEnabled())
00300 logger.debug(Translate
00301 .get("controller.workerthread.connection.closing"));
00302 clientSocket.close();
00303 }
00304 }
00305 catch (IOException ignore)
00306 {
00307 }
00308 }
00309 }
00310
00311 if (logger.isDebugEnabled())
00312 logger.debug(Translate.get("controller.workerthread.terminating"));
00313 }
00314 }