src/org/objectweb/cjdbc/controller/virtualdatabase/VirtualDatabaseWorkerThread.java

説明を見る。
package org.objectweb.cjdbc.controller.virtualdatabase;

import java.io.EOFException;
import java.io.IOException;
import java.io.OptionalDataException;
import java.net.SocketException;
import java.rmi.RemoteException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

import org.objectweb.cjdbc.common.log.Trace;
import org.objectweb.cjdbc.common.sql.AbstractRequest;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.AlterRequest;
import org.objectweb.cjdbc.common.sql.CreateRequest;
import org.objectweb.cjdbc.common.sql.DeleteRequest;
import org.objectweb.cjdbc.common.sql.DropRequest;
import org.objectweb.cjdbc.common.sql.InsertRequest;
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.StoredProcedure;
import org.objectweb.cjdbc.common.sql.UpdateRequest;
import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
import org.objectweb.cjdbc.common.users.VirtualDatabaseUser;
import org.objectweb.cjdbc.common.util.Constants;
import org.objectweb.cjdbc.controller.core.Controller;
import org.objectweb.cjdbc.driver.protocol.CommandCompleted;
import org.objectweb.cjdbc.driver.protocol.Commands;

/**
 * Worker thread that serves client connections of a virtual database.
 * <p>
 * The thread repeatedly takes an input/output stream pair from the virtual
 * database pending connection queue, authenticates the user that is announced
 * on the stream, and then executes the protocol commands (see
 * {@link Commands}) sent by the driver until the connection is closed or the
 * thread is killed.
 */
public class VirtualDatabaseWorkerThread extends Thread
{
  //
  // How the code is organized?
  //
  // 1. Member variables
  // 2. Constructor(s)
  // 3. Request management
  // 4. Getter/Setters

  /** Counter used to build a distinct logger name for each worker thread. */
  private static int                threadID  = 0;

  /**
   * Set to <code>true</code> when this thread must stop. Volatile because it
   * is set by {@link #shutdown()} from another thread and polled by
   * {@link #run()}.
   */
  private volatile boolean          isKilled  = false;

  /** <code>true</code> if the driver has to send the SQL skeleton. */
  boolean                           needSkeleton = false;

  /** Virtual database served by this thread (may change on SetCatalog). */
  private VirtualDatabase           vdb;

  /** Reusable acknowledgement object sent for commands without a result. */
  private final CommandCompleted    completed = new CommandCompleted();

  /** Logger of this thread. */
  private Trace                     logger    = null;

  /** Streams of the connection currently served (null before the first one). */
  private CJDBCInputStream          in        = null;
  private CJDBCOutputStream         out       = null;

  /** User of the current connection (null before the first authentication). */
  private VirtualDatabaseUser       user;

  private Controller                controller;

  /**
   * <code>true</code> while {@link #run()} is blocked reading the next
   * command. Volatile because {@link #shutdown()} reads it from another
   * thread.
   */
  private volatile boolean          waitForCommand;

  /** Open streamed result sets of the current connection, by cursor name. */
  private HashMap                   streamedResultSet;

  /*
   * Constructor
   */

  /**
   * Creates a new <code>VirtualDatabaseWorkerThread</code>.
   * <p>
   * A dedicated logger is requested for this thread; if that fails for any
   * reason the logger of the virtual database is used instead.
   * 
   * @param controller the controller this thread belongs to
   * @param vdb the virtual database whose connections are served
   */
  public VirtualDatabaseWorkerThread(Controller controller, VirtualDatabase vdb)
  {
    super("VirtualDatabaseWorkerThread-" + vdb.getName());
    this.vdb = vdb;
    this.controller = controller;
    try
    {
      this.logger = Trace
          .getLogger("org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread."
              + vdb.getName() + "." + nextThreadId());
    }
    catch (Exception e)
    {
      this.logger = vdb.logger;
    }
  }

  /**
   * Returns the next thread identifier. Synchronized so that two threads
   * created concurrently cannot obtain the same identifier.
   * 
   * @return a value that is unique among the worker threads of this JVM
   */
  private static synchronized int nextThreadId()
  {
    return threadID++;
  }

