00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.virtualdatabase;
00026
00027 import java.io.EOFException;
00028 import java.io.IOException;
00029 import java.io.OptionalDataException;
00030 import java.net.SocketException;
00031 import java.rmi.RemoteException;
00032 import java.sql.SQLException;
00033 import java.util.ArrayList;
00034 import java.util.HashMap;
00035 import java.util.Iterator;
00036
00037 import org.objectweb.cjdbc.common.log.Trace;
00038 import org.objectweb.cjdbc.common.sql.AbstractRequest;
00039 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00040 import org.objectweb.cjdbc.common.sql.AlterRequest;
00041 import org.objectweb.cjdbc.common.sql.CreateRequest;
00042 import org.objectweb.cjdbc.common.sql.DeleteRequest;
00043 import org.objectweb.cjdbc.common.sql.DropRequest;
00044 import org.objectweb.cjdbc.common.sql.InsertRequest;
00045 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00046 import org.objectweb.cjdbc.common.sql.SelectRequest;
00047 import org.objectweb.cjdbc.common.sql.StoredProcedure;
00048 import org.objectweb.cjdbc.common.sql.UpdateRequest;
00049 import org.objectweb.cjdbc.common.sql.metadata.MetadataContainer;
00050 import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
00051 import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
00052 import org.objectweb.cjdbc.common.users.VirtualDatabaseUser;
00053 import org.objectweb.cjdbc.common.util.Constants;
00054 import org.objectweb.cjdbc.controller.core.Controller;
00055 import org.objectweb.cjdbc.driver.protocol.CommandCompleted;
00056 import org.objectweb.cjdbc.driver.protocol.Commands;
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066 public class VirtualDatabaseWorkerThread extends Thread
00067 {
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 private boolean isKilled = false;
00078
00079
00080 boolean needSkeleton = false;
00081
00082
00083 private VirtualDatabase vdb;
00084
00085 private final CommandCompleted completed = new CommandCompleted();
00086
00087
00088 private Trace logger = null;
00089
00090 private CJDBCInputStream in = null;
00091 private CJDBCOutputStream out = null;
00092
00093 private VirtualDatabaseUser user;
00094
00095 private Controller controller;
00096
00097 private boolean waitForCommand;
00098
00099 private HashMap streamedResultSet;
00100
00101
00102
00103
00104
00105 private long currentTid;
00106 private boolean transactionStarted;
00107 private boolean queryExecutedInThisTransaction;
00108 private String login;
00109 private boolean closed;
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 public VirtualDatabaseWorkerThread(Controller controller, VirtualDatabase vdb)
00122 {
00123 super("VirtualDatabaseWorkerThread-" + vdb.getVirtualDatabaseName());
00124 this.vdb = vdb;
00125 this.controller = controller;
00126 try
00127 {
00128 this.logger = Trace
00129 .getLogger("org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread."
00130 + vdb.getVirtualDatabaseName());
00131 }
00132 catch (Exception e)
00133 {
00134 this.logger = vdb.logger;
00135 }
00136 }
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 private StoredProcedure decodeProcedureFromStream(boolean isRead)
00151 throws OptionalDataException, IOException
00152 {
00153 String sql = in.readUTF();
00154 boolean escape = in.readBoolean();
00155 String lineSeparator = in.readUTF();
00156 int timeout = in.readInt();
00157 StoredProcedure proc = new StoredProcedure(sql, escape, timeout,
00158 lineSeparator);
00159 proc.setIsAutoCommit(in.readBoolean());
00160 proc.setDriverProcessed(in.readBoolean());
00161 if (isRead)
00162 {
00163 proc.setMaxRows(in.readInt());
00164 proc.setFetchSize(in.readInt());
00165 }
00166
00167 if (needSkeleton || !proc.isDriverProcessed())
00168 if (in.readBoolean())
00169 proc.setSqlSkeleton(in.readUTF());
00170 return proc;
00171 }
00172
00173
00174
00175
00176
00177
00178
00179
00180 private SelectRequest decodeReadRequestFromStream()
00181 throws OptionalDataException, IOException
00182 {
00183 String sql = in.readUTF();
00184 boolean escape = in.readBoolean();
00185 String lineSeparator = in.readUTF();
00186 int timeout = in.readInt();
00187 SelectRequest select = new SelectRequest(sql, escape, timeout,
00188 lineSeparator);
00189 select.setIsAutoCommit(in.readBoolean());
00190 select.setDriverProcessed(in.readBoolean());
00191 select.setMaxRows(in.readInt());
00192 select.setFetchSize(in.readInt());
00193 if (in.readBoolean())
00194 select.setCursorName(in.readUTF());
00195
00196 if (needSkeleton || !select.isDriverProcessed())
00197 if (in.readBoolean())
00198 select.setSqlSkeleton(in.readUTF());
00199 return select;
00200 }
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210 private AbstractWriteRequest decodeWriteRequestFromStream(boolean withKeys)
00211 throws OptionalDataException, IOException
00212 {
00213 AbstractWriteRequest writeRequest;
00214 int requestType = in.readInt();
00215 String sql = in.readUTF();
00216 boolean escape = in.readBoolean();
00217 String lineSeparator = in.readUTF();
00218 int timeout = in.readInt();
00219 switch (requestType)
00220 {
00221 case Commands.CreateRequest :
00222 writeRequest = new CreateRequest(sql, escape, timeout, lineSeparator);
00223 break;
00224 case Commands.AlterRequest :
00225 writeRequest = new AlterRequest(sql, escape, timeout, lineSeparator);
00226 break;
00227 case Commands.DeleteRequest :
00228 writeRequest = new DeleteRequest(sql, escape, timeout, lineSeparator);
00229 break;
00230 case Commands.DropRequest :
00231 writeRequest = new DropRequest(sql, escape, timeout, lineSeparator);
00232 break;
00233 case Commands.InsertRequest :
00234 writeRequest = new InsertRequest(sql, escape, timeout, lineSeparator);
00235 break;
00236 case Commands.UpdateRequest :
00237 writeRequest = new UpdateRequest(sql, escape, timeout, lineSeparator);
00238 break;
00239 default :
00240 throw new IOException("Invalid Write Query Type");
00241 }
00242
00243 writeRequest.setIsAutoCommit(in.readBoolean());
00244 writeRequest.setDriverProcessed(in.readBoolean());
00245 if (withKeys)
00246 {
00247 writeRequest.setMaxRows(in.readInt());
00248 writeRequest.setFetchSize(in.readInt());
00249 }
00250
00251
00252 if (needSkeleton || !writeRequest.isDriverProcessed())
00253 if (in.readBoolean())
00254 writeRequest.setSqlSkeleton(in.readUTF());
00255
00256 return writeRequest;
00257 }
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269 private boolean setRequestParameters(AbstractRequest request, String login,
00270 long tid, boolean transactionStarted) throws SQLException
00271 {
00272 request.setLogin(login);
00273 if (request.isAutoCommit() && transactionStarted)
00274 {
00275 vdb.commit(tid);
00276 return false;
00277 }
00278 else
00279 request.setTransactionId(tid);
00280 return transactionStarted;
00281 }
00282
00283
00284
00285
00286 public void run()
00287 {
00288 ArrayList vdbActiveThreads = vdb.getActiveThreads();
00289 ArrayList vdbPendingQueue = vdb.getPendingConnections();
00290
00291
00292 streamedResultSet = new HashMap();
00293 boolean isActive = true;
00294
00295 if (vdbActiveThreads == null)
00296 {
00297 logger
00298 .error("Got null active threads queue in VirtualDatabaseWorkerThread");
00299 isKilled = true;
00300 }
00301 if (vdbPendingQueue == null)
00302 {
00303 logger.error("Got null connection queue in VirtualDatabaseWorkerThread");
00304 isKilled = true;
00305 }
00306
00307
00308 while (!isKilled)
00309 {
00310
00311 synchronized (vdbPendingQueue)
00312 {
00313 while (vdbPendingQueue.isEmpty())
00314 {
00315 if (!vdb.poolConnectionThreads)
00316 {
00317 isKilled = true;
00318 break;
00319 }
00320 boolean timeout = false;
00321 try
00322 {
00323 if (isActive)
00324 {
00325 isActive = false;
00326
00327 synchronized (vdbActiveThreads)
00328 {
00329 vdbActiveThreads.remove(this);
00330 vdb.addIdleThread();
00331 }
00332 }
00333 long before = System.currentTimeMillis();
00334 vdbPendingQueue.wait(vdb.getMaxThreadIdleTime());
00335 long now = System.currentTimeMillis();
00336
00337 timeout = now - before >= vdb.getMaxThreadIdleTime();
00338 }
00339 catch (InterruptedException e)
00340 {
00341 logger.warn("VirtualDatabaseWorkerThread wait() interrupted");
00342 }
00343 if (timeout && vdbPendingQueue.isEmpty())
00344 {
00345 if (vdb.currentNbOfThreads > vdb.minNbOfThreads)
00346 {
00347 isKilled = true;
00348 break;
00349 }
00350 }
00351 }
00352
00353 if (isKilled)
00354 {
00355 synchronized (vdbActiveThreads)
00356 {
00357 if (isActive)
00358 {
00359 vdbActiveThreads.remove(this);
00360 vdb.removeCurrentNbOfThread();
00361 }
00362 else
00363 vdb.removeIdleThread();
00364 }
00365
00366 continue;
00367 }
00368
00369
00370 try
00371 {
00372 in = (CJDBCInputStream) vdbPendingQueue.remove(0);
00373 out = (CJDBCOutputStream) vdbPendingQueue.remove(0);
00374 }
00375 catch (Exception e)
00376 {
00377 logger.error("Error while getting streams from connection");
00378 continue;
00379 }
00380
00381 synchronized (vdbActiveThreads)
00382 {
00383 if (!isActive)
00384 {
00385 vdb.removeIdleThread();
00386 isActive = true;
00387
00388 vdbActiveThreads.add(this);
00389 }
00390 }
00391 }
00392
00393
00394
00395 boolean success = false;
00396 try
00397 {
00398 login = in.readUTF();
00399 String password = in.readUTF();
00400 user = new VirtualDatabaseUser(login, password);
00401
00402 if (vdb.getAuthenticationManager().isValidVirtualUser(user))
00403 {
00404 needSkeleton = vdb.getRequestManager()
00405 .getRequiredParsingGranularity() != ParsingGranularities.NO_PARSING;
00406 out.writeObject(needSkeleton ? Boolean.TRUE : Boolean.FALSE);
00407 out.writeUTF(vdb.getBlobFilter().getXml());
00408 out.flush();
00409 success = true;
00410
00411 if (logger.isDebugEnabled())
00412 logger.debug("Login accepted for " + login);
00413 }
00414 else
00415 {
00416 String msg = "Authentication failed for user '" + login + "'";
00417 out.writeObject(new SQLException(msg));
00418 if (logger.isDebugEnabled())
00419 logger.debug(msg);
00420 continue;
00421 }
00422 }
00423 catch (OptionalDataException e)
00424 {
00425 logger.error("Protocol error while expecting user authentication (" + e
00426 + ")");
00427 continue;
00428 }
00429 catch (IOException e)
00430 {
00431 logger.error("I/O error during user authentication (" + e + ")");
00432 continue;
00433 }
00434 finally
00435 {
00436 if (!success)
00437 {
00438 try
00439 {
00440 out.close();
00441 in.close();
00442 }
00443 catch (IOException ignore)
00444 {
00445 }
00446 }
00447 }
00448
00449 currentTid = 0;
00450 transactionStarted = false;
00451 queryExecutedInThisTransaction = false;
00452 closed = false;
00453 int command;
00454 while (!closed && !isKilled)
00455 {
00456 try
00457 {
00458
00459 waitForCommand = true;
00460 command = in.readInt();
00461 waitForCommand = false;
00462
00463
00464 switch (command)
00465 {
00466 case Commands.ExecReadRequest :
00467 execReadRequest();
00468 break;
00469 case Commands.ExecWriteRequest :
00470 execWriteRequest();
00471 break;
00472 case Commands.ExecWriteRequestWithKeys :
00473 execWriteRequestWithKeys();
00474 break;
00475 case Commands.ExecReadStoredProcedure :
00476 execReadStoredProcedure();
00477 break;
00478 case Commands.ExecWriteStoredProcedure :
00479 execWriteStoredProcedure();
00480 break;
00481 case Commands.Begin :
00482 begin();
00483 break;
00484 case Commands.Commit :
00485 commit();
00486 break;
00487 case Commands.SetAutoCommit :
00488 setAutoCommit();
00489 break;
00490 case Commands.Rollback :
00491 rollback();
00492 break;
00493 case Commands.GetVirtualDatabaseName :
00494 getVirtualDatabaseName();
00495 break;
00496 case Commands.DatabaseMetaDataGetDatabaseProductName :
00497 databaseMetaDataGetDatabaseProductName();
00498 break;
00499 case Commands.GetControllerVersionNumber :
00500 getControllerVersionNumber();
00501 break;
00502 case Commands.DatabaseMetaDataGetTables :
00503 databaseMetaDataGetTables();
00504 break;
00505 case Commands.DatabaseMetaDataGetColumns :
00506 databaseMetaDataGetColumns();
00507 break;
00508 case Commands.DatabaseMetaDataGetPrimaryKeys :
00509 databaseMetaDataGetPrimaryKeys();
00510 break;
00511 case Commands.DatabaseMetaDataGetProcedures :
00512 databaseMetaDataGetProcedures();
00513 break;
00514 case Commands.DatabaseMetaDataGetProcedureColumns :
00515 databaseMetaDataGetProcedureColumns();
00516 break;
00517 case Commands.ConnectionGetCatalogs :
00518 connectionGetCatalogs();
00519 break;
00520 case Commands.ConnectionGetCatalog :
00521 connectionGetCatalog();
00522 break;
00523 case Commands.DatabaseMetaDataGetTableTypes :
00524 databaseMetaDataGetTableTypes();
00525 break;
00526 case Commands.DatabaseMetaDataGetSchemas :
00527 databaseMetaDataGetSchemas();
00528 break;
00529 case Commands.DatabaseMetaDataGetTablePrivileges :
00530 databaseMetaDataGetTablePrivileges();
00531 break;
00532 case Commands.ConnectionSetCatalog :
00533 connectionSetCatalog();
00534 break;
00535 case Commands.Close :
00536 close();
00537 break;
00538 case Commands.Reset :
00539 reset();
00540 break;
00541 case Commands.FetchNextResultSetRows :
00542 fetchNextResultSetRows();
00543 break;
00544 case Commands.CloseRemoteResultSet :
00545 closeRemoteResultSet();
00546 break;
00547 case Commands.DatabaseStaticMetadata :
00548 databaseStaticMetadata();
00549 break;
00550 case Commands.RestoreConnectionState :
00551 restoreConnectionState();
00552 break;
00553 default :
00554 String errorMsg = "Unsupported protocol command: " + command;
00555 logger.error(errorMsg);
00556 out.writeObject(new RuntimeException(errorMsg));
00557 out.flush();
00558 break;
00559 }
00560 }
00561 catch (OptionalDataException e)
00562 {
00563 logger.warn("Protocol error (" + e + ")");
00564 try
00565 {
00566 out.writeObject(e);
00567 out.flush();
00568 }
00569 catch (IOException ignore)
00570 {
00571 }
00572 }
00573 catch (RemoteException e)
00574 {
00575 logger.warn("Error during command execution (" + e + ")");
00576 try
00577 {
00578 out.writeObject(e);
00579 out.flush();
00580 }
00581 catch (IOException ignore)
00582 {
00583 }
00584 }
00585 catch (EOFException e)
00586 {
00587 logger.warn("Client (login:" + login + ",host:"
00588 + in.getSocket().getInetAddress().getHostName()
00589 + " closed connection with server");
00590 closed = true;
00591 }
00592 catch (SocketException e)
00593 {
00594
00595 closed = true;
00596 }
00597 catch (IOException e)
00598 {
00599 closed = true;
00600 logger.warn("Closing connection with client " + login
00601 + " because of IOException.(" + e + ")");
00602 }
00603 catch (ClassNotFoundException e)
00604 {
00605 logger.error("Protocol error (" + e + ")");
00606 try
00607 {
00608 out.writeObject(e);
00609 out.flush();
00610 }
00611 catch (IOException ignore)
00612 {
00613 }
00614 }
00615 catch (SQLException e)
00616 {
00617 logger
00618 .warn("Error during command execution (" + e.getMessage() + ")");
00619 try
00620 {
00621 out.writeObject(e);
00622 out.flush();
00623 }
00624 catch (IOException ignore)
00625 {
00626 }
00627 }
00628 catch (RuntimeException e)
00629 {
00630 logger.warn("Runtime error during command execution ("
00631 + e.getMessage() + ")", e);
00632 try
00633 {
00634 out.writeObject(new SQLException(e.getMessage()));
00635 out.flush();
00636 }
00637 catch (IOException ignore)
00638 {
00639 }
00640 }
00641 }
00642
00643
00644 if (transactionStarted)
00645 {
00646 if (logger.isDebugEnabled())
00647 logger.debug("Forcing transaction " + currentTid + " rollback");
00648 try
00649 {
00650 vdb.rollback(currentTid);
00651 }
00652 catch (Exception e)
00653 {
00654 if (logger.isDebugEnabled())
00655 logger.debug("Error during rollback of transaction " + currentTid
00656 + "(" + e + ")");
00657 }
00658 }
00659 if (!streamedResultSet.isEmpty())
00660 {
00661 for (Iterator iter = streamedResultSet.values().iterator(); iter
00662 .hasNext();)
00663 {
00664 ControllerResultSet crs = (ControllerResultSet) iter.next();
00665 crs.closeResultSet();
00666 }
00667 streamedResultSet.clear();
00668 }
00669 try
00670 {
00671 in.close();
00672 }
00673 catch (IOException ignore)
00674 {
00675 }
00676 try
00677 {
00678 out.close();
00679 }
00680 catch (IOException ignore)
00681 {
00682 }
00683 }
00684
00685 if (logger.isDebugEnabled())
00686 logger.debug("VirtualDatabaseWorkerThread associated to login: "
00687 + this.getUser() + " terminating.");
00688 }
00689
00690
00691
00692
00693
00694 private void close() throws IOException
00695 {
00696 if (logger.isDebugEnabled())
00697 logger.debug("Close command");
00698 out.writeObject(completed);
00699 out.flush();
00700 closed = true;
00701 }
00702
00703 private void closeRemoteResultSet() throws IOException
00704 {
00705 if (logger.isDebugEnabled())
00706 logger.debug("CloseRemoteResultSet command");
00707
00708 String cursor = in.readUTF();
00709 ControllerResultSet crsToClose = (ControllerResultSet) streamedResultSet
00710 .remove(cursor);
00711 if (crsToClose == null)
00712 {
00713 out
00714 .writeObject(new SQLException(
00715 "No valid ControllerResultSet to close."));
00716 out.flush();
00717 }
00718 else
00719 {
00720 crsToClose.closeResultSet();
00721 out.writeObject(completed);
00722 out.flush();
00723 }
00724 }
00725
00726 private void reset()
00727 {
00728
00729
00730 if (logger.isDebugEnabled())
00731 logger.debug("Reset command");
00732
00733
00734 if (transactionStarted)
00735 {
00736 if (queryExecutedInThisTransaction)
00737 {
00738 if (logger.isDebugEnabled())
00739 logger.debug("Forcing transaction " + currentTid + " rollback");
00740 try
00741 {
00742 vdb.rollback(currentTid);
00743 }
00744 catch (Exception e)
00745 {
00746 if (logger.isDebugEnabled())
00747 logger.debug("Error during rollback of transaction " + currentTid
00748 + "(" + e + ")");
00749 }
00750 }
00751 else
00752 {
00753
00754 if (logger.isDebugEnabled())
00755 logger.debug("Aborting transaction " + currentTid);
00756 try
00757 {
00758 vdb.abort(currentTid);
00759 }
00760 catch (Exception e)
00761 {
00762 if (logger.isDebugEnabled())
00763 logger.debug("Error while aborting transaction " + currentTid + "("
00764 + e + ")", e);
00765 }
00766 }
00767 currentTid = 0;
00768 transactionStarted = false;
00769 }
00770 }
00771
00772 private void restoreConnectionState() throws IOException
00773 {
00774 if (logger.isDebugEnabled())
00775 logger.debug("RestoreConnectionState command");
00776
00777 transactionStarted = !in.readBoolean();
00778 if (transactionStarted)
00779 currentTid = in.readLong();
00780 }
00781
00782
00783
00784
00785
00786 private void connectionSetCatalog() throws IOException
00787 {
00788 if (logger.isDebugEnabled())
00789 logger.debug("ConnectionSetCatalog command");
00790 String catalog = in.readUTF();
00791 boolean change = controller.hasVirtualDatabase(catalog);
00792 if (change)
00793 {
00794 VirtualDatabase tempvdb = controller.getVirtualDatabase(catalog);
00795 if (!tempvdb.getAuthenticationManager().isValidVirtualUser(user))
00796 out.writeObject(new SQLException(
00797 "User authentication has failed for asked catalog. No change"));
00798 else
00799 {
00800 this.vdb = tempvdb;
00801 out.writeObject(Boolean.TRUE);
00802 }
00803 }
00804 else
00805 out.writeObject(Boolean.FALSE);
00806 out.flush();
00807 }
00808
00809 private void connectionGetCatalog() throws IOException
00810 {
00811 if (logger.isDebugEnabled())
00812 logger.debug("ConnectionGetCatalog command");
00813 out.writeObject(vdb.getVirtualDatabaseName());
00814 out.flush();
00815 }
00816
00817 private void connectionGetCatalogs() throws IOException
00818 {
00819 if (logger.isDebugEnabled())
00820 logger.debug("ConnectionGetCatalogs command");
00821 ArrayList list = controller.getVirtualDatabaseNames();
00822 out.writeObject(vdb.getDynamicMetaData().getCatalogs(list));
00823 out.flush();
00824 }
00825
00826
00827
00828
00829
00830 private void databaseMetaDataGetColumns() throws IOException
00831 {
00832 if (logger.isDebugEnabled())
00833 logger.debug("DatabaseMetaDataGetColumns command");
00834 String ccatalog = in.readUTF();
00835 String cschemaPattern = in.readUTF();
00836 String ctableNamePattern = in.readUTF();
00837 String ccolumnNamePattern = in.readUTF();
00838 out.writeObject(vdb.getDynamicMetaData().getColumns(ccatalog,
00839 cschemaPattern, ctableNamePattern, ccolumnNamePattern));
00840 out.flush();
00841 }
00842
00843 private void databaseMetaDataGetDatabaseProductName() throws IOException
00844 {
00845 if (logger.isDebugEnabled())
00846 logger.debug("GetDatabaseProductName command");
00847 out.writeObject(vdb.getDatabaseProductName());
00848 out.flush();
00849 }
00850
00851 private void databaseMetaDataGetPrimaryKeys() throws IOException
00852 {
00853 if (logger.isDebugEnabled())
00854 logger.debug("DatabaseMetaDataGetPrimaryKeys command");
00855 String pcatalog = in.readUTF();
00856 String pschemaPattern = in.readUTF();
00857 String ptableNamePattern = in.readUTF();
00858 out.writeObject(vdb.getDynamicMetaData().getPrimaryKeys(pcatalog,
00859 pschemaPattern, ptableNamePattern));
00860 out.flush();
00861 }
00862
00863 private void databaseMetaDataGetProcedureColumns() throws IOException
00864 {
00865 if (logger.isDebugEnabled())
00866 logger.debug("DatabaseMetaDataGetProcedureColumns command");
00867 String pccatalog = in.readUTF();
00868 String pcschemaPattern = in.readUTF();
00869 String pcprocedureNamePattern = in.readUTF();
00870 String pccolumnNamePattern = in.readUTF();
00871 out.writeObject(vdb.getDynamicMetaData().getProcedureColumns(pccatalog,
00872 pcschemaPattern, pcprocedureNamePattern, pccolumnNamePattern));
00873 out.flush();
00874 }
00875
00876 private void databaseMetaDataGetProcedures() throws IOException
00877 {
00878 if (logger.isDebugEnabled())
00879 logger.debug("DatabaseMetaDataGetProcedures command");
00880 String rcatalog = in.readUTF();
00881 String rschemaPattern = in.readUTF();
00882 String procedureNamePattern = in.readUTF();
00883 out.writeObject(vdb.getDynamicMetaData().getProcedures(rcatalog,
00884 rschemaPattern, procedureNamePattern));
00885 out.flush();
00886 }
00887
00888 private void databaseMetaDataGetSchemas() throws IOException
00889 {
00890 if (logger.isDebugEnabled())
00891 logger.debug("DatabaseMetaDataGetSchemas Types command");
00892 out.writeObject(vdb.getDynamicMetaData().getSchemas());
00893 out.flush();
00894 }
00895
00896 private void databaseMetaDataGetTablePrivileges() throws IOException
00897 {
00898 if (logger.isDebugEnabled())
00899 logger.debug("DatabaseMetaDataGetTablePrivileges command");
00900 String tpcatalog = in.readUTF();
00901 String tpschemaPattern = in.readUTF();
00902 String tptablePattern = in.readUTF();
00903 out.writeObject(vdb.getDynamicMetaData().getTablePrivileges(tpcatalog,
00904 tpschemaPattern, tptablePattern));
00905 out.flush();
00906 }
00907
00908 private void databaseMetaDataGetTables() throws IOException,
00909 ClassNotFoundException, OptionalDataException
00910 {
00911 if (logger.isDebugEnabled())
00912 logger.debug("DatabaseMetaDataGetTables command");
00913 String tcatalog = in.readUTF();
00914 String tschemaPattern = in.readUTF();
00915 String ttableNamePattern = in.readUTF();
00916 String[] ttypes = (String[]) in.readObject();
00917 out.writeObject(vdb.getDynamicMetaData().getTables(tcatalog,
00918 tschemaPattern, ttableNamePattern, ttypes));
00919 out.flush();
00920 }
00921
00922 private void databaseMetaDataGetTableTypes() throws IOException
00923 {
00924 if (logger.isDebugEnabled())
00925 logger.debug("DatabaseMetaDataGetTable Types command");
00926 out.writeObject(vdb.getDynamicMetaData().getTableTypes());
00927 out.flush();
00928 }
00929
00930 private void databaseStaticMetadata() throws IOException
00931 {
00932 String key = in.readUTF();
00933 if (logger.isDebugEnabled())
00934 logger.debug("DatabaseStaticMetadata command for " + key);
00935 MetadataContainer container = vdb.getStaticMetaData()
00936 .getMetadataContainer();
00937 if (container == null)
00938 {
00939 String msg = "No metadata is available probably because no backend is enabled on that controller.";
00940 logger.info(msg);
00941 out.writeObject(new SQLException(msg));
00942 }
00943 else
00944 out.writeObject(container.get(key));
00945 out.flush();
00946 }
00947
00948 private void getControllerVersionNumber() throws IOException
00949 {
00950 if (logger.isDebugEnabled())
00951 logger.debug("GetControllerVersionNumber command");
00952 out.writeObject(Constants.VERSION);
00953 out.flush();
00954 }
00955
00956 private void getVirtualDatabaseName() throws IOException
00957 {
00958 if (logger.isDebugEnabled())
00959 logger.debug("GetVirtualDatabaseName command");
00960 out.writeObject(vdb.getDatabaseName());
00961 out.flush();
00962 }
00963
00964
00965
00966
00967
00968 private void begin() throws SQLException, IOException
00969 {
00970 if (logger.isDebugEnabled())
00971 logger.debug("Begin command");
00972 currentTid = vdb.begin(login);
00973 out.writeObject(new Long(currentTid));
00974 out.flush();
00975 transactionStarted = true;
00976 queryExecutedInThisTransaction = false;
00977 }
00978
00979 private void commit() throws SQLException, IOException
00980 {
00981 if (logger.isDebugEnabled())
00982 logger.debug("Commit command");
00983 vdb.commit(currentTid);
00984 currentTid = vdb.begin(login);
00985 out.writeObject(new Long(currentTid));
00986 out.flush();
00987 queryExecutedInThisTransaction = false;
00988 }
00989
00990 private void rollback() throws SQLException, IOException
00991 {
00992 if (logger.isDebugEnabled())
00993 logger.debug("Rollback command");
00994 vdb.rollback(currentTid);
00995 currentTid = vdb.begin(login);
00996 out.writeObject(new Long(currentTid));
00997 out.flush();
00998 queryExecutedInThisTransaction = false;
00999 }
01000
01001 private void setAutoCommit() throws SQLException, IOException
01002 {
01003 if (logger.isDebugEnabled())
01004 logger.debug("Set Auto commit command");
01005 vdb.commit(currentTid);
01006 currentTid = 0;
01007 transactionStarted = false;
01008 out.writeObject(Boolean.TRUE);
01009 out.flush();
01010 }
01011
01012
01013
01014
01015
01016 private void execReadRequest() throws OptionalDataException, IOException,
01017 SQLException
01018 {
01019 if (logger.isDebugEnabled())
01020 logger.debug("ExecReadRequest command");
01021 SelectRequest select = decodeReadRequestFromStream();
01022 transactionStarted = setRequestParameters(select, login, currentTid,
01023 transactionStarted);
01024 if (!transactionStarted)
01025 currentTid = 0;
01026 else
01027 queryExecutedInThisTransaction = true;
01028 ControllerResultSet crs = vdb.execReadRequest(select);
01029 out.writeObject(crs.getFields());
01030 out.writeObject(crs.getData());
01031 out.writeBoolean(crs.hasMoreData());
01032 if (crs.hasMoreData())
01033 {
01034 out.writeUTF(crs.getCursorName());
01035 System.out.println("adding streamedresultset");
01036
01037 streamedResultSet.put(crs.getCursorName(), crs);
01038 }
01039 else
01040 crs = null;
01041 out.flush();
01042 }
01043
01044 private void execReadStoredProcedure() throws OptionalDataException,
01045 IOException, SQLException
01046 {
01047 if (logger.isDebugEnabled())
01048 logger.debug("ExecReadStoredProcedure command");
01049 StoredProcedure readProc = decodeProcedureFromStream(true);
01050 transactionStarted = setRequestParameters(readProc, login, currentTid,
01051 transactionStarted);
01052 if (!transactionStarted)
01053 currentTid = 0;
01054 else
01055 queryExecutedInThisTransaction = true;
01056 ControllerResultSet sprs = vdb.execReadStoredProcedure(readProc);
01057 out.writeObject(sprs.getFields());
01058 out.writeObject(sprs.getData());
01059 out.writeBoolean(sprs.hasMoreData());
01060 if (sprs.hasMoreData())
01061 {
01062 out.writeUTF(sprs.getCursorName());
01063 System.out.println("adding streamedresultset");
01064
01065 streamedResultSet.put(sprs.getCursorName(), sprs);
01066 }
01067 else
01068 sprs = null;
01069 readProc = null;
01070 out.flush();
01071 }
01072
01073 private void execWriteRequest() throws OptionalDataException, IOException,
01074 SQLException
01075 {
01076 if (logger.isDebugEnabled())
01077 logger.debug("ExecWriteRequest command");
01078 AbstractWriteRequest write = decodeWriteRequestFromStream(false);
01079 transactionStarted = setRequestParameters(write, login, currentTid,
01080 transactionStarted);
01081 if (!transactionStarted)
01082 currentTid = 0;
01083 else
01084 queryExecutedInThisTransaction = true;
01085 out.writeObject(new Integer(vdb.execWriteRequest(write)));
01086 write = null;
01087 out.flush();
01088 }
01089
01090 private void execWriteRequestWithKeys() throws OptionalDataException,
01091 IOException, SQLException
01092 {
01093 if (logger.isDebugEnabled())
01094 logger.debug("ExecWriteRequestWithKeys command");
01095 AbstractWriteRequest writeWithKeys = decodeWriteRequestFromStream(true);
01096 transactionStarted = setRequestParameters(writeWithKeys, login, currentTid,
01097 transactionStarted);
01098 if (!transactionStarted)
01099 currentTid = 0;
01100 else
01101 queryExecutedInThisTransaction = true;
01102 ControllerResultSet keys = vdb.execWriteRequestWithKeys(writeWithKeys);
01103 out.writeObject(keys.getFields());
01104 out.writeObject(keys.getData());
01105 out.writeBoolean(keys.hasMoreData());
01106 if (keys.hasMoreData())
01107 {
01108 out.writeUTF(keys.getCursorName());
01109 System.out.println("adding streamedresultset");
01110
01111 streamedResultSet.put(keys.getCursorName(), keys);
01112 }
01113 else
01114 keys = null;
01115 writeWithKeys = null;
01116 out.flush();
01117 }
01118
01119 private void execWriteStoredProcedure() throws OptionalDataException,
01120 IOException, SQLException
01121 {
01122 if (logger.isDebugEnabled())
01123 logger.debug("ExecWriteStoredProcedure command");
01124 StoredProcedure writeProc = decodeProcedureFromStream(false);
01125 transactionStarted = setRequestParameters(writeProc, login, currentTid,
01126 transactionStarted);
01127 if (!transactionStarted)
01128 currentTid = 0;
01129 else
01130 queryExecutedInThisTransaction = true;
01131 out.writeObject(new Integer(vdb.execWriteStoredProcedure(writeProc)));
01132 writeProc = null;
01133 out.flush();
01134 }
01135
01136 private void fetchNextResultSetRows() throws IOException, SQLException
01137 {
01138 if (logger.isDebugEnabled())
01139 logger.debug("FetchNextResultSetRows command");
01140
01141 String cursorName = in.readUTF();
01142 int fetchSize = in.readInt();
01143 ControllerResultSet fetchCrs = (ControllerResultSet) streamedResultSet
01144 .get(cursorName);
01145 if (fetchCrs == null)
01146 {
01147 out.writeObject(new SQLException(
01148 "No valid ControllerResultSet to fetch data from"));
01149 out.flush();
01150 }
01151 else
01152 {
01153 out.writeObject(fetchCrs.fetchData(fetchSize));
01154 out.writeBoolean(fetchCrs.hasMoreData());
01155 out.flush();
01156 if (!fetchCrs.hasMoreData())
01157 streamedResultSet.remove(cursorName);
01158 }
01159 }
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169
01170 public String[] retrieveClientData()
01171 {
01172 String[] data = new String[6];
01173 data[0] = in.getSocket().getInetAddress().getHostName();
01174 data[1] = in.getSocket().getInetAddress().getHostAddress();
01175 data[2] = String.valueOf(in.getBytesRead());
01176 data[3] = String.valueOf(out.getBytesWritten());
01177 data[4] = String.valueOf(in.getUseCompression());
01178 data[5] = String
01179 .valueOf(((System.currentTimeMillis() - in.getDateCreated()) / 1000));
01180 data[6] = String.valueOf(in.getSpeed());
01181 data[7] = String.valueOf(out.getSpeed());
01182 return data;
01183 }
01184
01185
01186
01187
01188
01189
01190 public long getBytesRead()
01191 {
01192 return in.getBytesRead();
01193 }
01194
01195
01196
01197
01198
01199
01200 public long getBytesWritten()
01201 {
01202 return out.getBytesWritten();
01203 }
01204
01205
01206
01207
01208
01209
01210 public long getTimeActive()
01211 {
01212 return ((System.currentTimeMillis() - in.getDateCreated()) / 1000);
01213 }
01214
01215
01216
01217
01218
01219
01220 public long getReadingSpeed()
01221 {
01222 return in.getSpeed();
01223 }
01224
01225
01226
01227
01228
01229
01230 public long getWritingSpeed()
01231 {
01232 return out.getSpeed();
01233 }
01234
01235
01236
01237
01238 public String getUser()
01239 {
01240 return user.getLogin();
01241 }
01242
01243
01244
01245
01246
01247 public void shutdown()
01248 {
01249
01250
01251 this.isKilled = true;
01252 try
01253 {
01254 if (waitForCommand)
01255 {
01256
01257 in.close();
01258 out.close();
01259 }
01260 }
01261 catch (IOException e)
01262 {
01263
01264
01265 }
01266 }
01267
01268 }