00087 {
// Body of a worker loop; the enclosing declaration is above this excerpt.
// Each iteration takes one client Socket from
// serverThread.controllerServerThreadPendingQueue, reads a protocol version
// and a virtual database name from it, and queues the connection's streams
// on that virtual database's pending list. The loop ends when isKilled is
// set, when serverThread.isShuttingDown() returns true, or when a timed
// wait for work expires with the queue still empty.
// NOTE(review): the five-digit numbers that start many lines look like
// listing line numbers left over from extraction, not Java code -- confirm
// and strip them before compiling.
00088 Socket clientSocket;
00089
00090
// Without a server thread, or without its pending queue, there is nothing to
// serve: log the problem and set isKilled so the main loop is never entered.
if (
serverThread == null)
00091 {
00092
logger.error(Translate.get(
"controller.workerthread.null.serverthread"));
00093
isKilled =
true;
00094 }
00095
else if (
serverThread.controllerServerThreadPendingQueue == null)
00096 {
00097
logger.error(Translate.get(
"controller.workerthread.null.pendingqueue"));
00098
isKilled =
true;
00099 }
00100
00101
00102
// Main loop: one iteration per socket taken from the pending queue.
while (!
isKilled)
00103 {
00104
// Leave the loop as soon as the server thread reports that it is shutting down.
if (
serverThread.isShuttingDown())
00105
break;
00106
00107
// Every queue access below, and both updates of idleWorkerThreads, happen
// while holding the pending queue's monitor.
synchronized (
serverThread.controllerServerThreadPendingQueue)
00108 {
00109
// Block until the queue has an entry, or until this worker decides to stop.
while (
serverThread.controllerServerThreadPendingQueue.isEmpty())
00110 {
00111
00112
// Count this thread as idle for the duration of the wait (decremented right
// after the try/catch below).
serverThread.idleWorkerThreads++;
00113
boolean timeout =
false;
00114
try
00115 {
00116
// Timed wait on the queue. 'timeout' becomes true only if at least the full
// sleep time elapsed, measured with System.currentTimeMillis() around wait().
long before = System.currentTimeMillis();
00117
serverThread.controllerServerThreadPendingQueue
00118 .wait(ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME);
00119
long now = System.currentTimeMillis();
00120
00121 timeout = now - before >= ControllerConstants.DEFAULT_CONTROLLER_WORKER_THREAD_SLEEP_TIME;
00122 }
00123
// An interrupt is ignored: 'timeout' stays false and the queue is re-checked.
// NOTE(review): the thread's interrupt status is not restored here.
catch (InterruptedException ignore)
00124 {
00125 }
00126
serverThread.idleWorkerThreads--;
00127
00128
// The queue field is read again after the wait; if it is now null this
// worker stops.
if (
serverThread.controllerServerThreadPendingQueue == null)
00129 {
00130
isKilled =
true;
00131
break;
00132 }
00133
// Waited the full sleep time and the queue is still empty: stop this worker.
if (timeout
00134 &&
serverThread.controllerServerThreadPendingQueue.isEmpty())
00135 {
00136
00137
isKilled =
true;
00138
break;
00139 }
00140 }
00141
00142
// The inner loop can end by setting isKilled instead of finding work; in
// that case leave the main loop without touching the queue.
if (
isKilled)
00143
break;
00144
00145
// Take the entry at index 0 of the queue; entries are cast to Socket.
00146 clientSocket = (Socket)
serverThread.controllerServerThreadPendingQueue
00147 .remove(0);
00148 }
00149
00150
// A null queue entry is logged and skipped; otherwise the peer address and
// port are logged at debug level.
if (clientSocket == null)
00151 {
00152
logger.error(Translate.get(
"controller.workerthread.null.socket"));
00153
continue;
00154 }
00155
else if (
logger.isDebugEnabled())
00156
logger.debug(Translate.get(
"controller.workerthread.connection.from",
00157
new String[]{clientSocket.getInetAddress().toString(),
00158 String.valueOf(clientSocket.getPort())}));
00159
00160
// Connection set-up. Every 'continue' inside this try still runs the finally
// block below, which closes clientSocket when it is non-null.
try
00161 {
00162
00163
// Set TCP_NODELAY on the client socket.
00164 clientSocket.setTcpNoDelay(
true);
00165
00166
// Wrap the socket in the input and output stream types used for the rest of
// the exchange.
00167 CJDBCInputStream in =
new CJDBCInputStream(clientSocket);
00168 CJDBCOutputStream out =
new CJDBCOutputStream(clientSocket);
00169
00170
00171
// The first value read is the protocol version. A value different from
// Commands.ProtocolVersion is logged and the connection is dropped.
int version = in.readInt();
00172
if (version != Commands.ProtocolVersion)
00173 {
00174
logger.warn(Translate.get(
00175
"controller.workerthread.protocol.incompatible", version));
00176
continue;
00177 }
// Next comes the name of the requested virtual database.
00178 String virtualDatabaseName = in.readUTF();
00179
00180
// Look the database up on the controller; an unknown name is logged and the
// connection is dropped.
00181 VirtualDatabase vdb =
serverThread.controller
00182 .getVirtualDatabase(virtualDatabaseName);
00183
if (vdb == null)
00184 {
00185 String msg = Translate.get(
"virtualdatabase.not.found",
00186 virtualDatabaseName);
00187
logger.warn(msg);
00188
continue;
00189 }
00190
00191
00192
00193
// Lists obtained from the virtual database: its active threads and its
// pending connections.
00194 ArrayList vdbActiveThreads = vdb.getActiveThreads();
00195 ArrayList vdbPendingQueue = vdb.getPendingConnections();
00196
00197
// NOTE(review): when either list is null only isKilled is set; execution
// still continues into synchronized (vdbActiveThreads) and
// vdbPendingQueue.size() below, which would throw a NullPointerException
// that none of the catch clauses of this try handles. Consider leaving the
// iteration right after setting isKilled.
if (vdbActiveThreads == null)
00198 {
00199
logger.error(Translate
00200 .get(
"controller.workerthread.null.active.thread"));
00201
isKilled =
true;
00202 }
00203
if (vdbPendingQueue == null)
00204 {
00205
logger
00206 .error(Translate.get(
"controller.workerthread.null.connection"));
00207
isKilled =
true;
00208 }
00209
00210
00211
// Computed while holding vdbActiveThreads' monitor, used after releasing it.
boolean tooManyConnections;
00212
synchronized (vdbActiveThreads)
00213 {
00214
// Start VirtualDatabaseWorkerThreads until the current thread count reaches
// the database's minimum. Each new thread is added to vdbActiveThreads and
// counted through addCurrentNbOfThread() before it is started.
while (vdb.getCurrentNbOfThreads() < vdb.getMinNbOfThreads())
00215 {
00216
00217
if (
logger.isDebugEnabled())
00218
logger.debug(Translate
00219 .get(
"controller.workerthread.starting.thread.for.minimum"));
00220 VirtualDatabaseWorkerThread thread =
new VirtualDatabaseWorkerThread(
00221
serverThread.controller, vdb);
00222 vdbActiveThreads.add(thread);
00223 vdb.addCurrentNbOfThread();
00224 thread.start();
00225 }
00226
00227
00228
// The connection limit applies only when getMaxNbOfConnections() is greater
// than 0; it is exceeded when active threads plus pending entries is larger
// than that maximum.
// NOTE(review): vdbPendingQueue.size() is read here without holding
// vdbPendingQueue's monitor, and this code queues two entries per connection
// further down, so size() counts two per connection queued here -- confirm
// this is the intended accounting.
00229 tooManyConnections = (vdb.getMaxNbOfConnections() > 0)
00230 && vdbActiveThreads.size() + vdbPendingQueue.size() > vdb
00231 .getMaxNbOfConnections();
00232 }
00233
// Over the limit: send an SQLException to the client, flush, and drop the
// connection (closed by the finally block).
if (tooManyConnections)
00234 {
00235 out.writeObject(
new SQLException(Translate
00236 .get(
"controller.workerthread.too.many.connections")));
00237 out.flush();
00238
continue;
00239 }
00240
00241
00242
// Hand the connection over. Monitors are taken in the order vdbPendingQueue,
// then vdbActiveThreads.
synchronized (vdbPendingQueue)
00243 {
// The connection is queued as two consecutive entries: input stream first,
// then output stream.
00244 vdbPendingQueue.add(in);
00245 vdbPendingQueue.add(out);
00246
// Drop the local reference so the finally block does not close a socket
// whose streams have just been queued.
00247 clientSocket = null;
00248
synchronized (vdbActiveThreads)
00249 {
00250
// No idle thread reported by the database: start a new one if the thread
// limit allows it, otherwise log at info level that the maximum is reached.
if (vdb.getIdleThreads() == 0)
00251 {
00252
// A maximum of 0 means no limit in this condition.
// NOTE(review): '<=' lets a thread be started when the current count already
// equals the maximum -- confirm whether '<' was intended.
if ((vdb.getCurrentNbOfThreads() <= vdb.getMaxNbOfThreads())
00253 || (vdb.getMaxNbOfThreads() == 0))
00254 {
00255
00256
if (
logger.isDebugEnabled())
00257
logger.debug(Translate
00258 .get(
"controller.workerthread.starting.thread"));
00259 VirtualDatabaseWorkerThread thread =
new VirtualDatabaseWorkerThread(
00260
serverThread.controller, vdb);
00261 vdbActiveThreads.add(thread);
00262 vdb.addCurrentNbOfThread();
00263 thread.start();
00264 }
00265
else if (
logger.isInfoEnabled())
00266
logger.info(Translate.get(
00267
"controller.workerthread.maximum.thread", vdb
00268 .getMaxNbOfThreads()));
00269 }
00270
// The database reports at least one idle thread: wake every thread waiting
// on vdbPendingQueue's monitor.
else
00271 {
00272
if (
logger.isDebugEnabled())
00273
logger.debug(Translate
00274 .get(
"controller.workerthread.notify.thread"));
00275
00276
00277
00278
00279
00280 vdbPendingQueue.notifyAll();
00281 }
00282 }
00283 }
00284 }
00285
00286
// Stream and I/O failures are logged; the loop then moves on to the next
// iteration after the finally block has run.
catch (OptionalDataException e)
00287 {
00288
logger
00289 .error(Translate.get(
"controller.workerthread.protocol.error", e));
00290 }
00291
catch (IOException e)
00292 {
00293
logger.error(Translate.get(
"controller.workerthread.io.error", e));
00294 }
00295 finally
00296 {
00297
// Close the socket if this iteration still holds it, i.e. it was not handed
// over above. A failure to close is ignored.
try
00298 {
00299
if (clientSocket != null)
00300 {
00301
if (
logger.isDebugEnabled())
00302
logger.debug(Translate
00303 .get(
"controller.workerthread.connection.closing"));
00304 clientSocket.close();
00305 }
00306 }
00307
catch (IOException ignore)
00308 {
00309 }
00310 }
00311 }
00312
00313
// Reached once the main loop has ended (or was never entered): log
// termination at debug level.
if (
logger.isDebugEnabled())
00314
logger.debug(Translate.get(
"controller.workerthread.terminating"));
00315 }