  /**
   * Asks this thread to stop working gently. The main loop of {@link #run()}
   * exits as soon as it notices the request, and rolls back the current
   * transaction if one is started.
   * <p>
   * If the thread is currently blocked waiting for a command, the connection
   * streams are closed to wake it up. Streams are left untouched when a
   * request is being processed.
   */
  public void shutdown()
  {
    this.isKilled = true;
    if (waitForCommand)
    {
      // Close the streams only if we are not in the middle of a request.
      // Each stream is closed on its own so that a failure on the first one
      // does not leave the second one open.
      closeInputQuietly();
      closeOutputQuietly();
    }
  }

  /**
   * Sets the login and transaction information on a request received from
   * the driver.
   * <p>
   * If the request is in autocommit mode while a transaction is started, the
   * transaction <code>tid</code> is committed first and the request is left
   * without a transaction id.
   * 
   * @param request the request to initialize
   * @param login the login of the connected user
   * @param tid the current transaction id
   * @param transactionStarted <code>true</code> if a transaction is started
   * @return the new value of the "transaction started" state:
   *         <code>false</code> if the transaction has just been committed,
   *         <code>transactionStarted</code> otherwise
   * @throws SQLException if the commit fails
   */
  private boolean setRequestParameters(AbstractRequest request, String login,
      long tid, boolean transactionStarted) throws SQLException
  {
    request.setLogin(login);
    if (request.isAutoCommit() && transactionStarted)
    {
      vdb.commit(tid);
      return false;
    }
    request.setTransactionId(tid);
    return transactionStarted;
  }

