Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

ControllerWorkerThread.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet. 
00022  * Contributor(s): ______________________________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.core;
00026 
00027 import java.io.IOException;
00028 import java.io.OptionalDataException;
00029 import java.net.Socket;
00030 import java.sql.SQLException;
00031 import java.util.ArrayList;
00032 
00033 import org.objectweb.cjdbc.common.i18n.Translate;
00034 import org.objectweb.cjdbc.common.log.Trace;
00035 import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
00036 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
00037 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00038 import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread;
00039 import org.objectweb.cjdbc.driver.protocol.Commands;
00040 
00041 /**
00042  * The <code>ControllerWorkerThread</code> handles a connection with a C-JDBC
00043  * driver. It reads a String containing the virtual database name from the
00044  * driver and sends back the corresponding <code>ConnectionPoint</code>.
00045  * 
00046  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00047  * @version 1.0
00048  */
00049 public class ControllerWorkerThread extends Thread
00050 {
00051   private ControllerServerThread serverThread;
00052   private boolean                isKilled = false;
00053 
00054   /** Logger instance. */
00055   static Trace                   logger   = Trace
00056                                               .getLogger("org.objectweb.cjdbc.controller.core.Controller");
00057 
00058   /*
00059    * Constructor
00060    */
00061 
00062   /**
00063    * Creates a new <code>ControllerWorkerThread</code> instance.
00064    * 
00065    * @param serverThread the <code>ControllerServerThread</code> that created
00066    *          us.
00067    */
00068   public ControllerWorkerThread(ControllerServerThread serverThread)
00069   {
00070     super("ControllerWorkerThread");
00071     this.serverThread = serverThread;
00072   }
00073 
00074   /**
00075    * Gets a connection from the connection queue and process it.
00076    */
00077   public void run()
00078   {
00079     Socket clientSocket;
00080 
00081     if (serverThread == null)
00082     {
00083       logger.error(Translate.get("controller.workerthread.null.serverthread"));
00084       isKilled = true;
00085     }
00086     else if (serverThread.controllerServerThreadPendingQueue == null)
00087     {
00088       logger.error(Translate.get("controller.workerthread.null.pendingqueue"));
00089       isKilled = true;
00090     }
00091 
00092     // Main loop
00093     while (!isKilled)
00094     {
00095       if (serverThread.isShuttingDown())
00096         break;
00097       // Get a connection from the pending queue
00098       synchronized (serverThread.controllerServerThreadPendingQueue)
00099       {
00100         while (serverThread.controllerServerThreadPendingQueue.isEmpty())
00101         {
00102           // Nothing to do, let's sleep ...
00103           serverThread.idleWorkerThreads++;
00104           boolean timeout = false;
00105           try
00106           {
00107             long before = System.currentTimeMillis();
00108             serverThread.controllerServerThreadPendingQueue
00109                 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
00110             long now = System.currentTimeMillis();
00111             // Check if timeout has expired
00112             timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
00113           }
00114           catch (InterruptedException ignore)
00115           {
00116           }
00117           serverThread.idleWorkerThreads--;
00118           // We are shutting down
00119           if (serverThread.controllerServerThreadPendingQueue == null)
00120           {
00121             isKilled = true;
00122             break;
00123           }
00124           if (timeout
00125               && serverThread.controllerServerThreadPendingQueue.isEmpty())
00126           {
00127             // Nothing to do, let's die.
00128             isKilled = true;
00129             break;
00130           }
00131         }
00132 
00133         if (isKilled)
00134           break;
00135 
00136         // Get a connection
00137         clientSocket = (Socket) serverThread.controllerServerThreadPendingQueue
00138             .remove(0);
00139       } // synchronized (serverThread.controllerServerThreadPendingQueue)
00140 
00141       if (clientSocket == null)
00142       {
00143         logger.error(Translate.get("controller.workerthread.null.socket"));
00144         continue;
00145       }
00146       else if (logger.isDebugEnabled())
00147         logger.debug(Translate.get("controller.workerthread.connection.from",
00148             new String[]{clientSocket.getInetAddress().toString(),
00149                 String.valueOf(clientSocket.getPort())}));
00150 
00151       try
00152       {
00153         // Disable Nagle algorithm else small messages are not sent
00154         // (at least under Linux) even if we flush the output stream.
00155         clientSocket.setTcpNoDelay(true);
00156 
00157         // Handle connection
00158         CJDBCInputStream in = new CJDBCInputStream(clientSocket);
00159         CJDBCOutputStream out = new CJDBCOutputStream(clientSocket);
00160 
00161         // Check protocol version for driver compatibility
00162         int version = in.readInt();
00163         if (version != Commands.ProtocolVersion)
00164         {
00165           logger.warn(Translate.get(
00166               "controller.workerthread.protocol.incompatible", version));
00167           continue;
00168         }
00169         String virtualDatabaseName = in.readUTF();
00170 
00171         // Read the virtual database name
00172         VirtualDatabase vdb = serverThread.controller
00173             .getVirtualDatabase(virtualDatabaseName);
00174         if (vdb == null)
00175         {
00176           String msg = Translate.get("virtualdatabase.not.found",
00177               virtualDatabaseName);
00178           logger.warn(msg);
00179           continue;
00180         }
00181         if (vdb.isShuttingDown())
00182         {
00183           String msg = Translate.get("virtualdatabase.shutting.down",
00184               virtualDatabaseName);
00185           logger.warn(msg);
00186           continue;
00187         }
00188 
00189         // At this point we have the virtual database the driver wants to
00190         // connect to and we have to give the job to a
00191         // VirtualDatabaseWorkerThread
00192         ArrayList vdbActiveThreads = vdb.getActiveThreads();
00193         ArrayList vdbPendingQueue = vdb.getPendingConnections();
00194 
00195         if (vdbActiveThreads == null)
00196         {
00197           logger.error(Translate
00198               .get("controller.workerthread.null.active.thread"));
00199           isKilled = true;
00200         }
00201         if (vdbPendingQueue == null)
00202         {
00203           logger
00204               .error(Translate.get("controller.workerthread.null.connection"));
00205           isKilled = true;
00206         }
00207 
00208         // Start minimum number of worker threads
00209         boolean tooManyConnections;
00210         synchronized (vdbActiveThreads)
00211         {
00212           while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
00213           {
00214             // Fork a new worker thread
00215             if (logger.isDebugEnabled())
00216               logger.debug(Translate
00217                   .get("controller.workerthread.starting.thread.for.minimum"));
00218             VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
00219                 serverThread.controller, vdb);
00220             vdbActiveThreads.add(thread);
00221             vdb.addCurrentNbOfThread();
00222             thread.start();
00223           }
00224 
00225           // Check if maximum number of concurrent connections has been
00226           // reached
00227           tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
00228               && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
00229                   .getMaxNbOfConnections();
00230         }
00231         if (tooManyConnections)
00232         {
00233           out.writeObject(new SQLException(Translate
00234               .get("controller.workerthread.too.many.connections")));
00235           out.flush();
00236           continue;
00237         }
00238 
00239         // Put the connection in the queue
00240         synchronized (vdbPendingQueue)
00241         {
00242           vdbPendingQueue.add(in);
00243           vdbPendingQueue.add(out);
00244           // Nullify the socket else it is closed in the finally block
00245           clientSocket = null;
00246           synchronized (vdbActiveThreads)
00247           { // Is a thread available?
00248             if (vdb.getIdleThreads() == 0)
00249             { // No
00250               if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads())
00251                   || (vdb.getMaxNbOfThreads() == 0))
00252               {
00253                 // Fork a new worker thread
00254                 if (logger.isDebugEnabled())
00255                   logger.debug(Translate
00256                       .get("controller.workerthread.starting.thread"));
00257                 VirtualDatabaseWorkerThread thread = new VirtualDatabaseWorkerThread(
00258                     serverThread.controller, vdb);
00259                 vdbActiveThreads.add(thread);
00260                 vdb.addCurrentNbOfThread();
00261                 thread.start();
00262               }
00263               else if (logger.isInfoEnabled())
00264                 logger.info(Translate.get(
00265                     "controller.workerthread.maximum.thread", vdb
00266                         .getMaxNbOfThreads()));
00267             }
00268             else
00269             {
00270               if (logger.isDebugEnabled())
00271                 logger.debug(Translate
00272                     .get("controller.workerthread.notify.thread"));
00273               // Here we notify all threads else if one thread doesn't wake
00274               // up after the first notify() we will send a second notify()
00275               // and one signal will be lost. So the safe way is to wake up
00276               // everybody and that worker threads go back to sleep if there
00277               // is no job.
00278               vdbPendingQueue.notifyAll();
00279             }
00280           }
00281         }
00282       }
00283       //      }
00284       catch (OptionalDataException e)
00285       {
00286         logger
00287             .error(Translate.get("controller.workerthread.protocol.error", e));
00288       }
00289       catch (IOException e)
00290       {
00291         logger.error(Translate.get("controller.workerthread.io.error", e));
00292       }
00293       finally
00294       {
00295         try
00296         {
00297           if (clientSocket != null)
00298           {
00299             if (logger.isDebugEnabled())
00300               logger.debug(Translate
00301                   .get("controller.workerthread.connection.closing"));
00302             clientSocket.close();
00303           }
00304         }
00305         catch (IOException ignore)
00306         {
00307         }
00308       }
00309     }
00310 
00311     if (logger.isDebugEnabled())
00312       logger.debug(Translate.get("controller.workerthread.terminating"));
00313   }
00314 }

Generated on Mon Apr 11 22:01:30 2005 for C-JDBC by  doxygen 1.3.9.1