
org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread Class Reference

Public Member Functions

 VirtualDatabaseWorkerThread (Controller controller, VirtualDatabase vdb)
void run ()
String[] retrieveClientData ()
long getBytesRead ()
long getBytesWritten ()
long getTimeActive ()
long getReadingSpeed ()
long getWritingSpeed ()
String getUser ()
void shutdown ()

Package Attributes

boolean needSkeleton = false

Detailed Description

The VirtualDatabaseWorkerThread handles a connection with a C-JDBC driver.

Author:
Emmanuel Cecchet

Nicolas Modrzyk

Version:
1.0

Definition at line 66 of file VirtualDatabaseWorkerThread.java.


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.VirtualDatabaseWorkerThread (Controller controller, VirtualDatabase vdb)

Creates a new VirtualDatabaseWorkerThread instance.

Parameters:
controller  the controller this thread originated from
vdb  the virtual database instantiating this thread

Definition at line 121 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.controller, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getVirtualDatabaseName(), and org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.logger.

00122   {
00123     super("VirtualDatabaseWorkerThread-" + vdb.getVirtualDatabaseName());
00124     this.vdb = vdb;
00125     this.controller = controller;
00126     try
00127     {
00128       this.logger = Trace
00129           .getLogger("org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread."
00130               + vdb.getVirtualDatabaseName());
00131     }
00132     catch (Exception e)
00133     {
00134       this.logger = vdb.logger;
00135     }
00136   }


Member Function Documentation

long org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getBytesRead ()

Gets the number of bytes read on this connection's input stream.

Returns:
the number of bytes read

Definition at line 1190 of file VirtualDatabaseWorkerThread.java.

01191   {
01192     return in.getBytesRead();
01193   }

long org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getBytesWritten ()

Gets the number of bytes written on this connection's output stream.

Returns:
the number of bytes written

Definition at line 1200 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getBytesWritten().

01201   {
01202     return out.getBytesWritten();
01203   }

long org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getReadingSpeed ()

Gets the reading speed. Warning: this value is approximate.

Returns:
reading speed

Definition at line 1220 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSpeed().

01221   {
01222     return in.getSpeed();
01223   }

long org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getTimeActive ()

Gets the time this connection has been active.

Returns:
time elapsed since the input stream was created, in seconds

Definition at line 1210 of file VirtualDatabaseWorkerThread.java.

01211   {
01212     return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
01213   }

String org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getUser ()

Returns:
the login of the current user

Definition at line 1238 of file VirtualDatabaseWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run().

01239   {
01240     return user.getLogin();
01241   }

long org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getWritingSpeed ()

Gets the writing speed. Warning: this value is approximate.

Returns:
writing speed

Definition at line 1230 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSpeed().

01231   {
01232     return out.getSpeed();
01233   }

String[] org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.retrieveClientData ()

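Retrieves general information about this client.

Returns:
an array of strings holding, in order: host name, host address, bytes read, bytes written, whether compression is used, time active in seconds, reading speed, and writing speed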

Definition at line 1170 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSocket().

01171   {
01172     String[] data = new String[8];
01173     data[0] = in.getSocket().getInetAddress().getHostName();
01174     data[1] = in.getSocket().getInetAddress().getHostAddress();
01175     data[2] = String.valueOf(in.getBytesRead());
01176     data[3] = String.valueOf(out.getBytesWritten());
01177     data[4] = String.valueOf(in.getUseCompression());
01178     data[5] = String
01179         .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000));
01180     data[6] = String.valueOf(in.getSpeed());
01181     data[7] = String.valueOf(out.getSpeed());
01182     return data;
01183   }
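
Because retrieveClientData() returns positional strings, callers have to know which index holds which value. The following is a minimal sketch of a helper that names the positions. The class ClientDataView, its constants, and its labels are hypothetical and not part of C-JDBC; only the index order is taken from the assignments in the listing above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of C-JDBC) that names the slots of the client data array. */
final class ClientDataView {
    // Index order follows the assignments data[0]..data[7] in the listing above.
    static final int HOST_NAME = 0;
    static final int HOST_ADDRESS = 1;
    static final int BYTES_READ = 2;
    static final int BYTES_WRITTEN = 3;
    static final int USE_COMPRESSION = 4;
    static final int SECONDS_ACTIVE = 5;
    static final int READING_SPEED = 6;
    static final int WRITING_SPEED = 7;
    static final int FIELD_COUNT = 8;

    // Illustrative labels, one per slot, in index order.
    private static final String[] LABELS = {
        "hostName", "hostAddress", "bytesRead", "bytesWritten",
        "useCompression", "secondsActive", "readingSpeed", "writingSpeed"
    };

    private ClientDataView() {
    }

    /** Maps each label to the value stored at the matching index, preserving order. */
    static Map<String, String> describe(String[] data) {
        if (data == null || data.length != FIELD_COUNT) {
            throw new IllegalArgumentException("expected " + FIELD_COUNT + " fields");
        }
        Map<String, String> view = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_COUNT; i++) {
            view.put(LABELS[i], data[i]);
        }
        return view;
    }

    public static void main(String[] args) {
        // Made-up sample values standing in for a real retrieveClientData() result.
        String[] sample = {
            "client.example.org", "192.0.2.10", "2048", "4096", "false", "120", "17", "34"
        };
        describe(sample).forEach((label, value) -> System.out.println(label + " = " + value));
    }
}
```

The length check makes a mismatch between the array size and the number of assigned slots fail loudly at the call site instead of surfacing later as a missing value.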

void org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run ()

Gets a connection from the pending connection queue and processes it.

Definition at line 286 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.addIdleThread(), org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet.closeResultSet(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.currentNbOfThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getActiveThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getAuthenticationManager(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBlobFilter(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxThreadIdleTime(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getPendingConnections(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getRequestManager(), org.objectweb.cjdbc.controller.requestmanager.RequestManager.getRequiredParsingGranularity(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getUser(), org.objectweb.cjdbc.common.sql.filters.AbstractBlobFilter.getXml(), org.objectweb.cjdbc.controller.authentication.AuthenticationManager.isValidVirtualUser(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.minNbOfThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.needSkeleton, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.poolConnectionThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.removeCurrentNbOfThread(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.removeIdleThread(), and org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.rollback().

00287   {
00288     ArrayList vdbActiveThreads = vdb.getActiveThreads();
00289     ArrayList vdbPendingQueue = vdb.getPendingConnections();
00290     // List of open ResultSets for streaming. This is not synchronized since the
00291     // connection does only handle one request at a time
00292     streamedResultSet = new HashMap();
00293     boolean isActive = true;
00294 
00295     if (vdbActiveThreads == null)
00296     {
00297       logger
00298           .error("Got null active threads queue in VirtualDatabaseWorkerThread");
00299       isKilled = true;
00300     }
00301     if (vdbPendingQueue == null)
00302     {
00303       logger.error("Got null connection queue in VirtualDatabaseWorkerThread");
00304       isKilled = true;
00305     }
00306 
00307     // Main loop
00308     while (!isKilled)
00309     {
00310       // Get a connection from the pending queue
00311       synchronized (vdbPendingQueue)
00312       {
00313         while (vdbPendingQueue.isEmpty())
00314         {
00315           if (!vdb.poolConnectionThreads)
00316           { // User does not want thread pooling, kill this thread!
00317             isKilled = true;
00318             break;
00319           }
00320           boolean timeout = false;
00321           try
00322           {
00323             if (isActive)
00324             {
00325               isActive = false;
00326               // Remove ourselves from the active thread list
00327               synchronized (vdbActiveThreads)
00328               {
00329                 vdbActiveThreads.remove(this);
00330                 vdb.addIdleThread();
00331               }
00332             }
00333             long before = System.currentTimeMillis();
00334             vdbPendingQueue.wait(vdb.getMaxThreadIdleTime());
00335             long now = System.currentTimeMillis();
00336             // Check if timeout has expired
00337             timeout = now - before >= vdb.getMaxThreadIdleTime();
00338           }
00339           catch (InterruptedException e)
00340           {
00341             logger.warn("VirtualDatabaseWorkerThread wait() interrupted");
00342           }
00343           if (timeout && vdbPendingQueue.isEmpty())
00344           {
00345             if (vdb.currentNbOfThreads > vdb.minNbOfThreads)
00346             { // We have enough threads, kill this one
00347               isKilled = true;
00348               break;
00349             }
00350           }
00351         }
00352 
00353         if (isKilled)
00354         { // Cleaning up
00355           synchronized (vdbActiveThreads)
00356           { // Remove ourselves from the appropriate thread list
00357             if (isActive)
00358             {
00359               vdbActiveThreads.remove(this);
00360               vdb.removeCurrentNbOfThread();
00361             }
00362             else
00363               vdb.removeIdleThread();
00364           }
00365           // Get out of the while loop
00366           continue;
00367         }
00368 
00369         // Get a connection
00370         try
00371         {
00372           in = (CJDBCInputStream) vdbPendingQueue.remove(0);
00373           out = (CJDBCOutputStream) vdbPendingQueue.remove(0);
00374         }
00375         catch (Exception e)
00376         {
00377           logger.error("Error while getting streams from connection");
00378           continue;
00379         }
00380 
00381         synchronized (vdbActiveThreads)
00382         {
00383           if (!isActive)
00384           {
00385             vdb.removeIdleThread();
00386             isActive = true;
00387             // Add this thread to the active thread list
00388             vdbActiveThreads.add(this);
00389           }
00390         }
00391       }
00392 
00393       // Handle connection
00394       // Read the user information and check authentication
00395       boolean success = false;
00396       try
00397       {
00398         login = in.readUTF();
00399         String password = in.readUTF();
00400         user = new VirtualDatabaseUser(login, password);
00401 
00402         if (vdb.getAuthenticationManager().isValidVirtualUser(user))
00403         { // Authentication ok, send if SQL skeleton is needed
00404           needSkeleton = vdb.getRequestManager()
00405               .getRequiredParsingGranularity() != ParsingGranularities.NO_PARSING;
00406           out.writeObject(needSkeleton ? Boolean.TRUE : Boolean.FALSE);
00407           out.writeUTF(vdb.getBlobFilter().getXml());
00408           out.flush();
00409           success = true;
00410 
00411           if (logger.isDebugEnabled())
00412             logger.debug("Login accepted for " + login);
00413         }
00414         else
00415         { // Authentication failed, close the connection
00416           String msg = "Authentication failed for user '" + login + "'";
00417           out.writeObject(new SQLException(msg));
00418           if (logger.isDebugEnabled())
00419             logger.debug(msg);
00420           continue;
00421         }
00422       }
00423       catch (OptionalDataException e)
00424       {
00425         logger.error("Protocol error while expecting user authentication (" + e
00426             + ")");
00427         continue;
00428       }
00429       catch (IOException e)
00430       {
00431         logger.error("I/O error during user authentication (" + e + ")");
00432         continue;
00433       }
00434       finally
00435       {
00436         if (!success)
00437         {
00438           try
00439           {
00440             out.close();
00441             in.close();
00442           }
00443           catch (IOException ignore)
00444           {
00445           }
00446         }
00447       }
00448 
00449       currentTid = 0;
00450       transactionStarted = false;
00451       queryExecutedInThisTransaction = false;
00452       closed = false;
00453       int command;
00454       while (!closed && !isKilled)
00455       {
00456         try
00457         {
00458           // Get the query
00459           waitForCommand = true;
00460           command = in.readInt();
00461           waitForCommand = false;
00462 
00463           // Process it
00464           switch (command)
00465           {
00466             case Commands.ExecReadRequest :
00467               execReadRequest();
00468               break;
00469             case Commands.ExecWriteRequest :
00470               execWriteRequest();
00471               break;
00472             case Commands.ExecWriteRequestWithKeys :
00473               execWriteRequestWithKeys();
00474               break;
00475             case Commands.ExecReadStoredProcedure :
00476               execReadStoredProcedure();
00477               break;
00478             case Commands.ExecWriteStoredProcedure :
00479               execWriteStoredProcedure();
00480               break;
00481             case Commands.Begin :
00482               begin();
00483               break;
00484             case Commands.Commit :
00485               commit();
00486               break;
00487             case Commands.SetAutoCommit :
00488               setAutoCommit();
00489               break;
00490             case Commands.Rollback :
00491               rollback();
00492               break;
00493             case Commands.GetVirtualDatabaseName :
00494               getVirtualDatabaseName();
00495               break;
00496             case Commands.DatabaseMetaDataGetDatabaseProductName :
00497               databaseMetaDataGetDatabaseProductName();
00498               break;
00499             case Commands.GetControllerVersionNumber :
00500               getControllerVersionNumber();
00501               break;
00502             case Commands.DatabaseMetaDataGetTables :
00503               databaseMetaDataGetTables();
00504               break;
00505             case Commands.DatabaseMetaDataGetColumns :
00506               databaseMetaDataGetColumns();
00507               break;
00508             case Commands.DatabaseMetaDataGetPrimaryKeys :
00509               databaseMetaDataGetPrimaryKeys();
00510               break;
00511             case Commands.DatabaseMetaDataGetProcedures :
00512               databaseMetaDataGetProcedures();
00513               break;
00514             case Commands.DatabaseMetaDataGetProcedureColumns :
00515               databaseMetaDataGetProcedureColumns();
00516               break;
00517             case Commands.ConnectionGetCatalogs :
00518               connectionGetCatalogs();
00519               break;
00520             case Commands.ConnectionGetCatalog :
00521               connectionGetCatalog();
00522               break;
00523             case Commands.DatabaseMetaDataGetTableTypes :
00524               databaseMetaDataGetTableTypes();
00525               break;
00526             case Commands.DatabaseMetaDataGetSchemas :
00527               databaseMetaDataGetSchemas();
00528               break;
00529             case Commands.DatabaseMetaDataGetTablePrivileges :
00530               databaseMetaDataGetTablePrivileges();
00531               break;
00532             case Commands.ConnectionSetCatalog :
00533               connectionSetCatalog();
00534               break;
00535             case Commands.Close :
00536               close();
00537               break;
00538             case Commands.Reset :
00539               reset();
00540               break;
00541             case Commands.FetchNextResultSetRows :
00542               fetchNextResultSetRows();
00543               break;
00544             case Commands.CloseRemoteResultSet :
00545               closeRemoteResultSet();
00546               break;
00547             case Commands.DatabaseStaticMetadata :
00548               databaseStaticMetadata();
00549               break;
00550             case Commands.RestoreConnectionState :
00551               restoreConnectionState();
00552               break;
00553             default :
00554               String errorMsg = "Unsupported protocol command: " + command;
00555               logger.error(errorMsg);
00556               out.writeObject(new RuntimeException(errorMsg));
00557               out.flush();
00558               break;
00559           }
00560         }
00561         catch (OptionalDataException e)
00562         {
00563           logger.warn("Protocol error (" + e + ")");
00564           try
00565           {
00566             out.writeObject(e);
00567             out.flush();
00568           }
00569           catch (IOException ignore)
00570           {
00571           }
00572         }
00573         catch (RemoteException e)
00574         {
00575           logger.warn("Error during command execution (" + e + ")");
00576           try
00577           {
00578             out.writeObject(e);
00579             out.flush();
00580           }
00581           catch (IOException ignore)
00582           {
00583           }
00584         }
00585         catch (EOFException e)
00586         {
00587           logger.warn("Client (login:" + login + ",host:"
00588               + in.getSocket().getInetAddress().getHostName()
00589               + ") closed connection with server");
00590           closed = true;
00591         }
00592         catch (SocketException e)
00593         {
00594           // shutting down
00595           closed = true;
00596         }
00597         catch (IOException e)
00598         {
00599           closed = true;
00600           logger.warn("Closing connection with client " + login
00601               + " because of IOException.(" + e + ")");
00602         }
00603         catch (ClassNotFoundException e)
00604         {
00605           logger.error("Protocol error (" + e + ")");
00606           try
00607           {
00608             out.writeObject(e);
00609             out.flush();
00610           }
00611           catch (IOException ignore)
00612           {
00613           }
00614         }
00615         catch (SQLException e)
00616         {
00617           logger
00618               .warn("Error during command execution (" + e.getMessage() + ")");
00619           try
00620           {
00621             out.writeObject(e);
00622             out.flush();
00623           }
00624           catch (IOException ignore)
00625           {
00626           }
00627         }
00628         catch (RuntimeException e)
00629         {
00630           logger.warn("Runtime error during command execution ("
00631               + e.getMessage() + ")", e);
00632           try
00633           {
00634             out.writeObject(new SQLException(e.getMessage()));
00635             out.flush();
00636           }
00637           catch (IOException ignore)
00638           {
00639           }
00640         }
00641       }
00642 
00643       // Do the cleanup
00644       if (transactionStarted)
00645       {
00646         if (logger.isDebugEnabled())
00647           logger.debug("Forcing transaction " + currentTid + " rollback");
00648         try
00649         {
00650           vdb.rollback(currentTid);
00651         }
00652         catch (Exception e)
00653         {
00654           if (logger.isDebugEnabled())
00655             logger.debug("Error during rollback of transaction " + currentTid
00656                 + "(" + e + ")");
00657         }
00658       }
00659       if (!streamedResultSet.isEmpty())
00660       {
00661         for (Iterator iter = streamedResultSet.values().iterator(); iter
00662             .hasNext();)
00663         {
00664           ControllerResultSet crs = (ControllerResultSet) iter.next();
00665           crs.closeResultSet();
00666         }
00667         streamedResultSet.clear();
00668       }
00669       try
00670       {
00671         in.close();
00672       }
00673       catch (IOException ignore)
00674       {
00675       }
00676       try
00677       {
00678         out.close();
00679       }
00680       catch (IOException ignore)
00681       {
00682       }
00683     }
00684 
00685     if (logger.isDebugEnabled())
00686       logger.debug("VirtualDatabaseWorkerThread associated to login: "
00687           + this.getUser() + " terminating.");
00688   }
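
The idle wait at the top of run() blocks on the pending queue with wait(timeout) and then compares the elapsed time with the idle limit. The sketch below isolates that pattern under a few assumptions: the class IdleWaitSketch and the method takeOrTimeout are hypothetical names, the queue is a plain List, and the elapsed-time check is replaced by a deadline so that an early or spurious wakeup waits only for the remaining time. Note that in Java, wait(0) blocks until notified, so the sketch never passes 0.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a timed wait on a shared pending queue (not C-JDBC code). */
final class IdleWaitSketch {
    private IdleWaitSketch() {
    }

    /**
     * Removes and returns the first queued item, or returns null if the queue
     * stayed empty for maxIdleMillis or the waiting thread was interrupted.
     */
    static <T> T takeOrTimeout(List<T> queue, long maxIdleMillis) {
        synchronized (queue) {
            long deadline = System.nanoTime() + maxIdleMillis * 1_000_000L;
            while (queue.isEmpty()) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null; // idle limit reached, caller may retire the thread
                }
                try {
                    // Round up to at least 1 ms: wait(0) would block until notified.
                    queue.wait(Math.max(1L, remainingNanos / 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return null;
                }
            }
            return queue.remove(0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> pending = new ArrayList<>();
        Thread producer = new Thread(() -> {
            synchronized (pending) {
                pending.add("connection-1");
                pending.notifyAll(); // wake any worker blocked in takeOrTimeout
            }
        });
        producer.start();
        System.out.println("first take:  " + takeOrTimeout(pending, 1000));
        producer.join();
        System.out.println("second take: " + takeOrTimeout(pending, 50));
    }
}
```

A caller that receives null would then apply its own policy, comparable to the check of currentNbOfThreads against minNbOfThreads in the listing, before deciding to terminate.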

void org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.shutdown ()

Shuts down this thread by setting isKilled to true, which gives the thread time to roll back any open transaction. The streams are closed only if the thread is waiting for a command.

Definition at line 1247 of file VirtualDatabaseWorkerThread.java.

References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.close().

Referenced by org.objectweb.cjdbc.controller.core.shutdown.VirtualDatabaseShutdownThread.terminateVirtualDatabaseWorkerThreads().

01248   {
01249     // Tell this thread to stop working gently.
01250     // This will cancel transaction if needed
01251     this.isKilled = true;
01252     try
01253     {
01254       if (waitForCommand)
01255       {
01256         // close only the streams if we're not in the middle of a request
01257         in.close();
01258         out.close();
01259       }
01260     }
01261     catch (IOException e)
01262     {
01263       // ignore, only the input stream needs to be closed
01264       // for this thread to end
01265     }
01266   }
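
The shutdown logic above combines a stop flag with closing the streams so that a thread blocked on a read wakes up, while a request already in progress is left to finish. The following sketch shows that decision in isolation. StoppableWorkerSketch and its members are hypothetical names, the streams are reduced to java.io.Closeable, and the flags are declared volatile as an assumption, since the field declarations of the original class are not shown on this page.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical sketch of the flag-plus-close shutdown pattern (not C-JDBC code). */
final class StoppableWorkerSketch {
    // Assumption: volatile, so a write from the shutdown thread is visible to the worker.
    private volatile boolean killed = false;
    private volatile boolean waitingForCommand = false;
    private final Closeable in;
    private final Closeable out;

    StoppableWorkerSketch(Closeable in, Closeable out) {
        this.in = in;
        this.out = out;
    }

    /** The worker loop would set this to true just before blocking on the next command. */
    void setWaitingForCommand(boolean waiting) {
        this.waitingForCommand = waiting;
    }

    boolean isKilled() {
        return killed;
    }

    /** Requests termination; closes the streams only when no request is in progress. */
    void shutdown() {
        killed = true;
        if (!waitingForCommand) {
            return; // a request is being processed; the worker loop sees the flag afterwards
        }
        try {
            in.close();  // unblocks a pending read in the worker
            out.close(); // skipped if in.close() throws, as in the listing above
        } catch (IOException ignored) {
            // the worker ends once its read fails, so the error is not reported
        }
    }

    public static void main(String[] args) {
        StoppableWorkerSketch worker = new StoppableWorkerSketch(
            () -> System.out.println("input closed"),
            () -> System.out.println("output closed"));
        worker.setWaitingForCommand(true);
        worker.shutdown();
        System.out.println("killed = " + worker.isKilled());
    }
}
```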


Member Data Documentation

boolean org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.needSkeleton = false [package]
 

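Whether an SQL skeleton is needed for parsing. run() sets it to true when the request manager's required parsing granularity is not ParsingGranularities.NO_PARSING and sends the value to the driver after successful authentication.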

Definition at line 80 of file VirtualDatabaseWorkerThread.java.

Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run().


The documentation for this class was generated from the following file:
VirtualDatabaseWorkerThread.java
Generated on Mon Apr 11 22:04:47 2005 for C-JDBC by  doxygen 1.3.9.1