  /**
   * Gets a connection from the pending connection queue, authenticates the
   * user and processes the commands of this connection. When the connection
   * ends, the thread goes back to the queue, or terminates if it has been
   * killed, if thread pooling is disabled, or if it stayed idle for too long
   * while more than the minimum number of threads exist.
   */
  public void run()
  {
    ArrayList vdbActiveThreads = vdb.getActiveThreads();
    ArrayList vdbPendingQueue = vdb.getPendingConnections();
    // List of open ResultSets for streaming. This is not synchronized since the
    // connection does only handle one request at a time
    streamedResultSet = new HashMap();
    boolean isActive = true;

    if (vdbActiveThreads == null)
    {
      logger
          .error("Got null active threads queue in VirtualDatabaseWorkerThread");
      isKilled = true;
    }
    if (vdbPendingQueue == null)
    {
      logger.error("Got null connection queue in VirtualDatabaseWorkerThread");
      isKilled = true;
    }

    // Main loop
    while (!isKilled)
    {
      // Get a connection from the pending queue
      synchronized (vdbPendingQueue)
      {
        while (vdbPendingQueue.isEmpty())
        {
          if (!vdb.poolConnectionThreads)
          { // User does not want thread pooling, kill this thread!
            isKilled = true;
            break;
          }
          boolean timeout = false;
          try
          {
            if (isActive)
            {
              isActive = false;
              // Remove ourselves from the active thread list
              synchronized (vdbActiveThreads)
              {
                vdbActiveThreads.remove(this);
                vdb.addIdleThread();
              }
            }
            long before = System.currentTimeMillis();
            vdbPendingQueue.wait(vdb.getMaxThreadIdleTime());
            long now = System.currentTimeMillis();
            // Check if timeout has expired
            timeout = now - before >= vdb.getMaxThreadIdleTime();
          }
          catch (InterruptedException e)
          {
            logger.warn("VirtualDatabaseWorkerThread wait() interrupted");
          }
          if (timeout && vdbPendingQueue.isEmpty())
          {
            if (vdb.currentNbOfThreads > vdb.minNbOfThreads)
            { // We have enough threads, kill this one
              isKilled = true;
              break;
            }
          }
        }

        if (isKilled)
        { // Cleaning up
          synchronized (vdbActiveThreads)
          { // Remove ourselves from the appropriate thread list
            if (isActive)
            {
              vdbActiveThreads.remove(this);
              vdb.removeCurrentNbOfThread();
            }
            else
              vdb.removeIdleThread();
          }
          // Get out of the while loop
          continue;
        }

        // Get a connection: the queue holds the input stream followed by the
        // output stream of each pending connection
        try
        {
          in = (CJDBCInputStream) vdbPendingQueue.remove(0);
          out = (CJDBCOutputStream) vdbPendingQueue.remove(0);
        }
        catch (Exception e)
        {
          logger.error("Error while getting streams from connection");
          continue;
        }

        synchronized (vdbActiveThreads)
        {
          if (!isActive)
          {
            vdb.removeIdleThread();
            isActive = true;
            // Add this thread to the active thread list
            vdbActiveThreads.add(this);
          }
        }
      }

      // Handle connection
      // Read the user information and check authentication
      boolean success = false;
      String login;
      try
      {
        login = in.readUTF();
        String password = in.readUTF();
        user = new VirtualDatabaseUser(login, password);

        if (vdb.getAuthenticationManager().isValidVirtualUser(user))
        { // Authentication ok, send if SQL skeleton is needed
          needSkeleton = vdb.getRequestManager()
              .getRequiredParsingGranularity() != ParsingGranularities.NO_PARSING;
          out.writeObject(new Boolean(needSkeleton));
          out.writeUTF(vdb.getBlobFilter().getXml());
          out.flush();
          success = true;

          if (logger.isDebugEnabled())
            logger.debug("Login accepted for " + login);
        }
        else
        { // Authentication failed, close the connection
          String msg = "Authentication failed for user '" + login + "'";
          out.writeObject(new SQLException(msg));
          // Push the error to the client before the streams get closed
          out.flush();
          if (logger.isDebugEnabled())
            logger.debug(msg);
          continue;
        }
      }
      catch (OptionalDataException e)
      {
        logger.error("Protocol error while expecting user authentication (" + e
            + ")");
        continue;
      }
      catch (IOException e)
      {
        logger.error("I/O error during user authentication (" + e + ")");
        continue;
      }
      finally
      {
        if (!success)
        {
          // Output first, then input; each one is closed independently so
          // that a failure on one stream does not leak the other one
          closeOutputQuietly();
          closeInputQuietly();
        }
      }

      // Ok, let's process the queries on this connection
      long currentTid = 0;
      boolean transactionStarted = false;
      boolean queryExecutedInThisTransaction = false;
      boolean closed = false;
      int command;
      while (!closed && !isKilled)
      {
        try
        {
          // Get the query
          waitForCommand = true;
          command = in.readInt();
          waitForCommand = false;

          // Process it
          switch (command)
          {
            case Commands.ExecReadRequest :
              if (logger.isDebugEnabled())
                logger.debug("ExecReadRequest command");
              SelectRequest select = readRequestFromStream();
              transactionStarted = setRequestParameters(select, login,
                  currentTid, transactionStarted);

              if (!transactionStarted)
                currentTid = 0;
              else
                queryExecutedInThisTransaction = true;
              ControllerResultSet crs = vdb.execReadRequest(select);
              // Keep the result set around if the driver has to fetch more
              if (crs.hasMoreData())
                streamedResultSet.put(crs.getCursorName(), crs);
              sendResultSet(crs);
              break;
            case Commands.ExecWriteRequest :
              if (logger.isDebugEnabled())
                logger.debug("ExecWriteRequest command");
              AbstractWriteRequest write = writeRequestFromStream(false);
              transactionStarted = setRequestParameters(write, login,
                  currentTid, transactionStarted);
              if (!transactionStarted)
                currentTid = 0;
              else
                queryExecutedInThisTransaction = true;
              out.writeObject(new Integer(vdb.execWriteRequest(write)));
              out.flush();
              break;
            case Commands.ExecWriteRequestWithKeys :
              if (logger.isDebugEnabled())
                logger.debug("ExecWriteRequestWithKeys command");
              AbstractWriteRequest writeWithKeys = writeRequestFromStream(true);
              transactionStarted = setRequestParameters(writeWithKeys, login,
                  currentTid, transactionStarted);
              if (!transactionStarted)
                currentTid = 0;
              else
                queryExecutedInThisTransaction = true;
              ControllerResultSet keys = vdb
                  .execWriteRequestWithKeys(writeWithKeys);
              sendResultSet(keys);
              break;
            case Commands.ExecReadStoredProcedure :
              if (logger.isDebugEnabled())
                logger.debug("ExecReadStoredProcedure command");
              StoredProcedure readProc = procedureFromStream(true);
              transactionStarted = setRequestParameters(readProc, login,
                  currentTid, transactionStarted);
              if (!transactionStarted)
                currentTid = 0;
              else
                queryExecutedInThisTransaction = true;
              ControllerResultSet sprs = vdb.execReadStoredProcedure(readProc);
              sendResultSet(sprs);
              break;
            case Commands.ExecWriteStoredProcedure :
              if (logger.isDebugEnabled())
                logger.debug("ExecWriteStoredProcedure command");
              StoredProcedure writeProc = procedureFromStream(false);
              transactionStarted = setRequestParameters(writeProc, login,
                  currentTid, transactionStarted);
              if (!transactionStarted)
                currentTid = 0;
              else
                queryExecutedInThisTransaction = true;
              out.writeObject(new Integer(vdb
                  .execWriteStoredProcedure(writeProc)));
              out.flush();
              break;
            case Commands.Begin :
              if (logger.isDebugEnabled())
                logger.debug("Begin command");
              currentTid = vdb.begin(login);
              out.writeObject(new Long(currentTid));
              out.flush();
              transactionStarted = true;
              break;
            case Commands.Commit :
              if (logger.isDebugEnabled())
                logger.debug("Commit command");
              vdb.commit(currentTid);
              // A new transaction is started right after the commit
              currentTid = vdb.begin(login);
              out.writeObject(new Long(currentTid));
              out.flush();
              break;
            case Commands.SetAutoCommit :
              if (logger.isDebugEnabled())
                logger.debug("Set Auto commit command");
              vdb.commit(currentTid);
              currentTid = 0;
              transactionStarted = false;
              out.writeObject(Boolean.TRUE);
              out.flush();
              break;
            case Commands.Rollback :
              if (logger.isDebugEnabled())
                logger.debug("Rollback command");
              vdb.rollback(currentTid);
              // A new transaction is started right after the rollback
              currentTid = vdb.begin(login);
              out.writeObject(new Long(currentTid));
              out.flush();
              break;
            case Commands.GetVirtualDatabaseName :
              if (logger.isDebugEnabled())
                logger.debug("GetVirtualDatabaseName command");
              out.writeObject(vdb.getDatabaseName());
              out.flush();
              break;
            case Commands.GetDatabaseProductName :
              if (logger.isDebugEnabled())
                logger.debug("GetDatabaseProductName command");
              out.writeObject(vdb.getDatabaseProductName());
              out.flush();
              break;
            case Commands.GetControllerVersionNumber :
              if (logger.isDebugEnabled())
                logger.debug("GetControllerVersionNumber command");
              out.writeObject(Constants.VERSION);
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetTables :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetTables command");
              String tcatalog = in.readUTF();
              String tschemaPattern = in.readUTF();
              String ttableNamePattern = in.readUTF();
              String[] ttypes = (String[]) in.readObject();
              out.writeObject(vdb.getMetaData().getTables(tcatalog,
                  tschemaPattern, ttableNamePattern, ttypes));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetColumns :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetColumns command");
              String ccatalog = in.readUTF();
              String cschemaPattern = in.readUTF();
              String ctableNamePattern = in.readUTF();
              String ccolumnNamePattern = in.readUTF();
              out.writeObject(vdb.getMetaData().getColumns(ccatalog,
                  cschemaPattern, ctableNamePattern, ccolumnNamePattern));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetPrimaryKeys :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetPrimaryKeys command");
              String pcatalog = in.readUTF();
              String pschemaPattern = in.readUTF();
              String ptableNamePattern = in.readUTF();
              out.writeObject(vdb.getMetaData().getPrimaryKeys(pcatalog,
                  pschemaPattern, ptableNamePattern));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetProcedures :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetProcedures command");
              String rcatalog = in.readUTF();
              String rschemaPattern = in.readUTF();
              String procedureNamePattern = in.readUTF();
              out.writeObject(vdb.getMetaData().getProcedures(rcatalog,
                  rschemaPattern, procedureNamePattern));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetProcedureColumns :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetProcedureColumns command");
              String pccatalog = in.readUTF();
              String pcschemaPattern = in.readUTF();
              String pcprocedureNamePattern = in.readUTF();
              String pccolumnNamePattern = in.readUTF();
              out
                  .writeObject(vdb.getMetaData().getProcedureColumns(pccatalog,
                      pcschemaPattern, pcprocedureNamePattern,
                      pccolumnNamePattern));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetCatalogs :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetCatalogs command");
              ArrayList list = controller.listVirtualDatabases();
              out.writeObject(vdb.getMetaData().getCatalogs(list));
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetCatalog :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetCatalog command");
              out.writeObject(vdb.getName());
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetTableTypes :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetTable Types command");
              out.writeObject(vdb.getMetaData().getTableTypes());
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetSchemas :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetSchemas Types command");
              out.writeObject(vdb.getMetaData().getSchemas());
              out.flush();
              break;
            case Commands.DatabaseMetaDataGetTablePrivileges :
              if (logger.isDebugEnabled())
                logger.debug("DatabaseMetaDataGetTablePrivileges command");
              String tpcatalog = in.readUTF();
              String tpschemaPattern = in.readUTF();
              String tptablePattern = in.readUTF();
              out.writeObject(vdb.getMetaData().getTablePrivileges(tpcatalog,
                  tpschemaPattern, tptablePattern));
              out.flush();
              break;
            case Commands.ConnectionSetCatalog :
              if (logger.isDebugEnabled())
                logger.debug("ConnectionSetCatalog command");
              String catalog = in.readUTF();
              boolean change = controller.hasVirtualDatabase(catalog);
              if (change)
              {
                VirtualDatabase tempvdb = controller
                    .getVirtualDatabase(catalog);
                // The user must also be valid on the requested database
                if (!tempvdb.getAuthenticationManager()
                    .isValidVirtualUser(user))
                  out
                      .writeObject(new SQLException(
                          "User authentication has failed for asked catalog. No change"));
                else
                {
                  this.vdb = tempvdb;
                  out.writeObject(new Boolean(true));
                }
              }
              else
                out.writeObject(new Boolean(false));
              out.flush();
              break;
            case Commands.Close :
              if (logger.isDebugEnabled())
                logger.debug("Close command");
              out.writeObject(completed);
              out.flush();
              closed = true;
              break;
            case Commands.Reset :
              if (logger.isDebugEnabled())
                logger.debug("Reset command");

              // Do the cleanup (no answer is sent for this command)
              if (transactionStarted && queryExecutedInThisTransaction)
                rollbackQuietly(currentTid);
              currentTid = 0;
              transactionStarted = false;
              break;
            case Commands.FetchNextResultSetRows :
              if (logger.isDebugEnabled())
                logger.debug("FetchNextResultSetRows command");

              String cursorName = in.readUTF();
              int fetchSize = in.readInt();
              ControllerResultSet fetchCrs = (ControllerResultSet) streamedResultSet
                  .get(cursorName);
              if (fetchCrs == null)
              {
                out.writeObject(new SQLException(
                    "No valid ControllerResultSet to fetch data from"));
                out.flush();
              }
              else
              {
                out.writeObject(fetchCrs.fetchData(fetchSize));
                out.writeBoolean(fetchCrs.hasMoreData());
                out.flush();
                // Forget the result set once it is exhausted
                if (!fetchCrs.hasMoreData())
                  streamedResultSet.remove(cursorName);
              }
              break;
            case Commands.CloseRemoteResultSet :
              if (logger.isDebugEnabled())
                logger.debug("CloseRemoteResultSet command");

              String cursor = in.readUTF();
              ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSet
                  .remove(cursor);
              if (crsToClose == null)
              {
                out.writeObject(new SQLException(
                    "No valid ControllerResultSet to close."));
                out.flush();
              }
              else
              {
                crsToClose.closeResultSet();
                out.writeObject(completed);
                out.flush();
              }
              break;
            case Commands.RestoreConnectionState :
              if (logger.isDebugEnabled())
                logger.debug("RestoreConnectionState command");
              // We receive autocommit from driver
              transactionStarted = !in.readBoolean();
              if (transactionStarted)
                currentTid = in.readLong();
              break;
            default :
              String errorMsg = "Unsupported protocol command: " + command;
              logger.error(errorMsg);
              out.writeObject(new RuntimeException(errorMsg));
              out.flush();
              break;
          }
        }
        catch (OptionalDataException e)
        {
          logger.warn("Protocol error (" + e + ")");
          sendQuietly(e);
        }
        catch (RemoteException e)
        {
          logger.warn("Error during command execution (" + e + ")");
          sendQuietly(e);
        }
        catch (EOFException e)
        {
          logger.warn("Client " + login + " closed connection with server", e);
          closed = true;
        }
        catch (SocketException e)
        {
          // shutting down
          closed = true;
        }
        catch (IOException e)
        {
          closed = true;
          logger.warn("Closing connection with client " + login
              + " because of IOException.(" + e + ")");
        }
        catch (ClassNotFoundException e)
        {
          logger.error("Protocol error (" + e + ")");
          sendQuietly(e);
        }
        catch (SQLException e)
        {
          logger
              .warn("Error during command execution (" + e.getMessage() + ")");
          sendQuietly(e);
        }
        catch (RuntimeException e)
        {
          logger.warn("Runtime error during command execution ("
              + e.getMessage() + ")");
          // The client is sent an SQLException carrying the same message
          sendQuietly(new SQLException(e.getMessage()));
        }
      }

      // Do the cleanup
      if (transactionStarted)
        rollbackQuietly(currentTid);
      if (!streamedResultSet.isEmpty())
      {
        for (Iterator iter = streamedResultSet.values().iterator(); iter
            .hasNext();)
        {
          ControllerResultSet crs = (ControllerResultSet) iter.next();
          crs.closeResultSet();
        }
        streamedResultSet.clear();
      }
      closeInputQuietly();
      closeOutputQuietly();
    }

    if (logger.isDebugEnabled())
      logger.debug("VirtualDatabaseWorkerThread associated to login: "
          + this.getUser() + " terminating.");
  }

