org.objectweb.cjdbc.controller.core.ControllerWorkerThread Class Reference

Public Member Functions

 ControllerWorkerThread (ControllerServerThread serverThread)
void run ()

Static Package Attributes

Trace logger

Detailed Description

The ControllerWorkerThread handles a connection with a C-JDBC driver. It reads a String containing the virtual database name from the driver and sends back the corresponding ConnectionPoint.
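As a rough illustration of the exchange described above, the sketch below mimics the first bytes a driver might send and how a worker could read them. It uses plain java.io.DataOutputStream and DataInputStream over an in-memory buffer instead of the real CJDBCInputStream and CJDBCOutputStream, whose wire format is not shown on this page. The class name HandshakeSketch, the constant PROTOCOL_VERSION and its value are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class HandshakeSketch {
    // Hypothetical value; the real code compares against Commands.ProtocolVersion.
    static final int PROTOCOL_VERSION = 1;

    /** Driver side: send the protocol version, then the virtual database name. */
    static byte[] driverHello(int version, String vdbName) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeInt(version);
        out.writeUTF(vdbName);
        out.flush();
        return buffer.toByteArray();
    }

    /** Worker side: returns the requested name, or null on a version mismatch. */
    static String readHello(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        if (version != PROTOCOL_VERSION) {
            return null; // incompatible driver: the caller should drop the connection
        }
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readHello(driverHello(PROTOCOL_VERSION, "myDB")));
        System.out.println(readHello(driverHello(PROTOCOL_VERSION + 1, "myDB")));
    }
}
```

The version is read before the name so that an incompatible driver can be rejected without interpreting the rest of the stream.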

Author:
Emmanuel Cecchet
Version:
1.0

Definition at line 49 of file ControllerWorkerThread.java.


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.core.ControllerWorkerThread.ControllerWorkerThread(ControllerServerThread serverThread)

Creates a new ControllerWorkerThread instance.

Parameters:
serverThread  the ControllerServerThread that created this worker thread.

Definition at line 68 of file ControllerWorkerThread.java.

  public ControllerWorkerThread(ControllerServerThread serverThread)
  {
    super("ControllerWorkerThread");
    this.serverThread = serverThread;
  }


Member Function Documentation

void org.objectweb.cjdbc.controller.core.ControllerWorkerThread.run()

Gets a connection from the connection queue and processes it.
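The waiting behaviour of this method (sleep on the pending queue, wake when a connection arrives, give up after an idle period) can be sketched in isolation. The following is a minimal illustration, not the class's actual code: the names IdleQueueSketch, takeOrTimeout and offer are hypothetical, and the sketch tracks a deadline so that early wake-ups do not count as a timeout, which is a variation on the listing's before/after timestamp check.

```java
import java.util.ArrayList;
import java.util.List;

class IdleQueueSketch {
    /**
     * Removes and returns the head of the queue, waiting up to timeoutMillis
     * for an element to arrive. Returns null if the queue is still empty once
     * the timeout has elapsed, which a worker can treat as its cue to exit.
     */
    static <T> T takeOrTimeout(List<T> queue, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (queue) {
            while (queue.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null; // idle for the whole period
                }
                queue.wait(remaining); // woken by notifyAll() or by the timeout
            }
            return queue.remove(0);
        }
    }

    /** Producer side: enqueue an item and wake every waiting worker. */
    static <T> void offer(List<T> queue, T item) {
        synchronized (queue) {
            queue.add(item);
            queue.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> queue = new ArrayList<String>();
        offer(queue, "connection-1");
        System.out.println(takeOrTimeout(queue, 100)); // the queued item
        System.out.println(takeOrTimeout(queue, 100)); // null after about 100 ms
    }
}
```

Re-checking the queue in a loop after every wake-up matters because several workers may be notified for a single item; those that find the queue empty again simply go back to waiting.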

Definition at line 77 of file ControllerWorkerThread.java.

References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.addCurrentNbOfThread(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.close(), org.objectweb.cjdbc.controller.core.ControllerServerThread.controller, org.objectweb.cjdbc.controller.core.ControllerServerThread.controllerServerThreadPendingQueue, org.objectweb.cjdbc.common.stream.CJDBCOutputStream.flush(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getActiveThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getCurrentNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getIdleThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxNbOfConnections(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMinNbOfThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getPendingConnections(), org.objectweb.cjdbc.controller.core.Controller.getVirtualDatabase(), org.objectweb.cjdbc.controller.core.ControllerServerThread.idleWorkerThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.isShuttingDown(), org.objectweb.cjdbc.controller.core.ControllerServerThread.isShuttingDown, org.objectweb.cjdbc.controller.core.ControllerWorkerThread.logger, org.objectweb.cjdbc.common.stream.CJDBCInputStream.readInt(), org.objectweb.cjdbc.common.stream.CJDBCInputStream.readUTF(), and org.objectweb.cjdbc.common.stream.CJDBCOutputStream.writeObject().

  public void run()
  {
    Socket clientSocket;

    if (serverThread == null)
    {
      logger.error(Translate.get("controller.workerthread.null.serverthread"));
      isKilled = true;
    }
    else if (serverThread.controllerServerThreadPendingQueue == null)
    {
      logger.error(Translate.get("controller.workerthread.null.pendingqueue"));
      isKilled = true;
    }

    // Main loop
    while (!isKilled)
    {
      if (serverThread.isShuttingDown())
        break;
      // Get a connection from the pending queue
      synchronized (serverThread.controllerServerThreadPendingQueue)
      {
        while (serverThread.controllerServerThreadPendingQueue.isEmpty())
        {
          // Nothing to do, let's sleep ...
          serverThread.idleWorkerThreads++;
          boolean timeout = false;
          try
          {
            long before = System.currentTimeMillis();
            serverThread.controllerServerThreadPendingQueue
                .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
            long now = System.currentTimeMillis();
            // Check if timeout has expired
            timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
          }
          catch (InterruptedException ignore)
          {
            // Interrupted while waiting: fall through and re-check the queue
          }
          serverThread.idleWorkerThreads--;
          // We are shutting down
          if (serverThread.controllerServerThreadPendingQueue == null)
          {
            isKilled = true;
            break;
          }
          if (timeout
              && serverThread.controllerServerThreadPendingQueue.isEmpty())
          {
            // Nothing to do, let's die.
            isKilled = true;
            break;
          }
        }

        if (isKilled)
          break;

        // Get a connection
        clientSocket = (Socket) serverThread.controllerServerThreadPendingQueue
            .remove(0);
      } // synchronized (serverThread.controllerServerThreadPendingQueue)

      if (clientSocket == null)
      {
        logger.error(Translate.get("controller.workerthread.null.socket"));
        continue;
      }
      else if (logger.isDebugEnabled())
        logger.debug(Translate.get("controller.workerthread.connection.from",
            new String[]{clientSocket.getInetAddress().toString(),
                String.valueOf(clientSocket.getPort())}));

      try
      {
        // Disable Nagle algorithm else small messages are not sent
        // (at least under Linux) even if we flush the output stream.
        clientSocket.setTcpNoDelay(true);

        // Handle connection
        CJDBCInputStream in = new CJDBCInputStream(clientSocket);
        CJDBCOutputStream out = new CJDBCOutputStream(clientSocket);

        // Check protocol version for driver compatibility
        int version = in.readInt();
        if (version != Commands.ProtocolVersion)
        {
          logger.warn(Translate.get(
              "controller.workerthread.protocol.incompatible", version));
          continue;
        }

        // Read the virtual database name
        String virtualDatabaseName = in.readUTF();

        // Look up the requested virtual database
        VirtualDatabase vdb = serverThread.controller
            .getVirtualDatabase(virtualDatabaseName);
        if (vdb == null)
        {
          String msg = Translate.get("virtualdatabase.not.found",
              virtualDatabaseName);
          logger.warn(msg);
          continue;
        }
        if (vdb.isShuttingDown())
        {
          String msg = Translate.get("virtualdatabase.shutting.down",
              virtualDatabaseName);
          logger.warn(msg);
          continue;
        }

        // At this point we have the virtual database the driver wants to
        // connect to and we have to give the job to a
        // VirtualDatabaseWorkerThread
        ArrayList vdbActiveThreads = vdb.getActiveThreads();
        ArrayList vdbPendingQueue = vdb.getPendingConnections();

        if (vdbActiveThreads == null)
        {
          logger.error(Translate
              .get("controller.workerthread.null.active.thread"));
          isKilled = true;
        }
        if (vdbPendingQueue == null)
        {
          logger
              .error(Translate.get("controller.workerthread.null.connection"));
          isKilled = true;
        }
        // Synchronizing on a null list below would throw a
        // NullPointerException, so stop here: the finally block closes the
        // socket and the main loop then exits.
        if (isKilled)
          continue;

        // Start minimum number of worker threads
        boolean tooManyConnections;
        synchronized (vdbActiveThreads)
        {
          while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
          {
            // Fork a new worker thread
            if (logger.isDebugEnabled())
              logger.debug(Translate
                  .get("controller.workerthread.starting.thread.for.minimum"));
            VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
                serverThread.controller, vdb);
            vdbActiveThreads.add(thread);
            vdb.addCurrentNbOfThread();
            thread.start();
          }

          // Check if maximum number of concurrent connections has been
          // reached
          tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
              && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
                  .getMaxNbOfConnections();
        }
        if (tooManyConnections)
        {
          out.writeObject(new SQLException(Translate
              .get("controller.workerthread.too.many.connections")));
          out.flush();
          continue;
        }

        // Put the connection in the queue
        synchronized (vdbPendingQueue)
        {
          vdbPendingQueue.add(in);
          vdbPendingQueue.add(out);
          // Nullify the socket else it is closed in the finally block
          clientSocket = null;
          synchronized (vdbActiveThreads)
          { // Is a thread available?
            if (vdb.getIdleThreads() == 0)
            { // No
              // Fork only while strictly below the maximum (0 means no
              // limit), so the thread count never exceeds the maximum
              if ((vdb.getCurrentNbOfThreads() < vdb.getMaxNbOfThreads())
                  || (vdb.getMaxNbOfThreads() == 0))
              {
                // Fork a new worker thread
                if (logger.isDebugEnabled())
                  logger.debug(Translate
                      .get("controller.workerthread.starting.thread"));
                VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
                    serverThread.controller, vdb);
                vdbActiveThreads.add(thread);
                vdb.addCurrentNbOfThread();
                thread.start();
              }
              else if (logger.isInfoEnabled())
                logger.info(Translate.get(
                    "controller.workerthread.maximum.thread", vdb
                        .getMaxNbOfThreads()));
            }
            else
            {
              if (logger.isDebugEnabled())
                logger.debug(Translate
                    .get("controller.workerthread.notify.thread"));
              // Here we notify all threads else if one thread doesn't wake
              // up after the first notify() we will send a second notify()
              // and one signal will be lost. So the safe way is to wake up
              // everybody and that worker threads go back to sleep if there
              // is no job.
              vdbPendingQueue.notifyAll();
            }
          }
        }
      }
      catch (OptionalDataException e)
      {
        logger
            .error(Translate.get("controller.workerthread.protocol.error", e));
      }
      catch (IOException e)
      {
        logger.error(Translate.get("controller.workerthread.io.error", e));
      }
      finally
      {
        try
        {
          if (clientSocket != null)
          {
            if (logger.isDebugEnabled())
              logger.debug(Translate
                  .get("controller.workerthread.connection.closing"));
            clientSocket.close();
          }
        }
        catch (IOException ignore)
        {
          // Nothing more can be done if closing the socket fails
        }
      }
    }

    if (logger.isDebugEnabled())
      logger.debug(Translate.get("controller.workerthread.terminating"));
  }
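The two admission decisions made in run() (reject the connection when a connection limit is exceeded, and fork a new worker only when no idle one exists and the thread cap allows it) can be isolated as pure functions for easier reasoning and testing. This is a sketch under assumptions rather than the project's code: AdmissionSketch and its method names are hypothetical, a limit of 0 is taken to mean "unlimited" as the listing suggests, and the thread cap is treated as strict (current < max).

```java
class AdmissionSketch {
    /** True when a positive connection limit is set and would be exceeded. */
    static boolean tooManyConnections(int active, int pending, int maxConnections) {
        return maxConnections > 0 && active + pending > maxConnections;
    }

    /**
     * True when no worker is idle and the thread cap (0 = unlimited) leaves
     * room for one more worker.
     */
    static boolean shouldForkWorker(int idle, int current, int maxThreads) {
        if (idle > 0) {
            return false; // an idle worker can be notified instead
        }
        return maxThreads == 0 || current < maxThreads;
    }

    public static void main(String[] args) {
        System.out.println(tooManyConnections(3, 2, 4)); // true: 5 > 4
        System.out.println(tooManyConnections(3, 2, 0)); // false: no limit set
        System.out.println(shouldForkWorker(0, 4, 4));   // false: cap reached
        System.out.println(shouldForkWorker(0, 4, 0));   // true: unlimited
    }
}
```

Keeping these checks free of locking and logging makes the boundary cases (limit of 0, count exactly at the cap) easy to verify independently of the threading code.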


Member Data Documentation

Trace org.objectweb.cjdbc.controller.core.ControllerWorkerThread.logger [static, package]
 

Initial value:

 Trace.getLogger("org.objectweb.cjdbc.controller.core.Controller")

Logger instance.

Definition at line 55 of file ControllerWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.core.ControllerWorkerThread.run().


The documentation for this class was generated from the following file:
ControllerWorkerThread.java
Generated on Mon Apr 11 22:03:44 2005 for C-JDBC by  doxygen 1.3.9.1