00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 package org.objectweb.cjdbc.controller.scheduler.raidb1;
00026
00027 import java.sql.SQLException;
00028 import java.util.ArrayList;
00029
00030 import org.objectweb.cjdbc.common.exceptions.RollbackException;
00031 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00032 import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00033 import org.objectweb.cjdbc.common.sql.SelectRequest;
00034 import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00035 import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
00036 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00037 import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
00038 import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00039 import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema;
00040 import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseTable;
00041 import org.objectweb.cjdbc.controller.scheduler.schema.TransactionExclusiveLock;
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 public class RAIDb1OptimisticTransactionLevelScheduler
00055 extends
00056 AbstractScheduler
00057 {
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 private long requestId;
00070 private SchedulerDatabaseSchema schedulerDatabaseSchema = null;
00071
00072
00073
00074
00075
00076
00077
00078
00079 public RAIDb1OptimisticTransactionLevelScheduler()
00080 {
00081 super(RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
00082 requestId = 0;
00083 }
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 public synchronized void setDatabaseSchema(DatabaseSchema dbs)
00098 {
00099 if (schedulerDatabaseSchema == null)
00100 {
00101 logger.info("Setting new database schema");
00102 schedulerDatabaseSchema = new SchedulerDatabaseSchema(dbs);
00103 }
00104 else
00105 {
00106 SchedulerDatabaseSchema newSchema = new SchedulerDatabaseSchema(dbs);
00107 ArrayList tables = schedulerDatabaseSchema.getTables();
00108 ArrayList newTables = newSchema.getTables();
00109 if (newTables == null)
00110 {
00111 logger.info("Removing all tables.");
00112 schedulerDatabaseSchema = null;
00113 return;
00114 }
00115
00116
00117 for (int i = 0; i < tables.size(); i++)
00118 {
00119 SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
00120 if (!newSchema.hasTable(t.getName()))
00121 {
00122 schedulerDatabaseSchema.removeTable(t);
00123 if (logger.isInfoEnabled())
00124 logger.info("Removing table " + t.getName());
00125 }
00126 }
00127
00128
00129 int size = newTables.size();
00130 for (int i = 0; i < size; i++)
00131 {
00132 SchedulerDatabaseTable t = (SchedulerDatabaseTable) newTables.get(i);
00133 if (!schedulerDatabaseSchema.hasTable(t.getName()))
00134 {
00135 schedulerDatabaseSchema.addTable(t);
00136 if (logger.isInfoEnabled())
00137 logger.info("Adding table " + t.getName());
00138 }
00139 }
00140 }
00141 }
00142
00143
00144
00145
00146
00147
00148
00149 public void mergeDatabaseSchema(DatabaseSchema dbs)
00150 {
00151 try
00152 {
00153 logger.info("Merging new database schema");
00154 schedulerDatabaseSchema.mergeSchema(new SchedulerDatabaseSchema(dbs));
00155 }
00156 catch (Exception e)
00157 {
00158 logger.error("Error while merging new database schema", e);
00159 }
00160 }
00161
00162
00163
00164
00165
00166
00167
00168 public final void scheduleReadRequest(SelectRequest request)
00169 throws SQLException
00170 {
00171 synchronized (this)
00172 {
00173 request.setId(requestId++);
00174 }
00175 }
00176
00177
00178
00179
00180 public final void readCompletedNotify(SelectRequest request)
00181 {
00182 }
00183
00184
00185
00186
00187
00188
00189
00190 public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
00191 throws SQLException, RollbackException
00192 {
00193 if (request.isCreate())
00194 {
00195 synchronized (this)
00196 {
00197 request.setId(requestId++);
00198 }
00199 return;
00200 }
00201
00202 SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
00203 .getTableName());
00204 if (t == null)
00205 {
00206 String msg = "No table found for request " + request.getId();
00207 logger.error(msg);
00208 throw new SQLException(msg);
00209 }
00210
00211
00212 TransactionExclusiveLock tableLock = t.getLock();
00213 if (!request.isAutoCommit())
00214 {
00215 synchronized (this)
00216 {
00217 if (tableLock.isLocked())
00218 {
00219 long owner = tableLock.getLocker();
00220 long us = request.getTransactionId();
00221 if (owner != us)
00222 {
00223 ArrayList tables = schedulerDatabaseSchema.getTables();
00224 ArrayList weAreblocking = new ArrayList();
00225 int size = tables.size();
00226 for (int i = 0; i < size; i++)
00227 {
00228 SchedulerDatabaseTable table = (SchedulerDatabaseTable) tables
00229 .get(i);
00230 if (table == null)
00231 continue;
00232 TransactionExclusiveLock lock = table.getLock();
00233
00234 if (lock.isLocked())
00235 {
00236 if (lock.getLocker() == us)
00237 {
00238
00239 if (lock.isWaiting(owner))
00240 {
00241 releaseLocks(us);
00242 throw new RollbackException(
00243 "Deadlock detected, rollbacking transaction " + us);
00244 }
00245 else
00246 weAreblocking.addAll(lock.getWaitingList());
00247 }
00248 }
00249 }
00250 }
00251 else
00252 {
00253
00254 request.setId(requestId++);
00255 return;
00256 }
00257 }
00258 else
00259 {
00260 acquireLockAndSetRequestId(request, tableLock);
00261 return;
00262 }
00263 }
00264 }
00265
00266 acquireLockAndSetRequestId(request, tableLock);
00267 }
00268
00269 private void acquireLockAndSetRequestId(AbstractWriteRequest request,
00270 TransactionExclusiveLock tableLock) throws SQLException
00271 {
00272
00273 if (tableLock.acquire(request))
00274 {
00275 synchronized (this)
00276 {
00277 request.setId(requestId++);
00278 }
00279 if (logger.isDebugEnabled())
00280 logger.debug("Request " + request.getId() + " scheduled for write ("
00281 + getPendingWrites() + " pending writes)");
00282 }
00283 else
00284 {
00285 if (logger.isWarnEnabled())
00286 logger.warn("Request " + request.getId() + " timed out ("
00287 + request.getTimeout() + " ms)");
00288 throw new SQLException("Timeout (" + request.getTimeout()
00289 + ") for request: " + request.getId());
00290 }
00291 }
00292
00293
00294
00295
00296 public final void notifyWriteCompleted(AbstractWriteRequest request)
00297 {
00298 if (request.isCreate())
00299 {
00300 if (logger.isDebugEnabled())
00301 logger.debug("Adding table '" + request.getTableName()
00302 + "' to scheduler schema");
00303 synchronized (this)
00304 {
00305 schedulerDatabaseSchema.addTable(new SchedulerDatabaseTable(
00306 new DatabaseTable(request.getTableName())));
00307 }
00308 }
00309 else if (request.isDrop())
00310 {
00311 if (logger.isDebugEnabled())
00312 logger.debug("Removing table '" + request.getTableName()
00313 + "' to scheduler schema");
00314 synchronized (this)
00315 {
00316 schedulerDatabaseSchema.removeTable(schedulerDatabaseSchema
00317 .getTable(request.getTableName()));
00318 }
00319 return;
00320 }
00321
00322
00323
00324 if (request.isAutoCommit())
00325 {
00326 SchedulerDatabaseTable t = schedulerDatabaseSchema.getTable(request
00327 .getTableName());
00328 if (t == null)
00329 {
00330 String msg = "No table found to release lock for request "
00331 + request.getId();
00332 logger.error(msg);
00333 }
00334 else
00335 t.getLock().release();
00336 }
00337 }
00338
00339
00340
00341
00342
00343
00344
00345
00346 protected final void commitTransaction(long transactionId)
00347 {
00348 releaseLocks(transactionId);
00349 }
00350
00351
00352
00353
00354 protected final void rollbackTransaction(long transactionId)
00355 {
00356 releaseLocks(transactionId);
00357 }
00358
00359
00360
00361
00362
00363
00364 private synchronized void releaseLocks(long transactionId)
00365 {
00366 ArrayList tables = schedulerDatabaseSchema.getTables();
00367 int size = tables.size();
00368 for (int i = 0; i < size; i++)
00369 {
00370 SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
00371 if (t == null)
00372 continue;
00373 TransactionExclusiveLock lock = t.getLock();
00374
00375 if (lock.isLocked())
00376 if (lock.getLocker() == transactionId)
00377 lock.release();
00378 }
00379 }
00380
00381
00382
00383
00384
00385
00386
00387
00388 public String getXmlImpl()
00389 {
00390 return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
00391 + DatabasesXmlTags.ATT_level + "=\""
00392 + DatabasesXmlTags.VAL_optimisticTransaction + "\"/>";
00393 }
00394
00395 }