  /**
   * Sends a result set to the driver: fields, data, the "has more data" flag
   * and, only if more data is available, the cursor name. The output stream
   * is flushed afterwards.
   * 
   * @param resultSet the result set to send
   * @throws IOException if writing to the connection fails
   * @throws SQLException declared so that an error raised while reading the
   *           result set is reported by the caller like any command failure
   */
  private void sendResultSet(ControllerResultSet resultSet)
      throws IOException, SQLException
  {
    out.writeObject(resultSet.getFields());
    out.writeObject(resultSet.getData());
    out.writeBoolean(resultSet.hasMoreData());
    if (resultSet.hasMoreData())
      out.writeUTF(resultSet.getCursorName());
    out.flush();
  }

  /**
   * Sends an object (typically an exception describing a failed command) to
   * the driver. This is a best-effort notification: an I/O failure is
   * ignored here since it will show up again on the next read of the command
   * loop.
   * 
   * @param reply the object to write on the output stream
   */
  private void sendQuietly(Object reply)
  {
    try
    {
      out.writeObject(reply);
      out.flush();
    }
    catch (IOException ignore)
    {
    }
  }

  /**
   * Rolls back a transaction as part of a connection cleanup. Failures are
   * only reported in the debug log.
   * 
   * @param tid id of the transaction to roll back
   */
  private void rollbackQuietly(long tid)
  {
    if (logger.isDebugEnabled())
      logger.debug("Forcing transaction " + tid + " rollback");
    try
    {
      vdb.rollback(tid);
    }
    catch (Exception e)
    {
      if (logger.isDebugEnabled())
        logger.debug("Error during rollback of transaction " + tid + "(" + e
            + ")");
    }
  }

