src/org/objectweb/cjdbc/controller/scheduler/raidb1/RAIDb1OptimisticTransactionLevelScheduler.java

説明を見る。
package org.objectweb.cjdbc.controller.scheduler.raidb1;

import java.sql.SQLException;
import java.util.ArrayList;

import org.objectweb.cjdbc.common.exceptions.RollbackException;
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
import org.objectweb.cjdbc.common.sql.SelectRequest;
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
import org.objectweb.cjdbc.controller.requestmanager.RAIDbLevels;
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseSchema;
import org.objectweb.cjdbc.controller.scheduler.schema.SchedulerDatabaseTable;
import org.objectweb.cjdbc.controller.scheduler.schema.TransactionExclusiveLock;

/**
 * Transaction-level scheduler registered for RAIDb-1 with table parsing
 * granularity.
 * <p>
 * Read requests are never blocked: they only receive a request id. Write
 * requests (other than table creation) acquire the
 * {@link TransactionExclusiveLock} of the table they target. A lock taken by
 * an autocommit request is released when the write completes; a lock taken
 * inside a transaction is released when the transaction commits or rolls
 * back. Before a transactional write waits on a lock held by another
 * transaction, a direct (two-party) deadlock check is performed.
 */
public class RAIDb1OptimisticTransactionLevelScheduler
    extends
      AbstractScheduler
{

  //
  // How the code is organized ?
  //
  // 1. Member variables
  // 2. Constructor
  // 3. Request handling
  // 4. Transaction management
  // 5. Debug/Monitoring
  //

  /** Next request id to hand out; always updated under the monitor of this. */
  private long                    requestId;

  /**
   * Scheduler view of the database schema. May be <code>null</code>: it
   * starts unset and {@link #setDatabaseSchema(DatabaseSchema)} resets it to
   * <code>null</code> when the new schema has no table.
   */
  private SchedulerDatabaseSchema schedulerDatabaseSchema = null;

  //
  // Constructor
  //

  /**
   * Creates a new scheduler with request ids starting at 0.
   */
  public RAIDb1OptimisticTransactionLevelScheduler()
  {
    super(RAIDbLevels.RAIDb1, ParsingGranularities.TABLE);
    requestId = 0;
  }

  //
  // Request Handling
  //

  /**
   * Sets the database schema, or updates the current one in place so that the
   * lock objects of the tables that remain are preserved.
   * 
   * @param dbs the new database schema
   */
  public synchronized void setDatabaseSchema(DatabaseSchema dbs)
  {
    if (schedulerDatabaseSchema == null)
    {
      logger.info("Setting new database schema");
      schedulerDatabaseSchema = new SchedulerDatabaseSchema(dbs);
      return;
    }

    // Schema is updated, compute the diff !
    SchedulerDatabaseSchema newSchema = new SchedulerDatabaseSchema(dbs);
    ArrayList newTables = newSchema.getTables();
    if (newTables == null)
    { // New schema is empty (no backend is active anymore)
      logger.info("Removing all tables.");
      schedulerDatabaseSchema = null;
      return;
    }

    // Remove extra-tables. We walk a snapshot of the current table list
    // because removeTable() may modify the list returned by getTables();
    // removing while walking the live list by index would skip entries.
    ArrayList currentTables = schedulerDatabaseSchema.getTables();
    ArrayList snapshot = (currentTables == null)
        ? new ArrayList()
        : new ArrayList(currentTables);
    int snapshotSize = snapshot.size();
    for (int i = 0; i < snapshotSize; i++)
    {
      SchedulerDatabaseTable t = (SchedulerDatabaseTable) snapshot.get(i);
      if (!newSchema.hasTable(t.getName()))
      {
        schedulerDatabaseSchema.removeTable(t);
        if (logger.isInfoEnabled())
          logger.info("Removing table " + t.getName());
      }
    }

    // Add missing tables
    int size = newTables.size();
    for (int i = 0; i < size; i++)
    {
      SchedulerDatabaseTable t = (SchedulerDatabaseTable) newTables.get(i);
      if (!schedulerDatabaseSchema.hasTable(t.getName()))
      {
        schedulerDatabaseSchema.addTable(t);
        if (logger.isInfoEnabled())
          logger.info("Adding table " + t.getName());
      }
    }
  }

  /**
   * Merges the given schema into the current scheduler schema. If no schema
   * is currently set, the given schema becomes the current one. Any failure
   * is logged and not propagated.
   * <p>
   * Synchronized like the other methods that modify the scheduler schema.
   * 
   * @param dbs the database schema to merge
   */
  public synchronized void mergeDatabaseSchema(DatabaseSchema dbs)
  {
    try
    {
      logger.info("Merging new database schema");
      SchedulerDatabaseSchema incoming = new SchedulerDatabaseSchema(dbs);
      if (schedulerDatabaseSchema == null)
        schedulerDatabaseSchema = incoming; // Nothing to merge into
      else
        schedulerDatabaseSchema.mergeSchema(incoming);
    }
    catch (Exception e)
    {
      logger.error("Error while merging new database schema", e);
    }
  }

  /**
   * Schedules a read request. Reads are never blocked by this scheduler; the
   * request only receives its unique id.
   * 
   * @param request the read request to schedule
   * @throws SQLException declared by the signature, never thrown here
   */
  public final void scheduleReadRequest(SelectRequest request)
      throws SQLException
  {
    synchronized (this)
    {
      request.setId(requestId++);
    }
  }

  /**
   * Notifies the completion of a read request. Nothing to do since reads do
   * not hold any lock.
   * 
   * @param request the completed read request
   */
  public final void readCompletedNotify(SelectRequest request)
  {
  }

  /**
   * Schedules a write request. Table creations are only given an id. Any
   * other write must own the exclusive lock of its target table before it
   * gets an id. For a write inside a transaction, a deadlock check is done
   * first: if the lock is held by another transaction that is itself waiting
   * on a lock we own, our locks are released and the request is rejected.
   * Only this direct cycle is detected.
   * 
   * @param request the write request to schedule
   * @throws SQLException if no schema or table is known for the request, or
   *           if the lock could not be acquired (timeout)
   * @throws RollbackException if a deadlock has been detected
   */
  public void scheduleNonSuspendedWriteRequest(AbstractWriteRequest request)
      throws SQLException, RollbackException
  {
    if (request.isCreate())
    { // The table does not exist yet, there is no lock to take
      synchronized (this)
      {
        request.setId(requestId++);
      }
      return;
    }

    // Work on a local reference: the field can be reset to null by
    // setDatabaseSchema() at any time.
    SchedulerDatabaseSchema schema = schedulerDatabaseSchema;
    if (schema == null)
    {
      String msg = "No database schema available to schedule request "
          + request.getId();
      logger.error(msg);
      throw new SQLException(msg);
    }

    SchedulerDatabaseTable t = schema.getTable(request.getTableName());
    if (t == null)
    {
      String msg = "No table found for request " + request.getId();
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Deadlock detection
    TransactionExclusiveLock tableLock = t.getLock();
    if (!request.isAutoCommit())
    {
      synchronized (this)
      {
        if (!tableLock.isLocked())
        { // Lock is free, take it in the synchronized block
          acquireLockAndSetRequestId(request, tableLock);
          return;
        }

        // Is the lock owner blocked by a lock we already own?
        long owner = tableLock.getLocker();
        long us = request.getTransactionId();
        if (owner == us)
        { // We are the lock owner and already synchronized on this
          // Assign the request id and exit
          request.setId(requestId++);
          return;
        }

        // Parse all tables
        ArrayList tables = schema.getTables();
        int size = (tables == null) ? 0 : tables.size();
        for (int i = 0; i < size; i++)
        {
          SchedulerDatabaseTable table = (SchedulerDatabaseTable) tables
              .get(i);
          if (table == null)
            continue;
          TransactionExclusiveLock lock = table.getLock();
          // Are we the lock owner, and is 'owner' in the list of the
          // transactions blocked on that lock?
          if (lock.isLocked() && (lock.getLocker() == us)
              && lock.isWaiting(owner))
          { // Deadlock detected, we must rollback
            releaseLocks(us);
            throw new RollbackException(
                "Deadlock detected, rollbacking transaction " + us);
          }
        }
      }
    }

    // No deadlock found (or autocommit request): wait for the lock outside
    // of the monitor of this.
    acquireLockAndSetRequestId(request, tableLock);
  }

  /**
   * Acquires the given table lock on behalf of the request and, on success,
   * assigns the request its id.
   * 
   * @param request the write request to schedule
   * @param tableLock the lock of the table targeted by the request
   * @throws SQLException if the lock could not be acquired (reported as a
   *           timeout)
   */
  private void acquireLockAndSetRequestId(AbstractWriteRequest request,
      TransactionExclusiveLock tableLock) throws SQLException
  {
    // Acquire the lock
    if (tableLock.acquire(request))
    {
      synchronized (this)
      {
        request.setId(requestId++);
      }
      if (logger.isDebugEnabled())
        logger.debug("Request " + request.getId() + " scheduled for write ("
            + getPendingWrites() + " pending writes)");
    }
    else
    {
      if (logger.isWarnEnabled())
        logger.warn("Request " + request.getId() + " timed out ("
            + request.getTimeout() + " ms)");
      throw new SQLException("Timeout (" + request.getTimeout()
          + ") for request: " + request.getId());
    }
  }

  /**
   * Notifies the completion of a write request. Updates the scheduler schema
   * for table creations and drops, and releases the table lock held by an
   * autocommit request.
   * 
   * @param request the completed write request
   */
  public final void notifyWriteCompleted(AbstractWriteRequest request)
  {
    if (request.isCreate())
    { // Add table to schema
      if (logger.isDebugEnabled())
        logger.debug("Adding table '" + request.getTableName()
            + "' to scheduler schema");
      synchronized (this)
      {
        if (schedulerDatabaseSchema != null)
          schedulerDatabaseSchema.addTable(new SchedulerDatabaseTable(
              new DatabaseTable(request.getTableName())));
        else if (logger.isWarnEnabled())
          logger.warn("No scheduler schema to add table '"
              + request.getTableName() + "' to");
      }
      // No lock was taken when the creation was scheduled, so there is
      // nothing to release.
      return;
    }

    if (request.isDrop())
    { // Drop table from schema
      if (logger.isDebugEnabled())
        logger.debug("Removing table '" + request.getTableName()
            + "' from scheduler schema");
      SchedulerDatabaseTable dropped = null;
      synchronized (this)
      {
        if (schedulerDatabaseSchema != null)
        {
          dropped = schedulerDatabaseSchema.getTable(request.getTableName());
          if (dropped != null)
            schedulerDatabaseSchema.removeTable(dropped);
        }
      }
      // The drop acquired the table lock when it was scheduled. Once the
      // table has left the schema nobody can find this lock anymore, so an
      // autocommit drop must release it here or requests waiting on it would
      // never be woken up.
      // NOTE(review): a drop executed inside a transaction keeps the lock,
      // and releaseLocks() will no longer see it -- confirm how waiters are
      // expected to be released in that case.
      if ((dropped != null) && request.isAutoCommit())
        dropped.getLock().release();
      return;
    }

    // Requests outside transaction delimiters must release the lock
    // as soon as they have executed
    if (request.isAutoCommit())
    {
      SchedulerDatabaseSchema schema = schedulerDatabaseSchema;
      SchedulerDatabaseTable t = (schema == null) ? null : schema
          .getTable(request.getTableName());
      if (t == null)
      {
        String msg = "No table found to release lock for request "
            + request.getId();
        logger.error(msg);
      }
      else
        t.getLock().release();
    }
  }

  //
  // Transaction Management
  //

  /**
   * Releases all the locks held by the committing transaction.
   * 
   * @param transactionId id of the transaction that commits
   */
  protected final void commitTransaction(long transactionId)
  {
    releaseLocks(transactionId);
  }

  /**
   * Releases all the locks held by the transaction being rolled back.
   * 
   * @param transactionId id of the transaction that rolls back
   */
  protected final void rollbackTransaction(long transactionId)
  {
    releaseLocks(transactionId);
  }

  /**
   * Releases every table lock of the current scheduler schema whose locker is
   * the given transaction. Does nothing when no schema is set or when the
   * schema has no table.
   * 
   * @param transactionId id of the transaction releasing its locks
   */
  private synchronized void releaseLocks(long transactionId)
  {
    if (schedulerDatabaseSchema == null)
      return; // Schema has been reset, no lock is reachable anymore
    ArrayList tables = schedulerDatabaseSchema.getTables();
    if (tables == null)
      return;
    int size = tables.size();
    for (int i = 0; i < size; i++)
    {
      SchedulerDatabaseTable t = (SchedulerDatabaseTable) tables.get(i);
      if (t == null)
        continue;
      TransactionExclusiveLock lock = t.getLock();
      // Are we the lock owner ?
      if (lock.isLocked() && (lock.getLocker() == transactionId))
        lock.release();
    }
  }

  //
  // Debug/Monitoring
  //

  /**
   * Returns the XML element describing this scheduler and its level.
   * 
   * @return the XML description of this scheduler
   */
  public String getXmlImpl()
  {
    return "<" + DatabasesXmlTags.ELT_RAIDb1Scheduler + " "
        + DatabasesXmlTags.ATT_level + "=\""
        + DatabasesXmlTags.VAL_optimisticTransaction + "\"/>";
  }

}

CJDBC version 1.0.4 に対して Tue Oct 12 15:16:03 2004 に生成されました。 doxygen 1.3.8