Public Member Functions | |
VirtualDatabaseWorkerThread (Controller controller, VirtualDatabase vdb) | |
void | run () |
String[] | retrieveClientData () |
long | getBytesRead () |
long | getBytesWritten () |
long | getTimeActive () |
long | getReadingSpeed () |
long | getWritingSpeed () |
String | getUser () |
void | shutdown () |
Package Attributes | |
boolean | needSkeleton = false |
VirtualDatabaseWorkerThread
handles a connection with a C-JDBC driver.
Definition at line 66 of file VirtualDatabaseWorkerThread.java.
|
Creates a new
Definition at line 121 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.controller, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getVirtualDatabaseName(), and org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.logger. 00122 { 00123 super("VirtualDatabaseWorkerThread-" + vdb.getVirtualDatabaseName()); 00124 this.vdb = vdb; 00125 this.controller = controller; 00126 try 00127 { 00128 this.logger = Trace 00129 .getLogger("org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread." 00130 + vdb.getVirtualDatabaseName()); 00131 } 00132 catch (Exception e) 00133 { 00134 this.logger = vdb.logger; 00135 } 00136 }
|
|
get bytes read
Definition at line 1190 of file VirtualDatabaseWorkerThread.java. 01191 {
01192 return in.getBytesRead();
01193 }
|
|
get bytes written
Definition at line 1200 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getBytesWritten(). 01201 {
01202 return out.getBytesWritten();
01203 }
|
|
get reading speed, WARNING! This is approximate
Definition at line 1220 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSpeed(). 01221 {
01222 return in.getSpeed();
01223 }
|
|
get time active
Definition at line 1210 of file VirtualDatabaseWorkerThread.java. 01211 {
01212 return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
01213 }
|
|
Definition at line 1238 of file VirtualDatabaseWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run(). 01239 {
01240 return user.getLogin();
01241 }
|
|
get writing speed, WARNING! This is approximate
Definition at line 1230 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSpeed(). 01231 {
01232 return out.getSpeed();
01233 }
|
|
Retrieve general information on this client
Definition at line 1170 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.getSocket(). 01171 { 01172 String[] data = new String[6]; 01173 data[0] = in.getSocket().getInetAddress().getHostName(); 01174 data[1] = in.getSocket().getInetAddress().getHostAddress(); 01175 data[2] = String.valueOf(in.getBytesRead()); 01176 data[3] = String.valueOf(out.getBytesWritten()); 01177 data[4] = String.valueOf(in.getUseCompression()); 01178 data[5] = String 01179 .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000)); 01180 data[6] = String.valueOf(in.getSpeed()); 01181 data[7] = String.valueOf(out.getSpeed()); 01182 return data; 01183 }
|
|
Gets a connection from the connection queue and process it. Definition at line 286 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.addIdleThread(), org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet.closeResultSet(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.currentNbOfThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getActiveThreads(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getAuthenticationManager(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBlobFilter(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getMaxThreadIdleTime(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getPendingConnections(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getRequestManager(), org.objectweb.cjdbc.controller.requestmanager.RequestManager.getRequiredParsingGranularity(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.getUser(), org.objectweb.cjdbc.common.sql.filters.AbstractBlobFilter.getXml(), org.objectweb.cjdbc.controller.authentication.AuthenticationManager.isValidVirtualUser(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.minNbOfThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.needSkeleton, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.poolConnectionThreads, org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.removeCurrentNbOfThread(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.removeIdleThread(), and org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.rollback(). 00287 { 00288 ArrayList vdbActiveThreads = vdb.getActiveThreads(); 00289 ArrayList vdbPendingQueue = vdb.getPendingConnections(); 00290 // List of open ResultSets for streaming. This is not synchronized since the 00291 // connection does only handle one request at a time 00292 streamedResultSet = new HashMap(); 00293 boolean isActive = true; 00294 00295 if (vdbActiveThreads == null) 00296 { 00297 logger 00298 .error("Got null active threads queue in VirtualDatabaseWorkerThread"); 00299 isKilled = true; 00300 } 00301 if (vdbPendingQueue == null) 00302 { 00303 logger.error("Got null connection queue in VirtualDatabaseWorkerThread"); 00304 isKilled = true; 00305 } 00306 00307 // Main loop 00308 while (!isKilled) 00309 { 00310 // Get a connection from the pending queue 00311 synchronized (vdbPendingQueue) 00312 { 00313 while (vdbPendingQueue.isEmpty()) 00314 { 00315 if (!vdb.poolConnectionThreads) 00316 { // User does not want thread pooling, kill this thread! 00317 isKilled = true; 00318 break; 00319 } 00320 boolean timeout = false; 00321 try 00322 { 00323 if (isActive) 00324 { 00325 isActive = false; 00326 // Remove ourselves from the active thread list 00327 synchronized (vdbActiveThreads) 00328 { 00329 vdbActiveThreads.remove(this); 00330 vdb.addIdleThread(); 00331 } 00332 } 00333 long before = System.currentTimeMillis(); 00334 vdbPendingQueue.wait(vdb.getMaxThreadIdleTime()); 00335 long now = System.currentTimeMillis(); 00336 // Check if timeout has expired 00337 timeout = now - before >= vdb.getMaxThreadIdleTime(); 00338 } 00339 catch (InterruptedException e) 00340 { 00341 logger.warn("VirtualDatabaseWorkerThread wait() interrupted"); 00342 } 00343 if (timeout && vdbPendingQueue.isEmpty()) 00344 { 00345 if (vdb.currentNbOfThreads > vdb.minNbOfThreads) 00346 { // We have enough threads, kill this one 00347 isKilled = true; 00348 break; 00349 } 00350 } 00351 } 00352 00353 if (isKilled) 00354 { // Cleaning up 00355 synchronized (vdbActiveThreads) 00356 { // Remove ourselves from the appropriate thread list 00357 if (isActive) 00358 { 00359 vdbActiveThreads.remove(this); 00360 vdb.removeCurrentNbOfThread(); 00361 } 00362 else 00363 vdb.removeIdleThread(); 00364 } 00365 // Get out of the while loop 00366 continue; 00367 } 00368 00369 // Get a connection 00370 try 00371 { 00372 in = (CJDBCInputStream) vdbPendingQueue.remove(0); 00373 out = (CJDBCOutputStream) vdbPendingQueue.remove(0); 00374 } 00375 catch (Exception e) 00376 { 00377 logger.error("Error while getting streams from connection"); 00378 continue; 00379 } 00380 00381 synchronized (vdbActiveThreads) 00382 { 00383 if (!isActive) 00384 { 00385 vdb.removeIdleThread(); 00386 isActive = true; 00387 // Add this thread to the active thread list 00388 vdbActiveThreads.add(this); 00389 } 00390 } 00391 } 00392 00393 // Handle connection 00394 // Read the user information and check authentication 00395 boolean success = false; 00396 try 00397 { 00398 login = in.readUTF(); 00399 String password = in.readUTF(); 00400 user = new VirtualDatabaseUser(login, password); 00401 00402 if (vdb.getAuthenticationManager().isValidVirtualUser(user)) 00403 { // Authentication ok, send if SQL skeleton is needed 00404 needSkeleton = vdb.getRequestManager() 00405 .getRequiredParsingGranularity() != ParsingGranularities.NO_PARSING; 00406 out.writeObject(needSkeleton ? Boolean.TRUE : Boolean.FALSE); 00407 out.writeUTF(vdb.getBlobFilter().getXml()); 00408 out.flush(); 00409 success = true; 00410 00411 if (logger.isDebugEnabled()) 00412 logger.debug("Login accepted for " + login); 00413 } 00414 else 00415 { // Authentication failed, close the connection 00416 String msg = "Authentication failed for user '" + login + "'"; 00417 out.writeObject(new SQLException(msg)); 00418 if (logger.isDebugEnabled()) 00419 logger.debug(msg); 00420 continue; 00421 } 00422 } 00423 catch (OptionalDataException e) 00424 { 00425 logger.error("Protocol error while expecting user authentication (" + e 00426 + ")"); 00427 continue; 00428 } 00429 catch (IOException e) 00430 { 00431 logger.error("I/O error during user authentication (" + e + ")"); 00432 continue; 00433 } 00434 finally 00435 { 00436 if (!success) 00437 { 00438 try 00439 { 00440 out.close(); 00441 in.close(); 00442 } 00443 catch (IOException ignore) 00444 { 00445 } 00446 } 00447 } 00448 00449 currentTid = 0; 00450 transactionStarted = false; 00451 queryExecutedInThisTransaction = false; 00452 closed = false; 00453 int command; 00454 while (!closed && !isKilled) 00455 { 00456 try 00457 { 00458 // Get the query 00459 waitForCommand = true; 00460 command = in.readInt(); 00461 waitForCommand = false; 00462 00463 // Process it 00464 switch (command) 00465 { 00466 case Commands.ExecReadRequest : 00467 execReadRequest(); 00468 break; 00469 case Commands.ExecWriteRequest : 00470 execWriteRequest(); 00471 break; 00472 case Commands.ExecWriteRequestWithKeys : 00473 execWriteRequestWithKeys(); 00474 break; 00475 case Commands.ExecReadStoredProcedure : 00476 execReadStoredProcedure(); 00477 break; 00478 case Commands.ExecWriteStoredProcedure : 00479 execWriteStoredProcedure(); 00480 break; 00481 case Commands.Begin : 00482 begin(); 00483 break; 00484 case Commands.Commit : 00485 commit(); 00486 break; 00487 case Commands.SetAutoCommit : 00488 setAutoCommit(); 00489 break; 00490 case Commands.Rollback : 00491 rollback(); 00492 break; 00493 case Commands.GetVirtualDatabaseName : 00494 getVirtualDatabaseName(); 00495 break; 00496 case Commands.DatabaseMetaDataGetDatabaseProductName : 00497 databaseMetaDataGetDatabaseProductName(); 00498 break; 00499 case Commands.GetControllerVersionNumber : 00500 getControllerVersionNumber(); 00501 break; 00502 case Commands.DatabaseMetaDataGetTables : 00503 databaseMetaDataGetTables(); 00504 break; 00505 case Commands.DatabaseMetaDataGetColumns : 00506 databaseMetaDataGetColumns(); 00507 break; 00508 case Commands.DatabaseMetaDataGetPrimaryKeys : 00509 databaseMetaDataGetPrimaryKeys(); 00510 break; 00511 case Commands.DatabaseMetaDataGetProcedures : 00512 databaseMetaDataGetProcedures(); 00513 break; 00514 case Commands.DatabaseMetaDataGetProcedureColumns : 00515 databaseMetaDataGetProcedureColumns(); 00516 break; 00517 case Commands.ConnectionGetCatalogs : 00518 connectionGetCatalogs(); 00519 break; 00520 case Commands.ConnectionGetCatalog : 00521 connectionGetCatalog(); 00522 break; 00523 case Commands.DatabaseMetaDataGetTableTypes : 00524 databaseMetaDataGetTableTypes(); 00525 break; 00526 case Commands.DatabaseMetaDataGetSchemas : 00527 databaseMetaDataGetSchemas(); 00528 break; 00529 case Commands.DatabaseMetaDataGetTablePrivileges : 00530 databaseMetaDataGetTablePrivileges(); 00531 break; 00532 case Commands.ConnectionSetCatalog : 00533 connectionSetCatalog(); 00534 break; 00535 case Commands.Close : 00536 close(); 00537 break; 00538 case Commands.Reset : 00539 reset(); 00540 break; 00541 case Commands.FetchNextResultSetRows : 00542 fetchNextResultSetRows(); 00543 break; 00544 case Commands.CloseRemoteResultSet : 00545 closeRemoteResultSet(); 00546 break; 00547 case Commands.DatabaseStaticMetadata : 00548 databaseStaticMetadata(); 00549 break; 00550 case Commands.RestoreConnectionState : 00551 restoreConnectionState(); 00552 break; 00553 default : 00554 String errorMsg = "Unsupported protocol command: " + command; 00555 logger.error(errorMsg); 00556 out.writeObject(new RuntimeException(errorMsg)); 00557 out.flush(); 00558 break; 00559 } 00560 } 00561 catch (OptionalDataException e) 00562 { 00563 logger.warn("Protocol error (" + e + ")"); 00564 try 00565 { 00566 out.writeObject(e); 00567 out.flush(); 00568 } 00569 catch (IOException ignore) 00570 { 00571 } 00572 } 00573 catch (RemoteException e) 00574 { 00575 logger.warn("Error during command execution (" + e + ")"); 00576 try 00577 { 00578 out.writeObject(e); 00579 out.flush(); 00580 } 00581 catch (IOException ignore) 00582 { 00583 } 00584 } 00585 catch (EOFException e) 00586 { 00587 logger.warn("Client (login:" + login + ",host:" 00588 + in.getSocket().getInetAddress().getHostName() 00589 + " closed connection with server"); 00590 closed = true; 00591 } 00592 catch (SocketException e) 00593 { 00594 // shutting down 00595 closed = true; 00596 } 00597 catch (IOException e) 00598 { 00599 closed = true; 00600 logger.warn("Closing connection with client " + login 00601 + " because of IOException.(" + e + ")"); 00602 } 00603 catch (ClassNotFoundException e) 00604 { 00605 logger.error("Protocol error (" + e + ")"); 00606 try 00607 { 00608 out.writeObject(e); 00609 out.flush(); 00610 } 00611 catch (IOException ignore) 00612 { 00613 } 00614 } 00615 catch (SQLException e) 00616 { 00617 logger 00618 .warn("Error during command execution (" + e.getMessage() + ")"); 00619 try 00620 { 00621 out.writeObject(e); 00622 out.flush(); 00623 } 00624 catch (IOException ignore) 00625 { 00626 } 00627 } 00628 catch (RuntimeException e) 00629 { 00630 logger.warn("Runtime error during command execution (" 00631 + e.getMessage() + ")", e); 00632 try 00633 { 00634 out.writeObject(new SQLException(e.getMessage())); 00635 out.flush(); 00636 } 00637 catch (IOException ignore) 00638 { 00639 } 00640 } 00641 } 00642 00643 // Do the cleanup 00644 if (transactionStarted) 00645 { 00646 if (logger.isDebugEnabled()) 00647 logger.debug("Forcing transaction " + currentTid + " rollback"); 00648 try 00649 { 00650 vdb.rollback(currentTid); 00651 } 00652 catch (Exception e) 00653 { 00654 if (logger.isDebugEnabled()) 00655 logger.debug("Error during rollback of transaction " + currentTid 00656 + "(" + e + ")"); 00657 } 00658 } 00659 if (!streamedResultSet.isEmpty()) 00660 { 00661 for (Iterator iter = streamedResultSet.values().iterator(); iter 00662 .hasNext();) 00663 { 00664 ControllerResultSet crs = (ControllerResultSet) iter.next(); 00665 crs.closeResultSet(); 00666 } 00667 streamedResultSet.clear(); 00668 } 00669 try 00670 { 00671 in.close(); 00672 } 00673 catch (IOException ignore) 00674 { 00675 } 00676 try 00677 { 00678 out.close(); 00679 } 00680 catch (IOException ignore) 00681 { 00682 } 00683 } 00684 00685 if (logger.isDebugEnabled()) 00686 logger.debug("VirtualDatabaseWorkerThread associated to login: " 00687 + this.getUser() + " terminating."); 00688 }
|
|
Shutdown this thread by setting Definition at line 1247 of file VirtualDatabaseWorkerThread.java. References org.objectweb.cjdbc.common.stream.CJDBCOutputStream.close(). Referenced by org.objectweb.cjdbc.controller.core.shutdown.VirtualDatabaseShutdownThread.terminateVirtualDatabaseWorkerThreads(). 01248 { 01249 // Tell this thread to stop working gently. 01250 // This will cancel transaction if needed 01251 this.isKilled = true; 01252 try 01253 { 01254 if (waitForCommand) 01255 { 01256 // close only the streams if we're not in the middle of a request 01257 in.close(); 01258 out.close(); 01259 } 01260 } 01261 catch (IOException e) 01262 { 01263 // ignore, only the input stream should be close 01264 // for this thread to end 01265 } 01266 }
|
|
Needed for parsing Definition at line 80 of file VirtualDatabaseWorkerThread.java. Referenced by org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread.run(). |