  /**
   * Closes the input stream of the current connection, if any, ignoring I/O
   * errors.
   */
  private void closeInputQuietly()
  {
    CJDBCInputStream input = in;
    if (input == null)
      return;
    try
    {
      input.close();
    }
    catch (IOException ignore)
    {
    }
  }

  /**
   * Closes the output stream of the current connection, if any, ignoring I/O
   * errors.
   */
  private void closeOutputQuietly()
  {
    CJDBCOutputStream output = out;
    if (output == null)
      return;
    try
    {
      output.close();
    }
    catch (IOException ignore)
    {
    }
  }

  /**
   * Rebuilds a stored procedure call from the input stream. Fields are read
   * in this order: SQL, escape flag, line separator, timeout, autocommit
   * flag, driver processed flag, then max rows and fetch size for a read
   * procedure, and finally the optional SQL skeleton.
   * 
   * @param isRead <code>true</code> if max rows and fetch size are sent
   * @return the <code>StoredProcedure</code> read from the stream
   * @throws OptionalDataException if the stream content is unexpected
   * @throws IOException if reading from the stream fails
   */
  private StoredProcedure procedureFromStream(boolean isRead)
      throws OptionalDataException, IOException
  {
    String sql = in.readUTF();
    boolean escape = in.readBoolean();
    String lineSeparator = in.readUTF();
    int timeout = in.readInt();
    StoredProcedure proc = new StoredProcedure(sql, escape, timeout,
        lineSeparator);
    proc.setIsAutoCommit(in.readBoolean());
    proc.setDriverProcessed(in.readBoolean());
    if (isRead)
    {
      proc.setMaxRows(in.readInt());
      proc.setFetchSize(in.readInt());
    }
    // Does the query have a skeleton?
    if (needSkeleton || !proc.isDriverProcessed())
    {
      if (in.readBoolean()) // is there a non null skeleton
        proc.setSqlSkeleton(in.readUTF());
    }
    return proc;
  }

