00001
00025
package org.objectweb.cjdbc.controller.virtualdatabase;
00026
00027
import java.io.EOFException;
00028
import java.io.IOException;
00029
import java.io.OptionalDataException;
00030
import java.net.SocketException;
00031
import java.rmi.RemoteException;
00032
import java.sql.SQLException;
00033
import java.util.ArrayList;
00034
import java.util.HashMap;
00035
import java.util.Iterator;
00036
00037
import org.objectweb.cjdbc.common.log.Trace;
00038
import org.objectweb.cjdbc.common.sql.AbstractRequest;
00039
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00040
import org.objectweb.cjdbc.common.sql.AlterRequest;
00041
import org.objectweb.cjdbc.common.sql.CreateRequest;
00042
import org.objectweb.cjdbc.common.sql.DeleteRequest;
00043
import org.objectweb.cjdbc.common.sql.DropRequest;
00044
import org.objectweb.cjdbc.common.sql.InsertRequest;
00045
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00046
import org.objectweb.cjdbc.common.sql.SelectRequest;
00047
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00048
import org.objectweb.cjdbc.common.sql.UpdateRequest;
00049
import org.objectweb.cjdbc.common.stream.CJDBCInputStream;
00050
import org.objectweb.cjdbc.common.stream.CJDBCOutputStream;
00051
import org.objectweb.cjdbc.common.users.VirtualDatabaseUser;
00052
import org.objectweb.cjdbc.common.util.Constants;
00053
import org.objectweb.cjdbc.controller.core.Controller;
00054
import org.objectweb.cjdbc.driver.protocol.CommandCompleted;
00055
import org.objectweb.cjdbc.driver.protocol.Commands;
00056
00065 public class VirtualDatabaseWorkerThread extends Thread
00066 {
00067
00068
00069
00070
00071
00072
00073
00074
00075 private static int threadID = 0;
00076
00078 private boolean isKilled =
false;
00079
00081 boolean needSkeleton =
false;
00082
00084 private VirtualDatabase vdb;
00085
00086 private final CommandCompleted
completed =
new CommandCompleted();
00087
00089 private Trace
logger = null;
00090
00091 private CJDBCInputStream
in = null;
00092 private CJDBCOutputStream
out = null;
00093
00094 private VirtualDatabaseUser
user;
00095
00096 private Controller
controller;
00097
00098 private boolean waitForCommand;
00099
00100 private HashMap
streamedResultSet;
00101
00102
00103
00104
00105
00112 public VirtualDatabaseWorkerThread(Controller controller,
VirtualDatabase vdb)
00113 {
00114 super(
"VirtualDatabaseWorkerThread-" + vdb.
getName());
00115
this.vdb = vdb;
00116
this.
controller = controller;
00117
try
00118 {
00119
this.logger = Trace
00120 .getLogger(
"org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabaseWorkerThread."
00121 + vdb.
getName() +
"." + (
threadID++));
00122 }
00123
catch (Exception e)
00124 {
00125
this.logger = vdb.
logger;
00126 }
00127 }
00128
00133 public void shutdown()
00134 {
00135
00136
00137
this.isKilled =
true;
00138
try
00139 {
00140
if (
waitForCommand)
00141 {
00142
00143
in.close();
00144
out.close();
00145 }
00146 }
00147
catch (IOException e)
00148 {
00149
00150
00151 }
00152
00153 }
00154
00165 private boolean setRequestParameters(
AbstractRequest request, String login,
00166
long tid,
boolean transactionStarted)
throws SQLException
00167 {
00168 request.setLogin(login);
00169
if (request.isAutoCommit() && transactionStarted)
00170 {
00171
vdb.
commit(tid);
00172
return false;
00173 }
00174
else
00175 request.setTransactionId(tid);
00176
return transactionStarted;
00177 }
00178
00182 public void run()
00183 {
00184 ArrayList vdbActiveThreads =
vdb.
getActiveThreads();
00185 ArrayList vdbPendingQueue =
vdb.
getPendingConnections();
00186
00187
00188
streamedResultSet =
new HashMap();
00189
boolean isActive =
true;
00190
00191
if (vdbActiveThreads == null)
00192 {
00193
logger
00194 .error(
"Got null active threads queue in VirtualDatabaseWorkerThread");
00195
isKilled =
true;
00196 }
00197
if (vdbPendingQueue == null)
00198 {
00199
logger.error(
"Got null connection queue in VirtualDatabaseWorkerThread");
00200
isKilled =
true;
00201 }
00202
00203
00204
while (!
isKilled)
00205 {
00206
00207
synchronized (vdbPendingQueue)
00208 {
00209
while (vdbPendingQueue.isEmpty())
00210 {
00211
if (!
vdb.
poolConnectionThreads)
00212 {
00213
isKilled =
true;
00214
break;
00215 }
00216
boolean timeout =
false;
00217
try
00218 {
00219
if (isActive)
00220 {
00221 isActive =
false;
00222
00223
synchronized (vdbActiveThreads)
00224 {
00225 vdbActiveThreads.remove(
this);
00226
vdb.
addIdleThread();
00227 }
00228 }
00229
long before = System.currentTimeMillis();
00230 vdbPendingQueue.wait(
vdb.
getMaxThreadIdleTime());
00231
long now = System.currentTimeMillis();
00232
00233 timeout = now - before >=
vdb.
getMaxThreadIdleTime();
00234 }
00235
catch (InterruptedException e)
00236 {
00237
logger.warn(
"VirtualDatabaseWorkerThread wait() interrupted");
00238 }
00239
if (timeout && vdbPendingQueue.isEmpty())
00240 {
00241
if (
vdb.
currentNbOfThreads >
vdb.
minNbOfThreads)
00242 {
00243
isKilled =
true;
00244
break;
00245 }
00246 }
00247 }
00248
00249
if (
isKilled)
00250 {
00251
synchronized (vdbActiveThreads)
00252 {
00253
if (isActive)
00254 {
00255 vdbActiveThreads.remove(
this);
00256
vdb.
removeCurrentNbOfThread();
00257 }
00258
else
00259
vdb.
removeIdleThread();
00260 }
00261
00262
continue;
00263 }
00264
00265
00266
try
00267 {
00268
in = (CJDBCInputStream) vdbPendingQueue.remove(0);
00269
out = (CJDBCOutputStream) vdbPendingQueue.remove(0);
00270 }
00271
catch (Exception e)
00272 {
00273
logger.error(
"Error while getting streams from connection");
00274
continue;
00275 }
00276
00277
synchronized (vdbActiveThreads)
00278 {
00279
if (!isActive)
00280 {
00281
vdb.
removeIdleThread();
00282 isActive =
true;
00283
00284 vdbActiveThreads.add(
this);
00285 }
00286 }
00287 }
00288
00289
00290
00291
boolean success =
false;
00292 String login;
00293
try
00294 {
00295 login =
in.readUTF();
00296 String password =
in.readUTF();
00297
user =
new VirtualDatabaseUser(login, password);
00298
00299
if (
vdb.
getAuthenticationManager().
isValidVirtualUser(
user))
00300 {
00301
needSkeleton =
vdb.
getRequestManager()
00302 .
getRequiredParsingGranularity() !=
ParsingGranularities.NO_PARSING;
00303
out.writeObject(
new Boolean(
needSkeleton));
00304
out.writeUTF(
vdb.
getBlobFilter().
getXml());
00305
out.flush();
00306 success =
true;
00307
00308
if (
logger.isDebugEnabled())
00309
logger.debug(
"Login accepted for " + login);
00310 }
00311
else
00312 {
00313 String msg =
"Authentication failed for user '" + login +
"'";
00314
out.writeObject(
new SQLException(msg));
00315
if (
logger.isDebugEnabled())
00316
logger.debug(msg);
00317
continue;
00318 }
00319 }
00320
catch (OptionalDataException e)
00321 {
00322
logger.error(
"Protocol error while expecting user authentication (" + e
00323 +
")");
00324
continue;
00325 }
00326
catch (IOException e)
00327 {
00328
logger.error(
"I/O error during user authentication (" + e +
")");
00329
continue;
00330 }
00331 finally
00332 {
00333
if (!success)
00334 {
00335
try
00336 {
00337
out.close();
00338
in.close();
00339 }
00340
catch (IOException ignore)
00341 {
00342 }
00343 }
00344 }
00345
00346
00347
long currentTid = 0;
00348
boolean transactionStarted =
false;
00349
boolean queryExecutedInThisTransaction =
false;
00350
boolean closed =
false;
00351
int command;
00352
while (!closed && !
isKilled)
00353 {
00354
try
00355 {
00356
00357
waitForCommand =
true;
00358 command =
in.readInt();
00359
waitForCommand =
false;
00360
00361
00362
switch (command)
00363 {
00364
case Commands.ExecReadRequest :
00365
if (
logger.isDebugEnabled())
00366
logger.debug(
"ExecReadRequest command");
00367
00368
SelectRequest select =
readRequestFromStream();
00369 transactionStarted =
setRequestParameters(select, login,
00370 currentTid, transactionStarted);
00371
00372
00373
if (!transactionStarted)
00374 currentTid = 0;
00375
else
00376 queryExecutedInThisTransaction =
true;
00377
ControllerResultSet crs =
vdb.
execReadRequest(select);
00378
if (crs.
hasMoreData())
00379
streamedResultSet.put(crs.
getCursorName(), crs);
00380
out.writeObject(crs.
getFields());
00381
out.writeObject(crs.
getData());
00382
out.writeBoolean(crs.
hasMoreData());
00383
if (crs.
hasMoreData())
00384
out.writeUTF(crs.
getCursorName());
00385
out.flush();
00386
break;
00387
case Commands.ExecWriteRequest :
00388
if (
logger.isDebugEnabled())
00389
logger.debug(
"ExecWriteRequest command");
00390
AbstractWriteRequest write =
writeRequestFromStream(
false);
00391 transactionStarted =
setRequestParameters(write, login,
00392 currentTid, transactionStarted);
00393
if (!transactionStarted)
00394 currentTid = 0;
00395
else
00396 queryExecutedInThisTransaction =
true;
00397
out.writeObject(
new Integer(
vdb.
execWriteRequest(write)));
00398
out.flush();
00399
break;
00400
case Commands.ExecWriteRequestWithKeys :
00401
if (
logger.isDebugEnabled())
00402
logger.debug(
"ExecWriteRequestWithKeys command");
00403
AbstractWriteRequest writeWithKeys =
writeRequestFromStream(
true);
00404 transactionStarted =
setRequestParameters(writeWithKeys, login,
00405 currentTid, transactionStarted);
00406
if (!transactionStarted)
00407 currentTid = 0;
00408
else
00409 queryExecutedInThisTransaction =
true;
00410
ControllerResultSet keys =
vdb
00411 .
execWriteRequestWithKeys(writeWithKeys);
00412
out.writeObject(keys.
getFields());
00413
out.writeObject(keys.
getData());
00414
out.writeBoolean(keys.
hasMoreData());
00415
if (keys.
hasMoreData())
00416
out.writeUTF(keys.
getCursorName());
00417
out.flush();
00418
break;
00419
case Commands.ExecReadStoredProcedure :
00420
if (
logger.isDebugEnabled())
00421
logger.debug(
"ExecReadStoredProcedure command");
00422
StoredProcedure readProc =
procedureFromStream(
true);
00423 transactionStarted =
setRequestParameters(readProc, login,
00424 currentTid, transactionStarted);
00425
if (!transactionStarted)
00426 currentTid = 0;
00427
else
00428 queryExecutedInThisTransaction =
true;
00429
ControllerResultSet sprs =
vdb.
execReadStoredProcedure(readProc);
00430
out.writeObject(sprs.
getFields());
00431
out.writeObject(sprs.
getData());
00432
out.writeBoolean(sprs.
hasMoreData());
00433
if (sprs.
hasMoreData())
00434
out.writeUTF(sprs.
getCursorName());
00435
out.flush();
00436
break;
00437
case Commands.ExecWriteStoredProcedure :
00438
if (
logger.isDebugEnabled())
00439
logger.debug(
"ExecWriteStoredProcedure command");
00440
StoredProcedure writeProc =
procedureFromStream(
false);
00441 transactionStarted =
setRequestParameters(writeProc, login,
00442 currentTid, transactionStarted);
00443
if (!transactionStarted)
00444 currentTid = 0;
00445
else
00446 queryExecutedInThisTransaction =
true;
00447
out.writeObject(
new Integer(
vdb
00448 .execWriteStoredProcedure(writeProc)));
00449
out.flush();
00450
break;
00451
case Commands.Begin :
00452
if (
logger.isDebugEnabled())
00453
logger.debug(
"Begin command");
00454 currentTid =
vdb.
begin(login);
00455
out.writeObject(
new Long(currentTid));
00456
out.flush();
00457 transactionStarted =
true;
00458
break;
00459
case Commands.Commit :
00460
if (
logger.isDebugEnabled())
00461
logger.debug(
"Commit command");
00462
vdb.
commit(currentTid);
00463 currentTid =
vdb.
begin(login);
00464
out.writeObject(
new Long(currentTid));
00465
out.flush();
00466
break;
00467
case Commands.SetAutoCommit :
00468
if (
logger.isDebugEnabled())
00469
logger.debug(
"Set Auto commit command");
00470
vdb.
commit(currentTid);
00471 currentTid = 0;
00472 transactionStarted =
false;
00473
out.writeObject(Boolean.TRUE);
00474
out.flush();
00475
break;
00476
case Commands.Rollback :
00477
if (
logger.isDebugEnabled())
00478
logger.debug(
"Rollback command");
00479
vdb.
rollback(currentTid);
00480 currentTid =
vdb.
begin(login);
00481
out.writeObject(
new Long(currentTid));
00482
out.flush();
00483
break;
00484
case Commands.GetVirtualDatabaseName :
00485
if (
logger.isDebugEnabled())
00486
logger.debug(
"GetVirtualDatabaseName command");
00487
out.writeObject(
vdb.
getDatabaseName());
00488
out.flush();
00489
break;
00490
case Commands.GetDatabaseProductName :
00491
if (
logger.isDebugEnabled())
00492
logger.debug(
"GetDatabaseProductName command");
00493
out.writeObject(
vdb.
getDatabaseProductName());
00494
out.flush();
00495
break;
00496
case Commands.GetControllerVersionNumber :
00497
if (
logger.isDebugEnabled())
00498
logger.debug(
"GetControllerVersionNumber command");
00499
out.writeObject(
Constants.VERSION);
00500
out.flush();
00501
break;
00502
case Commands.DatabaseMetaDataGetTables :
00503
if (
logger.isDebugEnabled())
00504
logger.debug(
"DatabaseMetaDataGetTables command");
00505 String tcatalog =
in.readUTF();
00506 String tschemaPattern =
in.readUTF();
00507 String ttableNamePattern =
in.readUTF();
00508 String[] ttypes = (String[])
in.readObject();
00509
out.writeObject(
vdb.
getMetaData().
getTables(tcatalog,
00510 tschemaPattern, ttableNamePattern, ttypes));
00511
out.flush();
00512
break;
00513
case Commands.DatabaseMetaDataGetColumns :
00514
if (
logger.isDebugEnabled())
00515
logger.debug(
"DatabaseMetaDataGetColumns command");
00516 String ccatalog =
in.readUTF();
00517 String cschemaPattern =
in.readUTF();
00518 String ctableNamePattern =
in.readUTF();
00519 String ccolumnNamePattern =
in.readUTF();
00520
out.writeObject(
vdb.
getMetaData().
getColumns(ccatalog,
00521 cschemaPattern, ctableNamePattern, ccolumnNamePattern));
00522
out.flush();
00523
break;
00524
case Commands.DatabaseMetaDataGetPrimaryKeys :
00525
if (
logger.isDebugEnabled())
00526
logger.debug(
"DatabaseMetaDataGetPrimaryKeys command");
00527 String pcatalog =
in.readUTF();
00528 String pschemaPattern =
in.readUTF();
00529 String ptableNamePattern =
in.readUTF();
00530
out.writeObject(
vdb.
getMetaData().
getPrimaryKeys(pcatalog,
00531 pschemaPattern, ptableNamePattern));
00532
out.flush();
00533
break;
00534
case Commands.DatabaseMetaDataGetProcedures :
00535
if (
logger.isDebugEnabled())
00536
logger.debug(
"DatabaseMetaDataGetProcedures command");
00537 String rcatalog =
in.readUTF();
00538 String rschemaPattern =
in.readUTF();
00539 String procedureNamePattern =
in.readUTF();
00540
out.writeObject(
vdb.
getMetaData().
getProcedures(rcatalog,
00541 rschemaPattern, procedureNamePattern));
00542
out.flush();
00543
break;
00544
case Commands.DatabaseMetaDataGetProcedureColumns :
00545
if (
logger.isDebugEnabled())
00546
logger.debug(
"DatabaseMetaDataGetProcedureColumns command");
00547 String pccatalog =
in.readUTF();
00548 String pcschemaPattern =
in.readUTF();
00549 String pcprocedureNamePattern =
in.readUTF();
00550 String pccolumnNamePattern =
in.readUTF();
00551
out
00552 .writeObject(
vdb.
getMetaData().
getProcedureColumns(pccatalog,
00553 pcschemaPattern, pcprocedureNamePattern,
00554 pccolumnNamePattern));
00555
out.flush();
00556
break;
00557
case Commands.DatabaseMetaDataGetCatalogs :
00558
if (
logger.isDebugEnabled())
00559
logger.debug(
"DatabaseMetaDataGetCatalogs command");
00560 ArrayList list =
controller.listVirtualDatabases();
00561
out.writeObject(
vdb.
getMetaData().
getCatalogs(list));
00562
out.flush();
00563
break;
00564
case Commands.DatabaseMetaDataGetCatalog :
00565
if (
logger.isDebugEnabled())
00566
logger.debug(
"DatabaseMetaDataGetCatalog command");
00567
out.writeObject(
vdb.
getName());
00568
out.flush();
00569
break;
00570
case Commands.DatabaseMetaDataGetTableTypes :
00571
if (
logger.isDebugEnabled())
00572
logger.debug(
"DatabaseMetaDataGetTable Types command");
00573
out.writeObject(
vdb.
getMetaData().
getTableTypes());
00574
out.flush();
00575
break;
00576
case Commands.DatabaseMetaDataGetSchemas :
00577
if (
logger.isDebugEnabled())
00578
logger.debug(
"DatabaseMetaDataGetSchemas Types command");
00579
out.writeObject(
vdb.
getMetaData().
getSchemas());
00580
out.flush();
00581
break;
00582
case Commands.DatabaseMetaDataGetTablePrivileges :
00583
if (
logger.isDebugEnabled())
00584
logger.debug(
"DatabaseMetaDataGetTablePrivileges command");
00585 String tpcatalog =
in.readUTF();
00586 String tpschemaPattern =
in.readUTF();
00587 String tptablePattern =
in.readUTF();
00588
out.writeObject(
vdb.
getMetaData().
getTablePrivileges(tpcatalog,
00589 tpschemaPattern, tptablePattern));
00590
out.flush();
00591
break;
00592
case Commands.ConnectionSetCatalog :
00593
if (
logger.isDebugEnabled())
00594
logger.debug(
"ConnectionSetCatalog command");
00595 String catalog =
in.readUTF();
00596
boolean change =
controller.hasVirtualDatabase(catalog);
00597
if (change)
00598 {
00599
VirtualDatabase tempvdb =
controller
00600 .getVirtualDatabase(catalog);
00601
if (!tempvdb.
getAuthenticationManager()
00602 .
isValidVirtualUser(
user))
00603
out
00604 .writeObject(
new SQLException(
00605
"User authentication has failed for asked catalog. No change"));
00606
else
00607 {
00608
this.vdb = tempvdb;
00609
out.writeObject(
new Boolean(
true));
00610 }
00611 }
00612
else
00613
out.writeObject(
new Boolean(
false));
00614
out.flush();
00615
break;
00616
case Commands.Close :
00617
if (
logger.isDebugEnabled())
00618
logger.debug(
"Close command");
00619
out.writeObject(
completed);
00620
out.flush();
00621 closed =
true;
00622
break;
00623
case Commands.Reset :
00624
if (
logger.isDebugEnabled())
00625
logger.debug(
"Reset command");
00626
00627
00628
if (transactionStarted && queryExecutedInThisTransaction)
00629 {
00630
if (
logger.isDebugEnabled())
00631
logger.debug(
"Forcing transaction " + currentTid
00632 +
" rollback");
00633
try
00634 {
00635
vdb.
rollback(currentTid);
00636 }
00637
catch (Exception e)
00638 {
00639
if (
logger.isDebugEnabled())
00640
logger.debug(
"Error during rollback of transaction "
00641 + currentTid +
"(" + e +
")");
00642 }
00643 }
00644 currentTid = 0;
00645 transactionStarted =
false;
00646
break;
00647
case Commands.FetchNextResultSetRows :
00648
if (
logger.isDebugEnabled())
00649
logger.debug(
"FetchNextResultSetRows command");
00650
00651 String cursorName =
in.readUTF();
00652
int fetchSize =
in.readInt();
00653
ControllerResultSet fetchCrs = (
ControllerResultSet)
streamedResultSet
00654 .get(cursorName);
00655
if (fetchCrs == null)
00656 {
00657
out.writeObject(
new SQLException(
00658
"No valid ControllerResultSet to fetch data from"));
00659
out.flush();
00660 }
00661
else
00662 {
00663
out.writeObject(fetchCrs.
fetchData(fetchSize));
00664
out.writeBoolean(fetchCrs.
hasMoreData());
00665
out.flush();
00666
if (!fetchCrs.
hasMoreData())
00667
streamedResultSet.remove(cursorName);
00668 }
00669
break;
00670
case Commands.CloseRemoteResultSet :
00671
if (
logger.isDebugEnabled())
00672
logger.debug(
"CloseRemoteResultSet command");
00673
00674 String cursor =
in.readUTF();
00675
ControllerResultSet crsToClose = (
ControllerResultSet)
streamedResultSet
00676 .remove(cursor);
00677
if (crsToClose == null)
00678 {
00679
out.writeObject(
new SQLException(
00680
"No valid ControllerResultSet to close."));
00681
out.flush();
00682 }
00683
else
00684 {
00685 crsToClose.
closeResultSet();
00686
out.writeObject(
completed);
00687
out.flush();
00688 }
00689
break;
00690
case Commands.RestoreConnectionState :
00691
if (
logger.isDebugEnabled())
00692
logger.debug(
"RestoreConnectionState command");
00693
00694 transactionStarted = !
in.readBoolean();
00695
if (transactionStarted)
00696 currentTid =
in.readLong();
00697
break;
00698
default :
00699 String errorMsg =
"Unsupported protocol command: " + command;
00700
logger.error(errorMsg);
00701
out.writeObject(
new RuntimeException(errorMsg));
00702
out.flush();
00703
break;
00704 }
00705 }
00706
catch (OptionalDataException e)
00707 {
00708
logger.warn(
"Protocol error (" + e +
")");
00709
try
00710 {
00711
out.writeObject(e);
00712
out.flush();
00713 }
00714
catch (IOException ignore)
00715 {
00716 }
00717 }
00718
catch (RemoteException e)
00719 {
00720
logger.warn(
"Error during command execution (" + e +
")");
00721
try
00722 {
00723
out.writeObject(e);
00724
out.flush();
00725 }
00726
catch (IOException ignore)
00727 {
00728 }
00729 }
00730
catch (EOFException e)
00731 {
00732
logger.warn(
"Client " + login +
" closed connection with server", e);
00733 closed =
true;
00734 }
00735
catch (SocketException e)
00736 {
00737
00738 closed =
true;
00739 }
00740
catch (IOException e)
00741 {
00742 closed =
true;
00743
logger.warn(
"Closing connection with client " + login
00744 +
" because of IOException.(" + e +
")");
00745 }
00746
catch (ClassNotFoundException e)
00747 {
00748
logger.error(
"Protocol error (" + e +
")");
00749
try
00750 {
00751
out.writeObject(e);
00752
out.flush();
00753 }
00754
catch (IOException ignore)
00755 {
00756 }
00757 }
00758
catch (SQLException e)
00759 {
00760
logger
00761 .warn(
"Error during command execution (" + e.getMessage() +
")");
00762
try
00763 {
00764
out.writeObject(e);
00765
out.flush();
00766 }
00767
catch (IOException ignore)
00768 {
00769 }
00770 }
00771
catch (RuntimeException e)
00772 {
00773
logger.warn(
"Runtime error during command execution ("
00774 + e.getMessage() +
")");
00775
try
00776 {
00777
out.writeObject(
new SQLException(e.getMessage()));
00778
out.flush();
00779 }
00780
catch (IOException ignore)
00781 {
00782 }
00783 }
00784 }
00785
00786
00787
if (transactionStarted)
00788 {
00789
if (
logger.isDebugEnabled())
00790
logger.debug(
"Forcing transaction " + currentTid +
" rollback");
00791
try
00792 {
00793
vdb.
rollback(currentTid);
00794 }
00795
catch (Exception e)
00796 {
00797
if (
logger.isDebugEnabled())
00798
logger.debug(
"Error during rollback of transaction " + currentTid
00799 +
"(" + e +
")");
00800 }
00801 }
00802
if (!
streamedResultSet.isEmpty())
00803 {
00804
for (Iterator iter =
streamedResultSet.values().iterator(); iter
00805 .hasNext();)
00806 {
00807
ControllerResultSet crs = (
ControllerResultSet) iter.next();
00808 crs.
closeResultSet();
00809
00810 }
00811
streamedResultSet.clear();
00812 }
00813
try
00814 {
00815
in.close();
00816 }
00817
catch (IOException ignore)
00818 {
00819 }
00820
try
00821 {
00822
out.close();
00823 }
00824
catch (IOException ignore)
00825 {
00826 }
00827 }
00828
00829
if (
logger.isDebugEnabled())
00830
logger.debug(
"VirtualDatabaseWorkerThread associated to login: "
00831 +
this.
getUser() +
" terminating.");
00832 }
00833
00842 private StoredProcedure procedureFromStream(
boolean isRead)
00843
throws OptionalDataException, IOException
00844 {
00845 String sql =
in.readUTF();
00846
boolean escape =
in.readBoolean();
00847 String lineSeparator =
in.readUTF();
00848
int timeout =
in.readInt();
00849
StoredProcedure proc =
new StoredProcedure(sql, escape, timeout,
00850 lineSeparator);
00851 proc.
setIsAutoCommit(
in.readBoolean());
00852 proc.
setDriverProcessed(
in.readBoolean());
00853
if (isRead)
00854 {
00855 proc.
setMaxRows(
in.readInt());
00856 proc.
setFetchSize(
in.readInt());
00857 }
00858
00859
if (
needSkeleton || !proc.
isDriverProcessed())
00860
if (
in.readBoolean())
00861 proc.
setSqlSkeleton(
in.readUTF());
00862
return proc;
00863 }
00864
00873 private AbstractWriteRequest writeRequestFromStream(
boolean withKeys)
00874
throws OptionalDataException, IOException
00875 {
00876
AbstractWriteRequest writeRequest;
00877
int requestType =
in.readInt();
00878 String sql =
in.readUTF();
00879
boolean escape =
in.readBoolean();
00880 String lineSeparator =
in.readUTF();
00881
int timeout =
in.readInt();
00882
switch (requestType)
00883 {
00884
case Commands.CreateRequest :
00885 writeRequest =
new CreateRequest(sql, escape, timeout, lineSeparator);
00886
break;
00887
case Commands.AlterRequest :
00888 writeRequest =
new AlterRequest(sql, escape, timeout, lineSeparator);
00889
break;
00890
case Commands.DeleteRequest :
00891 writeRequest =
new DeleteRequest(sql, escape, timeout, lineSeparator);
00892
break;
00893
case Commands.DropRequest :
00894 writeRequest =
new DropRequest(sql, escape, timeout, lineSeparator);
00895
break;
00896
case Commands.InsertRequest :
00897 writeRequest =
new InsertRequest(sql, escape, timeout, lineSeparator);
00898
break;
00899
case Commands.UpdateRequest :
00900 writeRequest =
new UpdateRequest(sql, escape, timeout, lineSeparator);
00901
break;
00902
default :
00903
throw new IOException(
"Invalid Write Query Type");
00904 }
00905
00906 writeRequest.
setIsAutoCommit(
in.readBoolean());
00907 writeRequest.
setDriverProcessed(
in.readBoolean());
00908
if (withKeys)
00909 {
00910 writeRequest.
setMaxRows(
in.readInt());
00911 writeRequest.
setFetchSize(
in.readInt());
00912 }
00913
00914
00915
if (
needSkeleton || !writeRequest.
isDriverProcessed())
00916
if (
in.readBoolean())
00917 writeRequest.
setSqlSkeleton(
in.readUTF());
00918
00919
return writeRequest;
00920 }
00921
00929 private SelectRequest readRequestFromStream() throws OptionalDataException,
00930 IOException
00931 {
00932 String sql =
in.readUTF();
00933
boolean escape =
in.readBoolean();
00934 String lineSeparator =
in.readUTF();
00935
int timeout =
in.readInt();
00936
SelectRequest select =
new SelectRequest(sql, escape, timeout,
00937 lineSeparator);
00938 select.
setIsAutoCommit(
in.readBoolean());
00939 select.
setDriverProcessed(
in.readBoolean());
00940 select.
setMaxRows(
in.readInt());
00941 select.
setFetchSize(
in.readInt());
00942
if (
in.readBoolean())
00943 select.
setCursorName(
in.readUTF());
00944
00945
if (
needSkeleton || !select.
isDriverProcessed())
00946
if (
in.readBoolean())
00947 select.
setSqlSkeleton(
in.readUTF());
00948
return select;
00949 }
00950
00956 public String[]
retrieveClientData()
00957 {
00958 String[] data =
new String[6];
00959 data[0] =
in.getSocket().getInetAddress().getHostName();
00960 data[1] =
in.getSocket().getInetAddress().getHostAddress();
00961 data[2] = String.valueOf(
in.getBytesRead());
00962 data[3] = String.valueOf(
out.getBytesWritten());
00963 data[4] = String.valueOf(
in.getUseCompression());
00964 data[5] = String
00965 .valueOf(((System.currentTimeMillis() -
in.getDateCreated()) / 1000));
00966 data[6] = String.valueOf(
in.getSpeed());
00967 data[7] = String.valueOf(
out.getSpeed());
00968
return data;
00969 }
00970
00976 public long getBytesRead()
00977 {
00978
return in.getBytesRead();
00979 }
00980
00986 public long getBytesWritten()
00987 {
00988
return out.getBytesWritten();
00989 }
00990
00996 public long getTimeActive()
00997 {
00998
return ((System.currentTimeMillis() -
in.getDateCreated()) / 1000);
00999 }
01000
01006 public long getReadingSpeed()
01007 {
01008
return in.getSpeed();
01009 }
01010
01016 public long getWritingSpeed()
01017 {
01018
return out.getSpeed();
01019 }
01020
01024 public String
getUser()
01025 {
01026
return user.getLogin();
01027 }
01028 }