Public Member Functions | |
ControllerWorkerThread (ControllerServerThread serverThread) | |
void | run () |
Static Package Attributes | |
Trace | logger |
ControllerWorkerThread handles a connection with a C-JDBC driver. It reads a String containing the virtual database name from the driver and sends back the corresponding ConnectionPoint.
Definition at line 49 of file ControllerWorkerThread.java.
|
Creates a new ControllerWorkerThread.
Definition at line 68 of file ControllerWorkerThread.java. 00069 { 00070 super("ControllerWorkerThread"); 00071 this.serverThread = serverThread; 00072 }
|
|
Gets a connection from the connection queue and processes it. Definition at line 77 of file ControllerWorkerThread.java. References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.addCurrentNbOfThread(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.close(), org.objectweb.cjdbc.controller.core.ControllerServerThread.controller, org.objectweb.cjdbc.controller.core.ControllerServerThread.controllerServerThreadPendingQueue, org.objectweb.cjdbc.common.stream.CJDBCOutputStream.flush(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getActiveThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getCurrentNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getIdleThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxNbOfConnections(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMinNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getPendingConnections(), org.objectweb.cjdbc.controller.core.Controller.getVirtualDatabase(), org.objectweb.cjdbc.controller.core.ControllerServerThread.idleWorkerThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.isShuttingDown(), org.objectweb.cjdbc.controller.core.ControllerServerThread.isShuttingDown, org.objectweb.cjdbc.controller.core.ControllerWorkerThread.logger, org.objectweb.cjdbc.common.stream.CJDBCInputStream.readInt(), org.objectweb.cjdbc.common.stream.CJDBCInputStream.readUTF(), and org.objectweb.cjdbc.common.stream.CJDBCOutputStream.writeObject().
00078 { 00079 Socket clientSocket; 00080 00081 if (serverThread == null) 00082 { 00083 logger.error(Translate.get("controller.workerthread.null.serverthread")); 00084 isKilled = true; 00085 } 00086 else if (serverThread.controllerServerThreadPendingQueue == null) 00087 { 00088 logger.error(Translate.get("controller.workerthread.null.pendingqueue")); 00089 isKilled = true; 00090 } 00091 00092 // Main loop 00093 while (!isKilled) 00094 { 00095 if (serverThread.isShuttingDown()) 00096 break; 00097 // Get a connection from the pending queue 00098 synchronized (serverThread.controllerServerThreadPendingQueue) 00099 { 00100 while (serverThread.controllerServerThreadPendingQueue.isEmpty()) 00101 { 00102 // Nothing to do, let's sleep ... 00103 serverThread.idleWorkerThreads++; 00104 boolean timeout = false; 00105 try 00106 { 00107 long before = System.currentTimeMillis(); 00108 serverThread.controllerServerThreadPendingQueue 00109 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME); 00110 long now = System.currentTimeMillis(); 00111 // Check if timeout has expired 00112 timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME; 00113 } 00114 catch (InterruptedException ignore) 00115 { 00116 } 00117 serverThread.idleWorkerThreads--; 00118 // We are shutting down 00119 if (serverThread.controllerServerThreadPendingQueue == null) 00120 { 00121 isKilled = true; 00122 break; 00123 } 00124 if (timeout 00125 && serverThread.controllerServerThreadPendingQueue.isEmpty()) 00126 { 00127 // Nothing to do, let's die. 
00128 isKilled = true; 00129 break; 00130 } 00131 } 00132 00133 if (isKilled) 00134 break; 00135 00136 // Get a connection 00137 clientSocket = (Socket) serverThread.controllerServerThreadPendingQueue 00138 .remove(0); 00139 } // synchronized (serverThread.controllerServerThreadPendingQueue) 00140 00141 if (clientSocket == null) 00142 { 00143 logger.error(Translate.get("controller.workerthread.null.socket")); 00144 continue; 00145 } 00146 else if (logger.isDebugEnabled()) 00147 logger.debug(Translate.get("controller.workerthread.connection.from", 00148 new String[]{clientSocket.getInetAddress().toString(), 00149 String.valueOf(clientSocket.getPort())})); 00150 00151 try 00152 { 00153 // Disable Nagle algorithm else small messages are not sent 00154 // (at least under Linux) even if we flush the output stream. 00155 clientSocket.setTcpNoDelay(true); 00156 00157 // Handle connection 00158 CJDBCInputStream in = new CJDBCInputStream(clientSocket); 00159 CJDBCOutputStream out = new CJDBCOutputStream(clientSocket); 00160 00161 // Check protocol version for driver compatibility 00162 int version = in.readInt(); 00163 if (version != Commands.ProtocolVersion) 00164 { 00165 logger.warn(Translate.get( 00166 "controller.workerthread.protocol.incompatible", version)); 00167 continue; 00168 } 00169 String virtualDatabaseName = in.readUTF(); 00170 00171 // Read the virtual database name 00172 VirtualDatabase vdb = serverThread.controller 00173 .getVirtualDatabase(virtualDatabaseName); 00174 if (vdb == null) 00175 { 00176 String msg = Translate.get("virtualdatabase.not.found", 00177 virtualDatabaseName); 00178 logger.warn(msg); 00179 continue; 00180 } 00181 if (vdb.isShuttingDown()) 00182 { 00183 String msg = Translate.get("virtualdatabase.shutting.down", 00184 virtualDatabaseName); 00185 logger.warn(msg); 00186 continue; 00187 } 00188 00189 // At this point we have the virtual database the driver wants to 00190 // connect to and we have to give the job to a 00191 // 
VirtualDatabaseWorkerThread 00192 ArrayList vdbActiveThreads = vdb.getActiveThreads(); 00193 ArrayList vdbPendingQueue = vdb.getPendingConnections(); 00194 00195 if (vdbActiveThreads == null) 00196 { 00197 logger.error(Translate 00198 .get("controller.workerthread.null.active.thread")); 00199 isKilled = true; 00200 } 00201 if (vdbPendingQueue == null) 00202 { 00203 logger 00204 .error(Translate.get("controller.workerthread.null.connection")); 00205 isKilled = true; 00206 } 00207 00208 // Start minimum number of worker threads 00209 boolean tooManyConnections; 00210 synchronized (vdbActiveThreads) 00211 { 00212 while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads()) 00213 { 00214 // Fork a new worker thread 00215 if (logger.isDebugEnabled()) 00216 logger.debug(Translate 00217 .get("controller.workerthread.starting.thread.for.minimum")); 00218 VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread( 00219 serverThread.controller, vdb); 00220 vdbActiveThreads.add(thread); 00221 vdb.addCurrentNbOfThread(); 00222 thread.start(); 00223 } 00224 00225 // Check if maximum number of concurrent connections has been 00226 // reached 00227 tooManyConnections = (vdb.getMaxNbOfConnections() > 0) 00228 && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb 00229 .getMaxNbOfConnections(); 00230 } 00231 if (tooManyConnections) 00232 { 00233 out.writeObject(new SQLException(Translate 00234 .get("controller.workerthread.too.many.connections"))); 00235 out.flush(); 00236 continue; 00237 } 00238 00239 // Put the connection in the queue 00240 synchronized (vdbPendingQueue) 00241 { 00242 vdbPendingQueue.add(in); 00243 vdbPendingQueue.add(out); 00244 // Nullify the socket else it is closed in the finally block 00245 clientSocket = null; 00246 synchronized (vdbActiveThreads) 00247 { // Is a thread available? 
00248 if (vdb.getIdleThreads() == 0) 00249 { // No 00250 if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads()) 00251 || (vdb.getMaxNbOfThreads() == 0)) 00252 { 00253 // Fork a new worker thread 00254 if (logger.isDebugEnabled()) 00255 logger.debug(Translate 00256 .get("controller.workerthread.starting.thread")); 00257 VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread( 00258 serverThread.controller, vdb); 00259 vdbActiveThreads.add(thread); 00260 vdb.addCurrentNbOfThread(); 00261 thread.start(); 00262 } 00263 else if (logger.isInfoEnabled()) 00264 logger.info(Translate.get( 00265 "controller.workerthread.maximum.thread", vdb 00266 .getMaxNbOfThreads())); 00267 } 00268 else 00269 { 00270 if (logger.isDebugEnabled()) 00271 logger.debug(Translate 00272 .get("controller.workerthread.notify.thread")); 00273 // Here we notify all threads else if one thread doesn't wake 00274 // up after the first notify() we will send a second notify() 00275 // and one signal will be lost. So the safe way is to wake up 00276 // everybody and that worker threads go back to sleep if there 00277 // is no job. 00278 vdbPendingQueue.notifyAll(); 00279 } 00280 } 00281 } 00282 } 00283 // } 00284 catch (OptionalDataException e) 00285 { 00286 logger 00287 .error(Translate.get("controller.workerthread.protocol.error", e)); 00288 } 00289 catch (IOException e) 00290 { 00291 logger.error(Translate.get("controller.workerthread.io.error", e)); 00292 } 00293 finally 00294 { 00295 try 00296 { 00297 if (clientSocket != null) 00298 { 00299 if (logger.isDebugEnabled()) 00300 logger.debug(Translate 00301 .get("controller.workerthread.connection.closing")); 00302 clientSocket.close(); 00303 } 00304 } 00305 catch (IOException ignore) 00306 { 00307 } 00308 } 00309 } 00310 00311 if (logger.isDebugEnabled()) 00312 logger.debug(Translate.get("controller.workerthread.terminating")); 00313 }
|
|
Initial value: Trace.getLogger("org.objectweb.cjdbc.controller.core.Controller")
Definition at line 55 of file ControllerWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.core.ControllerWorkerThread.run(). |