  /**
   * Rebuilds a write request from the input stream. The request type is read
   * first and selects the concrete request class, then the fields follow in
   * this order: SQL, escape flag, line separator, timeout, autocommit flag,
   * driver processed flag, then max rows and fetch size if keys are
   * requested, and finally the optional SQL skeleton.
   * 
   * @param withKeys <code>true</code> if max rows and fetch size are sent
   * @return the write request read from the stream
   * @throws OptionalDataException if the stream content is unexpected
   * @throws IOException if reading from the stream fails or if the request
   *           type is unknown
   */
  private AbstractWriteRequest writeRequestFromStream(boolean withKeys)
      throws OptionalDataException, IOException
  {
    AbstractWriteRequest writeRequest;
    int requestType = in.readInt();
    String sql = in.readUTF();
    boolean escape = in.readBoolean();
    String lineSeparator = in.readUTF();
    int timeout = in.readInt();
    switch (requestType)
    {
      case Commands.CreateRequest :
        writeRequest = new CreateRequest(sql, escape, timeout, lineSeparator);
        break;
      case Commands.AlterRequest :
        writeRequest = new AlterRequest(sql, escape, timeout, lineSeparator);
        break;
      case Commands.DeleteRequest :
        writeRequest = new DeleteRequest(sql, escape, timeout, lineSeparator);
        break;
      case Commands.DropRequest :
        writeRequest = new DropRequest(sql, escape, timeout, lineSeparator);
        break;
      case Commands.InsertRequest :
        writeRequest = new InsertRequest(sql, escape, timeout, lineSeparator);
        break;
      case Commands.UpdateRequest :
        writeRequest = new UpdateRequest(sql, escape, timeout, lineSeparator);
        break;
      default :
        throw new IOException("Invalid Write Query Type");
    }

    writeRequest.setIsAutoCommit(in.readBoolean());
    writeRequest.setDriverProcessed(in.readBoolean());
    if (withKeys)
    {
      writeRequest.setMaxRows(in.readInt());
      writeRequest.setFetchSize(in.readInt());
    }

    // Does the query have a skeleton?
    if (needSkeleton || !writeRequest.isDriverProcessed())
    {
      if (in.readBoolean()) // is there a non null skeleton
        writeRequest.setSqlSkeleton(in.readUTF());
    }

    return writeRequest;
  }

