Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

VirtualDatabaseWorkerThread.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet.
00022  * Contributor(s): Nicolas Modrzyk.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.virtualdatabase;
00026 
00027 import java.io.EOFException;
00028 import java.io.IOException;
00029 import java.io.OptionalDataException;
00030 import java.net.SocketException;
00031 import java.rmi.RemoteException;
00032 import java.sql.SQLException;
00033 import java.util.ArrayList;
00034 import java.util.HashMap;
00035 import java.util.Iterator;
00036 
00037 import org.objectweb.cjdbc.common.log.Trace;
00038 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00039 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00040 import org.objectweb.cjdbc.common.sql.AlterRequest;
00041 import org.objectweb.cjdbc.common.sql.CreateRequest;
00042 import org.objectweb.cjdbc.common.sql.DeleteRequest;
00043 import org.objectweb.cjdbc.common.sql.DropRequest;
00044 import org.objectweb.cjdbc.common.sql.InsertRequest;
00045 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00046 import org.objectweb.cjdbc.common.sql.SelectRequest;
00047 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00048 import org.objectweb.cjdbc.common.sql.UpdateRequest;
00049 import org.objectweb.cjdbc.common.sql.metadata.MetadataContainer;
00050 import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
00051 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
00052 import org.objectweb.cjdbc.common.users.VirtualDatabaseUser;
00053 import org.objectweb.cjdbc.common.util.Constants;
00054 import org.objectweb.cjdbc.controller.core.Controller;
00055 import org.objectweb.cjdbc.driver.protocol.CommandCompleted;
00056 import org.objectweb.cjdbc.driver.protocol.Commands;
00057 
00058 /**
00059  * The <code>VirtualDatabaseWorkerThread</code> handles a connection with a
00060  * C-JDBC driver.
00061  * 
00062  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00063  * @author <a href="mailto:Nicolas.Modrzyk@inria.fr">Nicolas Modrzyk </a>
00064  * @version 1.0
00065  */
00066 public class VirtualDatabaseWorkerThread extends Thread
00067 {
00068   //
00069   // How the code is organized?
00070   //
00071   // 1. Member variables
00072   // 2. Constructor(s)
00073   // 3. Request management
00074   // 4. Getter/Setters
00075 
  /** <code>true</code> if this thread has been killed. */
  private boolean                isKilled     = false;

  /**
   * Needed for parsing: when <code>true</code>, the request decoders always
   * look for an SQL skeleton in the stream. The value is computed in
   * <code>run()</code> after a successful authentication and sent to the
   * driver.
   */
  boolean                        needSkeleton = false;

  /** Virtual database instantiating this thread. */
  private VirtualDatabase        vdb;

  /**
   * Acknowledgment object written to the driver by commands that have no
   * other result to return (e.g. close, closeRemoteResultSet).
   */
  private final CommandCompleted completed    = new CommandCompleted();

  /** Logger instance. */
  private Trace                  logger       = null;

  /** Input stream of the connection currently served by this thread. */
  private CJDBCInputStream       in           = null;
  /** Output stream of the connection currently served by this thread. */
  private CJDBCOutputStream      out          = null;

  /** User built from the login/password read during authentication. */
  private VirtualDatabaseUser    user;

  /** Controller this thread was created from (used for catalog lookups). */
  private Controller             controller;

  /**
   * Set to <code>true</code> right before blocking on the next command code
   * and back to <code>false</code> once a command code has been read.
   */
  private boolean                waitForCommand;

  /**
   * Open <code>ControllerResultSet</code> objects kept for streaming, looked
   * up by the cursor string sent by the driver.
   */
  private HashMap                streamedResultSet;

  /**
   * The following variables represent the state of the connection with the
   * client
   */
  private long                   currentTid;
  private boolean                transactionStarted;
  private boolean                queryExecutedInThisTransaction;
  private String                 login;
  private boolean                closed;
00110 
00111   /*
00112    * Constructor
00113    */
00114 
00115   /**
00116    * Creates a new <code>VirtualDatabaseWorkerThread</code> instance.
00117    * 
00118    * @param controller the thread was originated from
00119    * @param vdb the virtual database instantiating this thread.
00120    */
00121   public VirtualDatabaseWorkerThread(Controller controller, VirtualDatabase vdb)
00122   {
00123     super("VirtualDatabaseWorkerThread-" + vdb.getVirtualDatabaseName());
00124     this.vdb = vdb;
00125     this.controller = controller;
00126     try
00127     {
00128       this.logger = Trace
00129           .getLogger("org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread."
00130               + vdb.getVirtualDatabaseName());
00131     }
00132     catch (Exception e)
00133     {
00134       this.logger = vdb.logger;
00135     }
00136   }
00137 
00138   //
00139   // Decoding commands from the stream
00140   //
00141 
00142   /**
00143    * Read a StoredProcedure from the stream.
00144    * 
00145    * @param isRead true if we expect a read stored procedure
00146    * @return the corresponding <code>StoredProcedure</code> object.
00147    * @throws OptionalDataException if an error occurs
00148    * @throws IOException if an error occurs
00149    */
00150   private StoredProcedure decodeProcedureFromStream(boolean isRead)
00151       throws OptionalDataException, IOException
00152   {
00153     String sql = in.readUTF();
00154     boolean escape = in.readBoolean();
00155     String lineSeparator = in.readUTF();
00156     int timeout = in.readInt();
00157     StoredProcedure proc = new StoredProcedure(sql, escape, timeout,
00158         lineSeparator);
00159     proc.setIsAutoCommit(in.readBoolean());
00160     proc.setDriverProcessed(in.readBoolean());
00161     if (isRead)
00162     {
00163       proc.setMaxRows(in.readInt());
00164       proc.setFetchSize(in.readInt());
00165     }
00166     // Does the query has a skeleton ?
00167     if (needSkeleton || !proc.isDriverProcessed())
00168       if (in.readBoolean()) // is there a non null skeleton
00169         proc.setSqlSkeleton(in.readUTF());
00170     return proc;
00171   }
00172 
00173   /**
00174    * Read a SelectRequest from the stream.
00175    * 
00176    * @return the corresponding <code>SelectRequest</code> object.
00177    * @throws OptionalDataException if an error occurs
00178    * @throws IOException if an error occurs
00179    */
00180   private SelectRequest decodeReadRequestFromStream()
00181       throws OptionalDataException, IOException
00182   {
00183     String sql = in.readUTF();
00184     boolean escape = in.readBoolean();
00185     String lineSeparator = in.readUTF();
00186     int timeout = in.readInt();
00187     SelectRequest select = new SelectRequest(sql, escape, timeout,
00188         lineSeparator);
00189     select.setIsAutoCommit(in.readBoolean());
00190     select.setDriverProcessed(in.readBoolean());
00191     select.setMaxRows(in.readInt());
00192     select.setFetchSize(in.readInt());
00193     if (in.readBoolean()) // do we have a cursor name ?
00194       select.setCursorName(in.readUTF());
00195     // Does the query has a skeleton ?
00196     if (needSkeleton || !select.isDriverProcessed())
00197       if (in.readBoolean()) // is there a non null skeleton
00198         select.setSqlSkeleton(in.readUTF());
00199     return select;
00200   }
00201 
00202   /**
00203    * Read a write request send by the <code>Connection</code> object. Only the
00204    * needed parameters are sent, so we can reconstruct the object on the
00205    * controller side, without serializing everything.
00206    * 
00207    * @param withKeys true if we expect a write to return keus (as a ResultSet)
00208    * @return an instace of the <code>AbstractWriteRequest</code>
00209    */
00210   private AbstractWriteRequest decodeWriteRequestFromStream(boolean withKeys)
00211       throws OptionalDataException, IOException
00212   {
00213     AbstractWriteRequest writeRequest;
00214     int requestType = in.readInt();
00215     String sql = in.readUTF();
00216     boolean escape = in.readBoolean();
00217     String lineSeparator = in.readUTF();
00218     int timeout = in.readInt();
00219     switch (requestType)
00220     {
00221       case Commands.CreateRequest :
00222         writeRequest = new CreateRequest(sql, escape, timeout, lineSeparator);
00223         break;
00224       case Commands.AlterRequest :
00225         writeRequest = new AlterRequest(sql, escape, timeout, lineSeparator);
00226         break;
00227       case Commands.DeleteRequest :
00228         writeRequest = new DeleteRequest(sql, escape, timeout, lineSeparator);
00229         break;
00230       case Commands.DropRequest :
00231         writeRequest = new DropRequest(sql, escape, timeout, lineSeparator);
00232         break;
00233       case Commands.InsertRequest :
00234         writeRequest = new InsertRequest(sql, escape, timeout, lineSeparator);
00235         break;
00236       case Commands.UpdateRequest :
00237         writeRequest = new UpdateRequest(sql, escape, timeout, lineSeparator);
00238         break;
00239       default :
00240         throw new IOException("Invalid Write Query Type");
00241     }
00242 
00243     writeRequest.setIsAutoCommit(in.readBoolean());
00244     writeRequest.setDriverProcessed(in.readBoolean());
00245     if (withKeys)
00246     {
00247       writeRequest.setMaxRows(in.readInt());
00248       writeRequest.setFetchSize(in.readInt());
00249     }
00250 
00251     // Does the query has a skeleton ?
00252     if (needSkeleton || !writeRequest.isDriverProcessed())
00253       if (in.readBoolean())
00254         writeRequest.setSqlSkeleton(in.readUTF());
00255 
00256     return writeRequest;
00257   }
00258 
00259   /**
00260    * Set the login and transaction id on the given request. If the request is
00261    * autocommit and a transaction was started, the transaction is first commited
00262    * to return in autocommit mode.
00263    * 
00264    * @param request The request to set
00265    * @param login user login to set
00266    * @param tid the transaction id to set
00267    * @return new value of transaction started
00268    */
00269   private boolean setRequestParameters(AbstractRequest request, String login,
00270       long tid, boolean transactionStarted) throws SQLException
00271   {
00272     request.setLogin(login);
00273     if (request.isAutoCommit() && transactionStarted)
00274     {
00275       vdb.commit(tid);
00276       return false;
00277     }
00278     else
00279       request.setTransactionId(tid);
00280     return transactionStarted;
00281   }
00282 
00283   /**
00284    * Gets a connection from the connection queue and process it.
00285    */
00286   public void run()
00287   {
00288     ArrayList vdbActiveThreads = vdb.getActiveThreads();
00289     ArrayList vdbPendingQueue = vdb.getPendingConnections();
00290     // List of open ResultSets for streaming. This is not synchronized since the
00291     // connection does only handle one request at a time
00292     streamedResultSet = new HashMap();
00293     boolean isActive = true;
00294 
00295     if (vdbActiveThreads == null)
00296     {
00297       logger
00298           .error("Got null active threads queue in VirtualDatabaseWorkerThread");
00299       isKilled = true;
00300     }
00301     if (vdbPendingQueue == null)
00302     {
00303       logger.error("Got null connection queue in VirtualDatabaseWorkerThread");
00304       isKilled = true;
00305     }
00306 
00307     // Main loop
00308     while (!isKilled)
00309     {
00310       // Get a connection from the pending queue
00311       synchronized (vdbPendingQueue)
00312       {
00313         while (vdbPendingQueue.isEmpty())
00314         {
00315           if (!vdb.poolConnectionThreads)
00316           { // User does not want thread pooling, kill this thread!
00317             isKilled = true;
00318             break;
00319           }
00320           boolean timeout = false;
00321           try
00322           {
00323             if (isActive)
00324             {
00325               isActive = false;
00326               // Remove ourselves from the active thread list
00327               synchronized (vdbActiveThreads)
00328               {
00329                 vdbActiveThreads.remove(this);
00330                 vdb.addIdleThread();
00331               }
00332             }
00333             long before = System.currentTimeMillis();
00334             vdbPendingQueue.wait(vdb.getMaxThreadIdleTime());
00335             long now = System.currentTimeMillis();
00336             // Check if timeout has expired
00337             timeout = now - before >= vdb.getMaxThreadIdleTime();
00338           }
00339           catch (InterruptedException e)
00340           {
00341             logger.warn("VirtualDatabaseWorkerThread wait() interrupted");
00342           }
00343           if (timeout && vdbPendingQueue.isEmpty())
00344           {
00345             if (vdb.currentNbOfThreads > vdb.minNbOfThreads)
00346             { // We have enough threads, kill this one
00347               isKilled = true;
00348               break;
00349             }
00350           }
00351         }
00352 
00353         if (isKilled)
00354         { // Cleaning up
00355           synchronized (vdbActiveThreads)
00356           { // Remove ourselves from the appropriate thread list
00357             if (isActive)
00358             {
00359               vdbActiveThreads.remove(this);
00360               vdb.removeCurrentNbOfThread();
00361             }
00362             else
00363               vdb.removeIdleThread();
00364           }
00365           // Get out of the while loop
00366           continue;
00367         }
00368 
00369         // Get a connection
00370         try
00371         {
00372           in = (CJDBCInputStream) vdbPendingQueue.remove(0);
00373           out = (CJDBCOutputStream) vdbPendingQueue.remove(0);
00374         }
00375         catch (Exception e)
00376         {
00377           logger.error("Error while getting streams from connection");
00378           continue;
00379         }
00380 
00381         synchronized (vdbActiveThreads)
00382         {
00383           if (!isActive)
00384           {
00385             vdb.removeIdleThread();
00386             isActive = true;
00387             // Add this thread to the active thread list
00388             vdbActiveThreads.add(this);
00389           }
00390         }
00391       }
00392 
00393       // Handle connection
00394       // Read the user information and check authentication
00395       boolean success = false;
00396       try
00397       {
00398         login = in.readUTF();
00399         String password = in.readUTF();
00400         user = new VirtualDatabaseUser(login, password);
00401 
00402         if (vdb.getAuthenticationManager().isValidVirtualUser(user))
00403         { // Authentication ok, send if SQL skeleton is needed
00404           needSkeleton = vdb.getRequestManager()
00405               .getRequiredParsingGranularity() != ParsingGranularities.NO_PARSING;
00406           out.writeObject(needSkeleton ? Boolean.TRUE : Boolean.FALSE);
00407           out.writeUTF(vdb.getBlobFilter().getXml());
00408           out.flush();
00409           success = true;
00410 
00411           if (logger.isDebugEnabled())
00412             logger.debug("Login accepted for " + login);
00413         }
00414         else
00415         { // Authentication failed, close the connection
00416           String msg = "Authentication failed for user '" + login + "'";
00417           out.writeObject(new SQLException(msg));
00418           if (logger.isDebugEnabled())
00419             logger.debug(msg);
00420           continue;
00421         }
00422       }
00423       catch (OptionalDataException e)
00424       {
00425         logger.error("Protocol error while expecting user authentication (" + e
00426             + ")");
00427         continue;
00428       }
00429       catch (IOException e)
00430       {
00431         logger.error("I/O error during user authentication (" + e + ")");
00432         continue;
00433       }
00434       finally
00435       {
00436         if (!success)
00437         {
00438           try
00439           {
00440             out.close();
00441             in.close();
00442           }
00443           catch (IOException ignore)
00444           {
00445           }
00446         }
00447       }
00448 
00449       currentTid = 0;
00450       transactionStarted = false;
00451       queryExecutedInThisTransaction = false;
00452       closed = false;
00453       int command;
00454       while (!closed && !isKilled)
00455       {
00456         try
00457         {
00458           // Get the query
00459           waitForCommand = true;
00460           command = in.readInt();
00461           waitForCommand = false;
00462 
00463           // Process it
00464           switch (command)
00465           {
00466             case Commands.ExecReadRequest :
00467               execReadRequest();
00468               break;
00469             case Commands.ExecWriteRequest :
00470               execWriteRequest();
00471               break;
00472             case Commands.ExecWriteRequestWithKeys :
00473               execWriteRequestWithKeys();
00474               break;
00475             case Commands.ExecReadStoredProcedure :
00476               execReadStoredProcedure();
00477               break;
00478             case Commands.ExecWriteStoredProcedure :
00479               execWriteStoredProcedure();
00480               break;
00481             case Commands.Begin :
00482               begin();
00483               break;
00484             case Commands.Commit :
00485               commit();
00486               break;
00487             case Commands.SetAutoCommit :
00488               setAutoCommit();
00489               break;
00490             case Commands.Rollback :
00491               rollback();
00492               break;
00493             case Commands.GetVirtualDatabaseName :
00494               getVirtualDatabaseName();
00495               break;
00496             case Commands.DatabaseMetaDataGetDatabaseProductName :
00497               databaseMetaDataGetDatabaseProductName();
00498               break;
00499             case Commands.GetControllerVersionNumber :
00500               getControllerVersionNumber();
00501               break;
00502             case Commands.DatabaseMetaDataGetTables :
00503               databaseMetaDataGetTables();
00504               break;
00505             case Commands.DatabaseMetaDataGetColumns :
00506               databaseMetaDataGetColumns();
00507               break;
00508             case Commands.DatabaseMetaDataGetPrimaryKeys :
00509               databaseMetaDataGetPrimaryKeys();
00510               break;
00511             case Commands.DatabaseMetaDataGetProcedures :
00512               databaseMetaDataGetProcedures();
00513               break;
00514             case Commands.DatabaseMetaDataGetProcedureColumns :
00515               databaseMetaDataGetProcedureColumns();
00516               break;
00517             case Commands.ConnectionGetCatalogs :
00518               connectionGetCatalogs();
00519               break;
00520             case Commands.ConnectionGetCatalog :
00521               connectionGetCatalog();
00522               break;
00523             case Commands.DatabaseMetaDataGetTableTypes :
00524               databaseMetaDataGetTableTypes();
00525               break;
00526             case Commands.DatabaseMetaDataGetSchemas :
00527               databaseMetaDataGetSchemas();
00528               break;
00529             case Commands.DatabaseMetaDataGetTablePrivileges :
00530               databaseMetaDataGetTablePrivileges();
00531               break;
00532             case Commands.ConnectionSetCatalog :
00533               connectionSetCatalog();
00534               break;
00535             case Commands.Close :
00536               close();
00537               break;
00538             case Commands.Reset :
00539               reset();
00540               break;
00541             case Commands.FetchNextResultSetRows :
00542               fetchNextResultSetRows();
00543               break;
00544             case Commands.CloseRemoteResultSet :
00545               closeRemoteResultSet();
00546               break;
00547             case Commands.DatabaseStaticMetadata :
00548               databaseStaticMetadata();
00549               break;
00550             case Commands.RestoreConnectionState :
00551               restoreConnectionState();
00552               break;
00553             default :
00554               String errorMsg = "Unsupported protocol command: " + command;
00555               logger.error(errorMsg);
00556               out.writeObject(new RuntimeException(errorMsg));
00557               out.flush();
00558               break;
00559           }
00560         }
00561         catch (OptionalDataException e)
00562         {
00563           logger.warn("Protocol error (" + e + ")");
00564           try
00565           {
00566             out.writeObject(e);
00567             out.flush();
00568           }
00569           catch (IOException ignore)
00570           {
00571           }
00572         }
00573         catch (RemoteException e)
00574         {
00575           logger.warn("Error during command execution (" + e + ")");
00576           try
00577           {
00578             out.writeObject(e);
00579             out.flush();
00580           }
00581           catch (IOException ignore)
00582           {
00583           }
00584         }
00585         catch (EOFException e)
00586         {
00587           logger.warn("Client (login:" + login + ",host:"
00588               + in.getSocket().getInetAddress().getHostName()
00589               + " closed connection with server");
00590           closed = true;
00591         }
00592         catch (SocketException e)
00593         {
00594           // shutting down
00595           closed = true;
00596         }
00597         catch (IOException e)
00598         {
00599           closed = true;
00600           logger.warn("Closing connection with client " + login
00601               + " because of IOException.(" + e + ")");
00602         }
00603         catch (ClassNotFoundException e)
00604         {
00605           logger.error("Protocol error (" + e + ")");
00606           try
00607           {
00608             out.writeObject(e);
00609             out.flush();
00610           }
00611           catch (IOException ignore)
00612           {
00613           }
00614         }
00615         catch (SQLException e)
00616         {
00617           logger
00618               .warn("Error during command execution (" + e.getMessage() + ")");
00619           try
00620           {
00621             out.writeObject(e);
00622             out.flush();
00623           }
00624           catch (IOException ignore)
00625           {
00626           }
00627         }
00628         catch (RuntimeException e)
00629         {
00630           logger.warn("Runtime error during command execution ("
00631               + e.getMessage() + ")", e);
00632           try
00633           {
00634             out.writeObject(new SQLException(e.getMessage()));
00635             out.flush();
00636           }
00637           catch (IOException ignore)
00638           {
00639           }
00640         }
00641       }
00642 
00643       // Do the cleanup
00644       if (transactionStarted)
00645       {
00646         if (logger.isDebugEnabled())
00647           logger.debug("Forcing transaction " + currentTid + " rollback");
00648         try
00649         {
00650           vdb.rollback(currentTid);
00651         }
00652         catch (Exception e)
00653         {
00654           if (logger.isDebugEnabled())
00655             logger.debug("Error during rollback of transaction " + currentTid
00656                 + "(" + e + ")");
00657         }
00658       }
00659       if (!streamedResultSet.isEmpty())
00660       {
00661         for (Iterator iter = streamedResultSet.values().iterator(); iter
00662             .hasNext();)
00663         {
00664           ControllerResultSet crs = (ControllerResultSet) iter.next();
00665           crs.closeResultSet();
00666         }
00667         streamedResultSet.clear();
00668       }
00669       try
00670       {
00671         in.close();
00672       }
00673       catch (IOException ignore)
00674       {
00675       }
00676       try
00677       {
00678         out.close();
00679       }
00680       catch (IOException ignore)
00681       {
00682       }
00683     }
00684 
00685     if (logger.isDebugEnabled())
00686       logger.debug("VirtualDatabaseWorkerThread associated to login: "
00687           + this.getUser() + " terminating.");
00688   }
00689 
00690   //
00691   // Connection management
00692   //
00693 
00694   private void close() throws IOException
00695   {
00696     if (logger.isDebugEnabled())
00697       logger.debug("Close command");
00698     out.writeObject(completed);
00699     out.flush();
00700     closed = true;
00701   }
00702 
00703   private void closeRemoteResultSet() throws IOException
00704   {
00705     if (logger.isDebugEnabled())
00706       logger.debug("CloseRemoteResultSet command");
00707 
00708     String cursor = in.readUTF();
00709     ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSet
00710         .remove(cursor);
00711     if (crsToClose == null)
00712     {
00713       out
00714           .writeObject(new SQLException(
00715               "No valid ControllerResultSet to close."));
00716       out.flush();
00717     }
00718     else
00719     {
00720       crsToClose.closeResultSet();
00721       out.writeObject(completed);
00722       out.flush();
00723     }
00724   }
00725 
00726   private void reset()
00727   {
00728     // The client application has closed the connection but it is kept
00729     // open in case the transparent connection pooling reuses it.
00730     if (logger.isDebugEnabled())
00731       logger.debug("Reset command");
00732 
00733     // Do the cleanup
00734     if (transactionStarted)
00735     {
00736       if (queryExecutedInThisTransaction)
00737       { // Force rollback of this transaction
00738         if (logger.isDebugEnabled())
00739           logger.debug("Forcing transaction " + currentTid + " rollback");
00740         try
00741         {
00742           vdb.rollback(currentTid);
00743         }
00744         catch (Exception e)
00745         {
00746           if (logger.isDebugEnabled())
00747             logger.debug("Error during rollback of transaction " + currentTid
00748                 + "(" + e + ")");
00749         }
00750       }
00751       else
00752       { // We need to abort the begin to cleanup the metadata
00753         // associated with the started transaction.
00754         if (logger.isDebugEnabled())
00755           logger.debug("Aborting transaction " + currentTid);
00756         try
00757         {
00758           vdb.abort(currentTid);
00759         }
00760         catch (Exception e)
00761         {
00762           if (logger.isDebugEnabled())
00763             logger.debug("Error while aborting transaction " + currentTid + "("
00764                 + e + ")", e);
00765         }
00766       }
00767       currentTid = 0;
00768       transactionStarted = false;
00769     }
00770   }
00771 
00772   private void restoreConnectionState() throws IOException
00773   {
00774     if (logger.isDebugEnabled())
00775       logger.debug("RestoreConnectionState command");
00776     // We receive autocommit from driver
00777     transactionStarted = !in.readBoolean();
00778     if (transactionStarted)
00779       currentTid = in.readLong();
00780   }
00781 
00782   //
00783   // Catalog
00784   //
00785 
00786   private void connectionSetCatalog() throws IOException
00787   {
00788     if (logger.isDebugEnabled())
00789       logger.debug("ConnectionSetCatalog command");
00790     String catalog = in.readUTF();
00791     boolean change = controller.hasVirtualDatabase(catalog);
00792     if (change)
00793     {
00794       VirtualDatabase tempvdb = controller.getVirtualDatabase(catalog);
00795       if (!tempvdb.getAuthenticationManager().isValidVirtualUser(user))
00796         out.writeObject(new SQLException(
00797             "User authentication has failed for asked catalog. No change"));
00798       else
00799       {
00800         this.vdb = tempvdb;
00801         out.writeObject(Boolean.TRUE);
00802       }
00803     }
00804     else
00805       out.writeObject(Boolean.FALSE);
00806     out.flush();
00807   }
00808 
00809   private void connectionGetCatalog() throws IOException
00810   {
00811     if (logger.isDebugEnabled())
00812       logger.debug("ConnectionGetCatalog command");
00813     out.writeObject(vdb.getVirtualDatabaseName());
00814     out.flush();
00815   }
00816 
00817   private void connectionGetCatalogs() throws IOException
00818   {
00819     if (logger.isDebugEnabled())
00820       logger.debug("ConnectionGetCatalogs command");
00821     ArrayList list = controller.getVirtualDatabaseNames();
00822     out.writeObject(vdb.getDynamicMetaData().getCatalogs(list));
00823     out.flush();
00824   }
00825 
00826   //
00827   // Database MetaData
00828   //
00829 
00830   private void databaseMetaDataGetColumns() throws IOException
00831   {
00832     if (logger.isDebugEnabled())
00833       logger.debug("DatabaseMetaDataGetColumns command");
00834     String ccatalog = in.readUTF();
00835     String cschemaPattern = in.readUTF();
00836     String ctableNamePattern = in.readUTF();
00837     String ccolumnNamePattern = in.readUTF();
00838     out.writeObject(vdb.getDynamicMetaData().getColumns(ccatalog,
00839         cschemaPattern, ctableNamePattern, ccolumnNamePattern));
00840     out.flush();
00841   }
00842 
00843   private void databaseMetaDataGetDatabaseProductName() throws IOException
00844   {
00845     if (logger.isDebugEnabled())
00846       logger.debug("GetDatabaseProductName command");
00847     out.writeObject(vdb.getDatabaseProductName());
00848     out.flush();
00849   }
00850 
00851   private void databaseMetaDataGetPrimaryKeys() throws IOException
00852   {
00853     if (logger.isDebugEnabled())
00854       logger.debug("DatabaseMetaDataGetPrimaryKeys command");
00855     String pcatalog = in.readUTF();
00856     String pschemaPattern = in.readUTF();
00857     String ptableNamePattern = in.readUTF();
00858     out.writeObject(vdb.getDynamicMetaData().getPrimaryKeys(pcatalog,
00859         pschemaPattern, ptableNamePattern));
00860     out.flush();
00861   }
00862 
00863   private void databaseMetaDataGetProcedureColumns() throws IOException
00864   {
00865     if (logger.isDebugEnabled())
00866       logger.debug("DatabaseMetaDataGetProcedureColumns command");
00867     String pccatalog = in.readUTF();
00868     String pcschemaPattern = in.readUTF();
00869     String pcprocedureNamePattern = in.readUTF();
00870     String pccolumnNamePattern = in.readUTF();
00871     out.writeObject(vdb.getDynamicMetaData().getProcedureColumns(pccatalog,
00872         pcschemaPattern, pcprocedureNamePattern, pccolumnNamePattern));
00873     out.flush();
00874   }
00875 
00876   private void databaseMetaDataGetProcedures() throws IOException
00877   {
00878     if (logger.isDebugEnabled())
00879       logger.debug("DatabaseMetaDataGetProcedures command");
00880     String rcatalog = in.readUTF();
00881     String rschemaPattern = in.readUTF();
00882     String procedureNamePattern = in.readUTF();
00883     out.writeObject(vdb.getDynamicMetaData().getProcedures(rcatalog,
00884         rschemaPattern, procedureNamePattern));
00885     out.flush();
00886   }
00887 
00888   private void databaseMetaDataGetSchemas() throws IOException
00889   {
00890     if (logger.isDebugEnabled())
00891       logger.debug("DatabaseMetaDataGetSchemas Types command");
00892     out.writeObject(vdb.getDynamicMetaData().getSchemas());
00893     out.flush();
00894   }
00895 
00896   private void databaseMetaDataGetTablePrivileges() throws IOException
00897   {
00898     if (logger.isDebugEnabled())
00899       logger.debug("DatabaseMetaDataGetTablePrivileges command");
00900     String tpcatalog = in.readUTF();
00901     String tpschemaPattern = in.readUTF();
00902     String tptablePattern = in.readUTF();
00903     out.writeObject(vdb.getDynamicMetaData().getTablePrivileges(tpcatalog,
00904         tpschemaPattern, tptablePattern));
00905     out.flush();
00906   }
00907 
00908   private void databaseMetaDataGetTables() throws IOException,
00909       ClassNotFoundException, OptionalDataException
00910   {
00911     if (logger.isDebugEnabled())
00912       logger.debug("DatabaseMetaDataGetTables command");
00913     String tcatalog = in.readUTF();
00914     String tschemaPattern = in.readUTF();
00915     String ttableNamePattern = in.readUTF();
00916     String[] ttypes = (String[]) in.readObject();
00917     out.writeObject(vdb.getDynamicMetaData().getTables(tcatalog,
00918         tschemaPattern, ttableNamePattern, ttypes));
00919     out.flush();
00920   }
00921 
00922   private void databaseMetaDataGetTableTypes() throws IOException
00923   {
00924     if (logger.isDebugEnabled())
00925       logger.debug("DatabaseMetaDataGetTable Types command");
00926     out.writeObject(vdb.getDynamicMetaData().getTableTypes());
00927     out.flush();
00928   }
00929 
00930   private void databaseStaticMetadata() throws IOException
00931   {
00932     String key = in.readUTF();
00933     if (logger.isDebugEnabled())
00934       logger.debug("DatabaseStaticMetadata command for " + key);
00935     MetadataContainer container = vdb.getStaticMetaData()
00936         .getMetadataContainer();
00937     if (container == null)
00938     {
00939       String msg = "No metadata is available probably because no backend is enabled on that controller.";
00940       logger.info(msg);
00941       out.writeObject(new SQLException(msg));
00942     }
00943     else
00944       out.writeObject(container.get(key));
00945     out.flush();
00946   }
00947 
00948   private void getControllerVersionNumber() throws IOException
00949   {
00950     if (logger.isDebugEnabled())
00951       logger.debug("GetControllerVersionNumber command");
00952     out.writeObject(Constants.VERSION);
00953     out.flush();
00954   }
00955 
00956   private void getVirtualDatabaseName() throws IOException
00957   {
00958     if (logger.isDebugEnabled())
00959       logger.debug("GetVirtualDatabaseName command");
00960     out.writeObject(vdb.getDatabaseName());
00961     out.flush();
00962   }
00963 
00964   //
00965   // Transaction management
00966   //
00967 
00968   private void begin() throws SQLException, IOException
00969   {
00970     if (logger.isDebugEnabled())
00971       logger.debug("Begin command");
00972     currentTid = vdb.begin(login);
00973     out.writeObject(new Long(currentTid));
00974     out.flush();
00975     transactionStarted = true;
00976     queryExecutedInThisTransaction = false;
00977   }
00978 
00979   private void commit() throws SQLException, IOException
00980   {
00981     if (logger.isDebugEnabled())
00982       logger.debug("Commit command");
00983     vdb.commit(currentTid);
00984     currentTid = vdb.begin(login);
00985     out.writeObject(new Long(currentTid));
00986     out.flush();
00987     queryExecutedInThisTransaction = false;
00988   }
00989 
00990   private void rollback() throws SQLException, IOException
00991   {
00992     if (logger.isDebugEnabled())
00993       logger.debug("Rollback command");
00994     vdb.rollback(currentTid);
00995     currentTid = vdb.begin(login);
00996     out.writeObject(new Long(currentTid));
00997     out.flush();
00998     queryExecutedInThisTransaction = false;
00999   }
01000 
01001   private void setAutoCommit() throws SQLException, IOException
01002   {
01003     if (logger.isDebugEnabled())
01004       logger.debug("Set Auto commit command");
01005     vdb.commit(currentTid);
01006     currentTid = 0;
01007     transactionStarted = false;
01008     out.writeObject(Boolean.TRUE);
01009     out.flush();
01010   }
01011 
01012   //
01013   // Request execution
01014   //
01015 
01016   private void execReadRequest() throws OptionalDataException, IOException,
01017       SQLException
01018   {
01019     if (logger.isDebugEnabled())
01020       logger.debug("ExecReadRequest command");
01021     SelectRequest select = decodeReadRequestFromStream();
01022     transactionStarted = setRequestParameters(select, login, currentTid,
01023         transactionStarted);
01024     if (!transactionStarted)
01025       currentTid = 0;
01026     else
01027       queryExecutedInThisTransaction = true;
01028     ControllerResultSet crs = vdb.execReadRequest(select);
01029     out.writeObject(crs.getFields());
01030     out.writeObject(crs.getData());
01031     out.writeBoolean(crs.hasMoreData());
01032     if (crs.hasMoreData())
01033     { // Send the cursor name for further references
01034       out.writeUTF(crs.getCursorName());
01035       System.out.println("adding streamedresultset");
01036       // Keep a reference to the ResultSet
01037       streamedResultSet.put(crs.getCursorName(), crs);
01038     }
01039     else
01040       crs = null; // release the reference right away
01041     out.flush();
01042   }
01043 
01044   private void execReadStoredProcedure() throws OptionalDataException,
01045       IOException, SQLException
01046   {
01047     if (logger.isDebugEnabled())
01048       logger.debug("ExecReadStoredProcedure command");
01049     StoredProcedure readProc = decodeProcedureFromStream(true);
01050     transactionStarted = setRequestParameters(readProc, login, currentTid,
01051         transactionStarted);
01052     if (!transactionStarted)
01053       currentTid = 0;
01054     else
01055       queryExecutedInThisTransaction = true;
01056     ControllerResultSet sprs = vdb.execReadStoredProcedure(readProc);
01057     out.writeObject(sprs.getFields());
01058     out.writeObject(sprs.getData());
01059     out.writeBoolean(sprs.hasMoreData());
01060     if (sprs.hasMoreData())
01061     { // Send the cursor name for further references
01062       out.writeUTF(sprs.getCursorName());
01063       System.out.println("adding streamedresultset");
01064       // Keep a reference to the ResultSet
01065       streamedResultSet.put(sprs.getCursorName(), sprs);
01066     }
01067     else
01068       sprs = null; // release references right away
01069     readProc = null;
01070     out.flush();
01071   }
01072 
01073   private void execWriteRequest() throws OptionalDataException, IOException,
01074       SQLException
01075   {
01076     if (logger.isDebugEnabled())
01077       logger.debug("ExecWriteRequest command");
01078     AbstractWriteRequest write = decodeWriteRequestFromStream(false);
01079     transactionStarted = setRequestParameters(write, login, currentTid,
01080         transactionStarted);
01081     if (!transactionStarted)
01082       currentTid = 0;
01083     else
01084       queryExecutedInThisTransaction = true;
01085     out.writeObject(new Integer(vdb.execWriteRequest(write)));
01086     write = null; // release the reference right away
01087     out.flush();
01088   }
01089 
01090   private void execWriteRequestWithKeys() throws OptionalDataException,
01091       IOException, SQLException
01092   {
01093     if (logger.isDebugEnabled())
01094       logger.debug("ExecWriteRequestWithKeys command");
01095     AbstractWriteRequest writeWithKeys = decodeWriteRequestFromStream(true);
01096     transactionStarted = setRequestParameters(writeWithKeys, login, currentTid,
01097         transactionStarted);
01098     if (!transactionStarted)
01099       currentTid = 0;
01100     else
01101       queryExecutedInThisTransaction = true;
01102     ControllerResultSet keys = vdb.execWriteRequestWithKeys(writeWithKeys);
01103     out.writeObject(keys.getFields());
01104     out.writeObject(keys.getData());
01105     out.writeBoolean(keys.hasMoreData());
01106     if (keys.hasMoreData())
01107     { // Send the cursor name for further references
01108       out.writeUTF(keys.getCursorName());
01109       System.out.println("adding streamedresultset");
01110       // Keep a reference to the ResultSet
01111       streamedResultSet.put(keys.getCursorName(), keys);
01112     }
01113     else
01114       keys = null; // release references right away
01115     writeWithKeys = null;
01116     out.flush();
01117   }
01118 
01119   private void execWriteStoredProcedure() throws OptionalDataException,
01120       IOException, SQLException
01121   {
01122     if (logger.isDebugEnabled())
01123       logger.debug("ExecWriteStoredProcedure command");
01124     StoredProcedure writeProc = decodeProcedureFromStream(false);
01125     transactionStarted = setRequestParameters(writeProc, login, currentTid,
01126         transactionStarted);
01127     if (!transactionStarted)
01128       currentTid = 0;
01129     else
01130       queryExecutedInThisTransaction = true;
01131     out.writeObject(new Integer(vdb.execWriteStoredProcedure(writeProc)));
01132     writeProc = null; // release reference right away
01133     out.flush();
01134   }
01135 
01136   private void fetchNextResultSetRows() throws IOException, SQLException
01137   {
01138     if (logger.isDebugEnabled())
01139       logger.debug("FetchNextResultSetRows command");
01140 
01141     String cursorName = in.readUTF();
01142     int fetchSize = in.readInt();
01143     ControllerResultSet fetchCrs = (ControllerResultSet) streamedResultSet
01144         .get(cursorName);
01145     if (fetchCrs == null)
01146     {
01147       out.writeObject(new SQLException(
01148           "No valid ControllerResultSet to fetch data from"));
01149       out.flush();
01150     }
01151     else
01152     {
01153       out.writeObject(fetchCrs.fetchData(fetchSize));
01154       out.writeBoolean(fetchCrs.hasMoreData());
01155       out.flush();
01156       if (!fetchCrs.hasMoreData())
01157         streamedResultSet.remove(cursorName);
01158     }
01159   }
01160 
01161   //
01162   // Public API
01163   //
01164 
01165   /**
01166    * Retrieve general information on this client
01167    * 
01168    * @return an array of string
01169    */
01170   public String[] retrieveClientData()
01171   {
01172     String[] data = new String[6];
01173     data[0] = in.getSocket().getInetAddress().getHostName();
01174     data[1] = in.getSocket().getInetAddress().getHostAddress();
01175     data[2] = String.valueOf(in.getBytesRead());
01176     data[3] = String.valueOf(out.getBytesWritten());
01177     data[4] = String.valueOf(in.getUseCompression());
01178     data[5] = String
01179         .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000));
01180     data[6] = String.valueOf(in.getSpeed());
01181     data[7] = String.valueOf(out.getSpeed());
01182     return data;
01183   }
01184 
01185   /**
01186    * get bytes read
01187    * 
01188    * @return bytes read
01189    */
01190   public long getBytesRead()
01191   {
01192     return in.getBytesRead();
01193   }
01194 
01195   /**
01196    * get bytes written
01197    * 
01198    * @return bytes written
01199    */
01200   public long getBytesWritten()
01201   {
01202     return out.getBytesWritten();
01203   }
01204 
01205   /**
01206    * get time active
01207    * 
01208    * @return time active since started
01209    */
01210   public long getTimeActive()
01211   {
01212     return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
01213   }
01214 
01215   /**
01216    * get reading speed, WARNING! This is approximate
01217    * 
01218    * @return reading speed
01219    */
01220   public long getReadingSpeed()
01221   {
01222     return in.getSpeed();
01223   }
01224 
01225   /**
01226    * get writing speed, WARNING! This is approximate
01227    * 
01228    * @return writing speed
01229    */
01230   public long getWritingSpeed()
01231   {
01232     return out.getSpeed();
01233   }
01234 
01235   /**
01236    * @return Returns the login of the current user.
01237    */
01238   public String getUser()
01239   {
01240     return user.getLogin();
01241   }
01242 
01243   /**
01244    * Shutdown this thread by setting <code>isKilled</code> value to true. This
01245    * gives time to check for needed rollback transactions
01246    */
01247   public void shutdown()
01248   {
01249     // Tell this thread to stop working gently.
01250     // This will cancel transaction if needed
01251     this.isKilled = true;
01252     try
01253     {
01254       if (waitForCommand)
01255       {
01256         // close only the streams if we're not in the middle of a request
01257         in.close();
01258         out.close();
01259       }
01260     }
01261     catch (IOException e)
01262     {
01263       // ignore, only the input stream should be close
01264       // for this thread to end
01265     }
01266   }
01267 
01268 }

Generated on Mon Apr 11 22:01:35 2005 for C-JDBC by  doxygen 1.3.9.1