  /**
   * Rebuilds a select request from the input stream. Fields are read in this
   * order: SQL, escape flag, line separator, timeout, autocommit flag, driver
   * processed flag, max rows, fetch size, the optional cursor name and
   * finally the optional SQL skeleton.
   * 
   * @return the <code>SelectRequest</code> read from the stream
   * @throws OptionalDataException if the stream content is unexpected
   * @throws IOException if reading from the stream fails
   */
  private SelectRequest readRequestFromStream() throws OptionalDataException,
      IOException
  {
    String sql = in.readUTF();
    boolean escape = in.readBoolean();
    String lineSeparator = in.readUTF();
    int timeout = in.readInt();
    SelectRequest select = new SelectRequest(sql, escape, timeout,
        lineSeparator);
    select.setIsAutoCommit(in.readBoolean());
    select.setDriverProcessed(in.readBoolean());
    select.setMaxRows(in.readInt());
    select.setFetchSize(in.readInt());
    if (in.readBoolean()) // do we have a cursor name ?
      select.setCursorName(in.readUTF());
    // Does the query have a skeleton?
    if (needSkeleton || !select.isDriverProcessed())
    {
      if (in.readBoolean()) // is there a non null skeleton
        select.setSqlSkeleton(in.readUTF());
    }
    return select;
  }

  /**
   * Retrieves information about the client currently connected. The streams
   * are null until this thread has picked its first connection, in which
   * case this method throws a <code>NullPointerException</code>.
   * 
   * @return an array of 8 strings: [0] client host name, [1] client host
   *         address, [2] bytes read, [3] bytes written, [4] compression
   *         flag of the input stream, [5] seconds elapsed since the input
   *         stream creation date, [6] input stream speed, [7] output stream
   *         speed
   */
  public String[] retrieveClientData()
  {
    // 8 entries are filled in below (indices 0 to 7)
    String[] data = new String[8];
    data[0] = in.getSocket().getInetAddress().getHostName();
    data[1] = in.getSocket().getInetAddress().getHostAddress();
    data[2] = String.valueOf(in.getBytesRead());
    data[3] = String.valueOf(out.getBytesWritten());
    data[4] = String.valueOf(in.getUseCompression());
    data[5] = String
        .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000));
    data[6] = String.valueOf(in.getSpeed());
    data[7] = String.valueOf(out.getSpeed());
    return data;
  }

  /**
   * Returns the bytes read counter of the current input stream.
   * 
   * @return the value of <code>in.getBytesRead()</code>
   */
  public long getBytesRead()
  {
    return in.getBytesRead();
  }

  /**
   * Returns the bytes written counter of the current output stream.
   * 
   * @return the value of <code>out.getBytesWritten()</code>
   */
  public long getBytesWritten()
  {
    return out.getBytesWritten();
  }

  /**
   * Returns the time elapsed since the creation date of the current input
   * stream.
   * 
   * @return the elapsed time in seconds
   */
  public long getTimeActive()
  {
    return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
  }

  /**
   * Returns the speed reported by the current input stream.
   * 
   * @return the value of <code>in.getSpeed()</code>
   */
  public long getReadingSpeed()
  {
    return in.getSpeed();
  }

  /**
   * Returns the speed reported by the current output stream.
   * 
   * @return the value of <code>out.getSpeed()</code>
   */
  public long getWritingSpeed()
  {
    return out.getSpeed();
  }

  /**
   * Returns the login of the user of the current (or last) connection.
   * 
   * @return the user login, or <code>null</code> if no user has tried to
   *         authenticate on this thread yet
   */
  public String getUser()
  {
    return (user == null) ? null : user.getLogin();
  }
}

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:04 2004 に生成されました。 doxygen 1.3.8