Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1 Class Reference

Inheritance diagram for org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1:

Inheritance graph
[legend]
Collaboration diagram for org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 RAIDb1 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy) throws Exception
abstract ControllerResultSet execReadRequest (SelectRequest request, MetadataCache metadataCache) throws SQLException
int execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, NoMoreBackendException, SQLException
ControllerResultSet execWriteRequestWithKeys (AbstractWriteRequest request, MetadataCache metadataCache) throws AllBackendsFailedException, SQLException
ControllerResultSet execReadStoredProcedure (StoredProcedure proc, MetadataCache metadataCache) throws SQLException
int execWriteStoredProcedure (StoredProcedure proc) throws SQLException
final void begin (TransactionMarkerMetaData tm) throws SQLException
void commit (TransactionMarkerMetaData tm) throws SQLException
void rollback (TransactionMarkerMetaData tm) throws SQLException
void enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException
synchronized void disableBackend (DatabaseBackend db) throws SQLException
String getXmlImpl ()
abstract String getRaidb1Xml ()

Protected Member Functions

ControllerResultSet executeRequestOnBackend (SelectRequest request, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException
ControllerResultSet executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException
void waitForAllWritesToComplete (long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException

Protected Attributes

ArrayList backendBlockingThreads
ArrayList backendNonBlockingThreads
ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
WaitForCompletionPolicy waitForCompletionPolicy

Static Protected Attributes

Trace logger

Detailed Description

RAIDb-1 load balancer.

This class is an abstract call because the read requests coming from the request controller are NOT treated here but in the subclasses. Transaction management and write requests are broadcasted to all backends.

Author:
Emmanuel Cecchet
Version:
1.0

Definition at line 74 of file RAIDb1.java.


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1 VirtualDatabase  vdb,
WaitForCompletionPolicy  waitForCompletionPolicy
throws Exception
 

Creates a new RAIDb-1 Round Robin request load balancer. A new backend worker thread is created for each backend.

Parameters:
vdb the virtual database this load balancer belongs to.
waitForCompletionPolicy How many backends must complete before returning the result?
Exceptions:
Exception if an error occurs

Definition at line 119 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendBlockingThreads, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendNonBlockingThreads.

00121   {
00122     super(vdb, RAIDbLevels.RAIDb1, ParsingGranularities.NO_PARSING);
00123     this.waitForCompletionPolicy = waitForCompletionPolicy;
00124     backendBlockingThreads = new ArrayList();
00125     backendNonBlockingThreads = new ArrayList();
00126   }


Member Function Documentation

final void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.begin TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Begins a new transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 867 of file RAIDb1.java.

00868   {
00869   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.commit TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Commits a transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 877 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.hasTaskForTransaction(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.insertTaskAfterLastWriteForTransaction(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

00878   {
00879     long tid = tm.getTransactionId();
00880     Long lTid = new Long(tid);
00881     // List of backends that still have pending queries for the transaction to
00882     // commit
00883     ArrayList asynchronousBackends = null;
00884     CommitTask task = null;
00885 
00886     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00887     {
00888       try
00889       {
00890         backendBlockingThreadsRWLock.acquireWrite();
00891         // Lock in write to ensure that all writes are posted and we wait in the
00892         // queue, else a read lock has the priority with the implementation we
00893         // are using.
00894       }
00895       catch (InterruptedException e)
00896       {
00897         String msg = Translate.get(
00898             "loadbalancer.backendlist.acquire.writelock.failed", e);
00899         logger.error(msg);
00900         throw new SQLException(msg);
00901       }
00902 
00903       int nbOfThreads = backendBlockingThreads.size();
00904       // Create the task
00905       task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00906           .getTimeout(), tm.getLogin(), tid);
00907 
00908       for (int i = 0; i < nbOfThreads; i++)
00909       {
00910         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
00911             .get(i);
00912         if (thread.hasTaskForTransaction(lTid))
00913         {
00914           if (asynchronousBackends == null)
00915             asynchronousBackends = new ArrayList();
00916           asynchronousBackends.add(thread.getBackend());
00917           synchronized (thread)
00918           {
00919             thread.insertTaskAfterLastWriteForTransaction(task, lTid);
00920             thread.notify();
00921           }
00922         }
00923       }
00924 
00925       backendBlockingThreadsRWLock.releaseWrite();
00926     }
00927 
00928     try
00929     {
00930       backendNonBlockingThreadsRWLock.acquireWrite();
00931     }
00932     catch (InterruptedException e)
00933     {
00934       String msg = Translate.get(
00935           "loadbalancer.backendlist.acquire.writelock.failed", e);
00936       logger.error(msg);
00937       throw new SQLException(msg);
00938     }
00939 
00940     int nbOfThreads = backendNonBlockingThreads.size();
00941     ArrayList commitList = new ArrayList();
00942 
00943     // Build the list of backends that need to commit this transaction
00944     for (int i = 0; i < nbOfThreads; i++)
00945     {
00946       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00947           .get(i);
00948       DatabaseBackend backend = thread.getBackend();
00949       // If the transaction has been started on this backend and it was not
00950       // previously treated in the asynchronous backend list (late nodes), then
00951       // we have to post the task now in the asynchronous list.
00952       if (backend.isStartedTransaction(lTid)
00953           && ((asynchronousBackends == null) || (!asynchronousBackends
00954               .contains(backend))))
00955         commitList.add(thread);
00956     }
00957 
00958     nbOfThreads = commitList.size();
00959     if (nbOfThreads == 0)
00960     {
00961       backendNonBlockingThreadsRWLock.releaseWrite();
00962       return;
00963     }
00964 
00965     if (task == null)
00966       task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00967           .getTimeout(), tm.getLogin(), tid);
00968 
00969     synchronized (task)
00970     {
00971       // Post the task in each backendThread tasklist and wakeup the threads
00972       for (int i = 0; i < nbOfThreads; i++)
00973       {
00974         BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00975         synchronized (thread)
00976         {
00977           thread.addTask(task, tid);
00978           thread.notify();
00979         }
00980       }
00981 
00982       backendNonBlockingThreadsRWLock.releaseWrite();
00983 
00984       // Wait for completion (notified by the task)
00985       try
00986       {
00987         // Wait on task
00988         long timeout = tm.getTimeout();
00989         if (timeout > 0)
00990         {
00991           long start = System.currentTimeMillis();
00992           task.wait(timeout);
00993           long end = System.currentTimeMillis();
00994           long remaining = timeout - (end - start);
00995           if (remaining <= 0)
00996           {
00997             if (task.setExpiredTimeout())
00998             { // Task will be ignored by all backends
00999               String msg = Translate.get("loadbalancer.commit.timeout",
01000                   new String[]{String.valueOf(tid),
01001                       String.valueOf(task.getSuccess()),
01002                       String.valueOf(task.getFailed())});
01003               logger.warn(msg);
01004               throw new SQLException(msg);
01005             }
01006             // else task execution already started, to late to cancel
01007           }
01008         }
01009         else
01010           task.wait();
01011       }
01012       catch (InterruptedException e)
01013       {
01014         if (task.setExpiredTimeout())
01015         { // Task will be ignored by all backends
01016           String msg = Translate.get("loadbalancer.commit.timeout",
01017               new String[]{String.valueOf(tid),
01018                   String.valueOf(task.getSuccess()),
01019                   String.valueOf(task.getFailed())});
01020           logger.warn(msg);
01021           throw new SQLException(msg);
01022         }
01023         // else task execution already started, to late to cancel
01024       }
01025 
01026       if (task.getSuccess() > 0)
01027         return;
01028       else
01029       { // All tasks failed
01030         ArrayList exceptions = task.getExceptions();
01031         if (exceptions == null)
01032           throw new SQLException(Translate.get(
01033               "loadbalancer.commit.all.failed", tid));
01034         else
01035         {
01036           String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01037               tid)
01038               + "\n";
01039           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01040               errorMsg);
01041           logger.error(ex.getMessage());
01042           throw ex;
01043         }
01044       }
01045     }
01046   }

synchronized void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.disableBackend DatabaseBackend  db  )  throws SQLException [virtual]
 

Disables a backend that was previously enabled.

Ask the corresponding connection manager to finalize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db the database backend to disable
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.

Definition at line 1412 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().

01414   {
01415     if (db.isWriteEnabled())
01416     {
01417       // Starts with backendBlockingThreads
01418       try
01419       {
01420         backendBlockingThreadsRWLock.acquireWrite();
01421       }
01422       catch (InterruptedException e)
01423       {
01424         String msg = Translate.get(
01425             "loadbalancer.backendlist.acquire.writelock.failed", e);
01426         logger.error(msg);
01427         throw new SQLException(msg);
01428       }
01429 
01430       int nbOfThreads = backendBlockingThreads.size();
01431 
01432       // Find the right blocking thread
01433       for (int i = 0; i < nbOfThreads; i++)
01434       {
01435         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01436             .get(i);
01437         if (thread.getBackend().equals(db))
01438         {
01439           logger.info(Translate
01440               .get("loadbalancer.backend.workerthread.blocking.remove", db
01441                   .getName()));
01442 
01443           // Remove it from the backendBlockingThread list
01444           backendBlockingThreads.remove(thread);
01445 
01446           synchronized (thread)
01447           {
01448             // Kill the thread
01449             thread.addPriorityTask(new KillThreadTask(1, 1));
01450             thread.notify();
01451           }
01452           break;
01453         }
01454       }
01455 
01456       backendBlockingThreadsRWLock.releaseWrite();
01457 
01458       // Continue with backendNonBlockingThreads
01459 
01460       try
01461       {
01462         backendNonBlockingThreadsRWLock.acquireWrite();
01463       }
01464       catch (InterruptedException e)
01465       {
01466         String msg = Translate.get(
01467             "loadbalancer.backendlist.acquire.writelock.failed", e);
01468         logger.error(msg);
01469         throw new SQLException(msg);
01470       }
01471 
01472       // Find the right non-blocking thread
01473       nbOfThreads = backendNonBlockingThreads.size();
01474       for (int i = 0; i < nbOfThreads; i++)
01475       {
01476         BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01477             .get(i);
01478         if (thread.getBackend().equals(db))
01479         {
01480           logger.info(Translate.get(
01481               "loadbalancer.backend.workerthread.non.blocking.remove", db
01482                   .getName()));
01483 
01484           // Remove it from the backendNonBlockingThreads list
01485           backendNonBlockingThreads.remove(thread);
01486 
01487           synchronized (thread)
01488           {
01489             // Kill the thread
01490             thread.addPriorityTask(new KillThreadTask(1, 1));
01491             thread.notify();
01492           }
01493           break;
01494         }
01495       }
01496 
01497       backendNonBlockingThreadsRWLock.releaseWrite();
01498     }
01499 
01500     db.disable();
01501     if (db.isInitialized())
01502       db.finalizeConnections();
01503   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.enableBackend DatabaseBackend  db,
boolean  writeEnabled
throws SQLException [virtual]
 

Enables a Backend that was previously disabled.

Ask the corresponding connection manager to initialize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db the database backend to enable
writeEnabled True if the backend must be enabled for writes
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.

Definition at line 1349 of file RAIDb1.java.

01351   {
01352     if (writeEnabled && db.isWriteCanBeEnabled())
01353     {
01354       // Create 2 worker threads
01355       BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01356       BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01357 
01358       // Add first to the blocking thread list
01359       try
01360       {
01361         backendBlockingThreadsRWLock.acquireWrite();
01362       }
01363       catch (InterruptedException e)
01364       {
01365         String msg = Translate.get(
01366             "loadbalancer.backendlist.acquire.writelock.failed", e);
01367         logger.error(msg);
01368         throw new SQLException(msg);
01369       }
01370       backendBlockingThreads.add(blockingThread);
01371       backendBlockingThreadsRWLock.releaseWrite();
01372       blockingThread.start();
01373       logger.info(Translate.get(
01374           "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01375 
01376       // Then add to the non-blocking thread list
01377       try
01378       {
01379         backendNonBlockingThreadsRWLock.acquireWrite();
01380       }
01381       catch (InterruptedException e)
01382       {
01383         String msg = Translate.get(
01384             "loadbalancer.backendlist.acquire.writelock.failed", e);
01385         logger.error(msg);
01386         throw new SQLException(msg);
01387       }
01388       backendNonBlockingThreads.add(nonBlockingThread);
01389       backendNonBlockingThreadsRWLock.releaseWrite();
01390       nonBlockingThread.start();
01391       logger.info(Translate.get(
01392           "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01393       db.enableWrite();
01394     }
01395 
01396     if (!db.isInitialized())
01397       db.initializeConnections();
01398     db.enableRead();
01399   }

abstract ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.execReadRequest SelectRequest  request,
MetadataCache  metadataCache
throws SQLException [pure virtual]
 

See also:
AbstractLoadBalancer.execReadRequest(SelectRequest, MetadataCache)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR.

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.execReadStoredProcedure StoredProcedure  proc,
MetadataCache  metadataCache
throws SQLException [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execReadStoredProcedure(StoredProcedure, MetadataCache)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 696 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().

00698   {
00699     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
00700         proc, true, metadataCache);
00701     return task.getResult();
00702   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.executeRequestOnBackend SelectRequest  request,
DatabaseBackend  backend,
MetadataCache  metadataCache
throws SQLException, UnreachableBackendException [protected]
 

Execute a read request on the selected backend.

Parameters:
request the request to execute
backend the backend that will execute the request
metadataCache the metadataCache if any or null
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Definition at line 175 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.handleMacros().

00178   {
00179     // Handle macros
00180     handleMacros(request);
00181 
00182     // Ok, we have a backend, let's execute the request
00183     AbstractConnectionManager cm = backend.getConnectionManager(request
00184         .getLogin());
00185 
00186     // Sanity check
00187     if (cm == null)
00188     {
00189       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00190           new String[]{request.getLogin(), backend.getName()});
00191       logger.error(msg);
00192       throw new SQLException(msg);
00193     }
00194 
00195     // Execute the query
00196     if (request.isAutoCommit())
00197     {
00198       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00199         // We could do something finer grain here by waiting
00200         // only for writes that depend on the tables we need
00201         // but is that really worth the overhead ?
00202         waitForAllWritesToComplete(backend);
00203 
00204       ControllerResultSet rs = null;
00205       boolean badConnection;
00206       do
00207       {
00208         badConnection = false;
00209         // Use a connection just for this request
00210         Connection c = null;
00211         try
00212         {
00213           c = cm.getConnection();
00214         }
00215         catch (UnreachableBackendException e1)
00216         {
00217           logger.error(Translate.get(
00218               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00219           disableBackend(backend);
00220           throw new UnreachableBackendException(Translate.get(
00221               "loadbalancer.backend.unreacheable", backend.getName()));
00222         }
00223 
00224         // Sanity check
00225         if (c == null)
00226           throw new SQLException(Translate.get(
00227               "loadbalancer.backend.no.connection", backend.getName()));
00228 
00229         // Execute Query
00230         try
00231         {
00232           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00233           cm.releaseConnection(c);
00234         }
00235         catch (SQLException e)
00236         {
00237           cm.releaseConnection(c);
00238           throw SQLExceptionFactory.getSQLException(e, Translate.get(
00239               "loadbalancer.request.failed.on.backend", new String[]{
00240                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00241                   backend.getName(), e.getMessage()}));
00242         }
00243         catch (BadConnectionException e)
00244         { // Get rid of the bad connection
00245           cm.deleteConnection(c);
00246           badConnection = true;
00247         }
00248       }
00249       while (badConnection);
00250       if (logger.isDebugEnabled())
00251         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00252             String.valueOf(request.getId()), backend.getName()}));
00253       return rs;
00254     }
00255     else
00256     { // Inside a transaction
00257       Connection c;
00258       long tid = request.getTransactionId();
00259       Long lTid = new Long(tid);
00260 
00261       // Wait for previous writes to complete
00262       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00263         waitForAllWritesToComplete(backend, request.getTransactionId());
00264 
00265       try
00266       {
00267         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00268       }
00269       catch (UnreachableBackendException e1)
00270       {
00271         logger.error(Translate.get(
00272             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00273         disableBackend(backend);
00274         throw new SQLException(Translate.get(
00275             "loadbalancer.backend.unreacheable", backend.getName()));
00276       }
00277       catch (NoTransactionStartWhenDisablingException e)
00278       {
00279         String msg = Translate.get("loadbalancer.backend.is.disabling",
00280             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00281                 backend.getName()});
00282         logger.error(msg);
00283         throw new SQLException(msg);
00284       }
00285 
00286       // Sanity check
00287       if (c == null)
00288         throw new SQLException(Translate.get(
00289             "loadbalancer.unable.retrieve.connection", new String[]{
00290                 String.valueOf(tid), backend.getName()}));
00291 
00292       // Execute Query
00293       ControllerResultSet rs = null;
00294       try
00295       {
00296         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00297       }
00298       catch (SQLException e)
00299       {
00300         throw SQLExceptionFactory.getSQLException(e, Translate.get(
00301             "loadbalancer.request.failed.on.backend", new String[]{
00302                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00303                 backend.getName(), e.getMessage()}));
00304       }
00305       catch (BadConnectionException e)
00306       { // Connection failed, so did the transaction
00307         // Disable the backend.
00308         cm.deleteConnection(tid);
00309         String msg = Translate.get(
00310             "loadbalancer.backend.disabling.connection.failure", backend
00311                 .getName());
00312         logger.error(msg);
00313         disableBackend(backend);
00314         throw new SQLException(msg);
00315       }
00316       if (logger.isDebugEnabled())
00317         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00318             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00319                 backend.getName()}));
00320       return rs;
00321     }
00322   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.executeStoredProcedureOnBackend StoredProcedure  proc,
DatabaseBackend  backend,
MetadataCache  metadataCache
throws SQLException, UnreachableBackendException [protected]
 

Execute a stored procedure on the selected backend.

Parameters:
proc the stored procedure to execute
backend the backend that will execute the request
metadataCache the metadataCache if any or null
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Definition at line 561 of file RAIDb1.java.

00564   {
00565     // Handle macros
00566     handleMacros(proc);
00567 
00568     // Ok, we have a backend, let's execute the request
00569     AbstractConnectionManager cm = backend
00570         .getConnectionManager(proc.getLogin());
00571 
00572     // Sanity check
00573     if (cm == null)
00574     {
00575       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00576           new String[]{proc.getLogin(), backend.getName()});
00577       logger.error(msg);
00578       throw new SQLException(msg);
00579     }
00580 
00581     // Execute the query
00582     if (proc.isAutoCommit())
00583     {
00584       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00585         // We could do something finer grain here by waiting
00586         // only for writes that depend on the tables we need
00587         // but is that really worth the overhead ?
00588         waitForAllWritesToComplete(backend);
00589 
00590       // Use a connection just for this request
00591       Connection c = null;
00592       try
00593       {
00594         c = cm.getConnection();
00595       }
00596       catch (UnreachableBackendException e1)
00597       {
00598         logger.error(Translate.get(
00599             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00600         disableBackend(backend);
00601         throw new UnreachableBackendException(Translate.get(
00602             "loadbalancer.backend.unreacheable", backend.getName()));
00603       }
00604 
00605       // Sanity check
00606       if (c == null)
00607         throw new UnreachableBackendException(Translate.get(
00608             "loadbalancer.backend.no.connection", backend.getName()));
00609 
00610       // Execute Query
00611       ControllerResultSet rs = null;
00612       try
00613       {
00614         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00615             backend, c, metadataCache);
00616       }
00617       catch (Exception e)
00618       {
00619         throw new SQLException(Translate.get(
00620             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00621                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00622                 backend.getName(), e.getMessage()}));
00623       }
00624       finally
00625       {
00626         cm.releaseConnection(c);
00627       }
00628       if (logger.isDebugEnabled())
00629         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00630             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00631       return rs;
00632     }
00633     else
00634     { // Inside a transaction
00635       Connection c;
00636       long tid = proc.getTransactionId();
00637       Long lTid = new Long(tid);
00638 
00639       // Wait for previous writes to complete
00640       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00641         waitForAllWritesToComplete(backend, proc.getTransactionId());
00642 
00643       try
00644       {
00645         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00646       }
00647       catch (UnreachableBackendException e1)
00648       {
00649         logger.error(Translate.get(
00650             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00651         disableBackend(backend);
00652         throw new SQLException(Translate.get(
00653             "loadbalancer.backend.unreacheable", backend.getName()));
00654       }
00655       catch (NoTransactionStartWhenDisablingException e)
00656       {
00657         String msg = Translate.get("loadbalancer.backend.is.disabling",
00658             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00659                 backend.getName()});
00660         logger.error(msg);
00661         throw new SQLException(msg);
00662       }
00663 
00664       // Sanity check
00665       if (c == null)
00666         throw new SQLException(Translate.get(
00667             "loadbalancer.unable.retrieve.connection", new String[]{
00668                 String.valueOf(tid), backend.getName()}));
00669 
00670       // Execute Query
00671       ControllerResultSet rs;
00672       try
00673       {
00674         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00675             backend, c, metadataCache);
00676       }
00677       catch (Exception e)
00678       {
00679         throw new SQLException(Translate.get(
00680             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00681                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00682                 backend.getName(), e.getMessage()}));
00683       }
00684       if (logger.isDebugEnabled())
00685         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00686             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00687                 backend.getName()}));
00688       return rs;
00689     }
00690   }

int org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.execWriteRequest AbstractWriteRequest  request  )  throws AllBackendsFailedException, NoMoreBackendException, SQLException [virtual]
 

Performs a write request. This request is broadcasted to all nodes.

Parameters:
request an AbstractWriteRequest
Returns:
number of rows affected by the request
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs
NoMoreBackendException if no backends left to execute the request

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 335 of file RAIDb1.java.

00337   {
00338     return ((WriteRequestTask) execWriteRequest(request, false, null))
00339         .getResult();
00340   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.execWriteRequestWithKeys AbstractWriteRequest  request,
MetadataCache  metadataCache
throws AllBackendsFailedException, SQLException [virtual]
 

Perform a write request and return the auto generated keys.

Parameters:
request the request to execute
metadataCache the metadataCache if any or null
Returns:
auto generated keys.
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 352 of file RAIDb1.java.

00355   {
00356     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
00357         metadataCache)).getResult();
00358   }

int org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.execWriteStoredProcedure StoredProcedure  proc  )  throws SQLException [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 707 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().

00708   {
00709     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
00710         proc, false, null);
00711     return task.getResult();
00712   }

abstract String org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.getRaidb1Xml  )  [pure virtual]
 

Surrounding raidb1 tags can be treated by <method>getXmlImpl </method> above, but more detailed content have to be returned by the method <method>getRaidb1Xml </method> below.

Returns:
content of Raidb1 xml

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR.

String org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.getXmlImpl  )  [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec.

Definition at line 1508 of file RAIDb1.java.

01509   {
01510     StringBuffer info = new StringBuffer();
01511     info.append("<" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01512     if (waitForCompletionPolicy != null)
01513       info.append(waitForCompletionPolicy.getXml());
01514     if (macroHandler != null)
01515       info.append(macroHandler.getXml());
01516     info.append(getRaidb1Xml());
01517     info.append("</" + DatabasesXmlTags.ELT_RAIDb_1 + ">");
01518     return info.toString();
01519   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.rollback TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Rollbacks a transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 1054 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.hasTaskForTransaction(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.insertTaskAfterLastWriteForTransaction(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

01055   {
01056     long tid = tm.getTransactionId();
01057     Long lTid = new Long(tid);
01058     // List of backends that still have pending queries for the transaction to
01059     // commit
01060     ArrayList asynchronousBackends = null;
01061     RollbackTask task = null;
01062 
01063     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01064     {
01065       try
01066       {
01067         backendBlockingThreadsRWLock.acquireWrite();
01068         // Lock in write to ensure that all writes are posted and we wait in the
01069         // queue, else a read lock has the priority with the implementation we
01070         // are using.
01071       }
01072       catch (InterruptedException e)
01073       {
01074         String msg = Translate.get(
01075             "loadbalancer.backendlist.acquire.writelock.failed", e);
01076         logger.error(msg);
01077         throw new SQLException(msg);
01078       }
01079 
01080       int nbOfThreads = backendBlockingThreads.size();
01081       // Create the task
01082       task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01083           .getTimeout(), tm.getLogin(), tid);
01084 
01085       for (int i = 0; i < nbOfThreads; i++)
01086       {
01087         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01088             .get(i);
01089         if (thread.hasTaskForTransaction(lTid))
01090         {
01091           if (asynchronousBackends == null)
01092             asynchronousBackends = new ArrayList();
01093           asynchronousBackends.add(thread.getBackend());
01094           synchronized (thread)
01095           {
01096             thread.insertTaskAfterLastWriteForTransaction(task, lTid);
01097             thread.notify();
01098           }
01099         }
01100       }
01101 
01102       backendBlockingThreadsRWLock.releaseWrite();
01103     }
01104 
01105     try
01106     {
01107       backendNonBlockingThreadsRWLock.acquireWrite();
01108     }
01109     catch (InterruptedException e)
01110     {
01111       String msg = Translate.get(
01112           "loadbalancer.backendlist.acquire.writelock.failed", e);
01113       logger.error(msg);
01114       throw new SQLException(msg);
01115     }
01116     int nbOfThreads = backendNonBlockingThreads.size();
01117     ArrayList rollbackList = new ArrayList();
01118 
01119     // Build the list of backend that need to rollback this transaction
01120     for (int i = 0; i < nbOfThreads; i++)
01121     {
01122       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01123           .get(i);
01124       DatabaseBackend backend = thread.getBackend();
01125       // If the transaction has been started on this backend and it was not
01126       // previously treated in the asynchronous backend list (late nodes), then
01127       // we have to post the task now in the asynchronous list.
01128       if (backend.isStartedTransaction(lTid)
01129           && ((asynchronousBackends == null) || (!asynchronousBackends
01130               .contains(backend))))
01131         rollbackList.add(thread);
01132     }
01133 
01134     nbOfThreads = rollbackList.size();
01135     if (nbOfThreads == 0)
01136     {
01137       backendNonBlockingThreadsRWLock.releaseWrite();
01138       return;
01139     }
01140 
01141     if (task == null)
01142       task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads, tm
01143           .getTimeout(), tm.getLogin(), tid);
01144 
01145     synchronized (task)
01146     {
01147       // Post the task in each backendThread tasklist and wakeup the threads
01148       for (int i = 0; i < nbOfThreads; i++)
01149       {
01150         BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01151         synchronized (thread)
01152         {
01153           thread.addTask(task, tid);
01154           thread.notify();
01155         }
01156       }
01157 
01158       backendNonBlockingThreadsRWLock.releaseWrite();
01159 
01160       // Wait for completion (notified by the task)
01161       try
01162       {
01163         // Wait on task
01164         long timeout = tm.getTimeout();
01165         if (timeout > 0)
01166         {
01167           long start = System.currentTimeMillis();
01168           task.wait(timeout);
01169           long end = System.currentTimeMillis();
01170           long remaining = timeout - (end - start);
01171           if (remaining <= 0)
01172           {
01173             if (task.setExpiredTimeout())
01174             { // Task will be ignored by all backends
01175               String msg = Translate.get("loadbalancer.rollback.timeout",
01176                   new String[]{String.valueOf(tid),
01177                       String.valueOf(task.getSuccess()),
01178                       String.valueOf(task.getFailed())});
01179               logger.warn(msg);
01180               throw new SQLException(msg);
01181             }
01182             // else task execution already started, to late to cancel
01183           }
01184         }
01185         else
01186           task.wait();
01187       }
01188       catch (InterruptedException e)
01189       {
01190         if (task.setExpiredTimeout())
01191         { // Task will be ignored by all backends
01192           String msg = Translate.get("loadbalancer.rollback.timeout",
01193               new String[]{String.valueOf(tid),
01194                   String.valueOf(task.getSuccess()),
01195                   String.valueOf(task.getFailed())});
01196           logger.warn(msg);
01197           throw new SQLException(msg);
01198         }
01199         // else task execution already started, to late to cancel
01200       }
01201 
01202       if (task.getSuccess() > 0)
01203         return;
01204       else
01205       { // All tasks failed
01206         ArrayList exceptions = task.getExceptions();
01207         if (exceptions == null)
01208           throw new SQLException(Translate.get(
01209               "loadbalancer.rollback.all.failed", tid));
01210         else
01211         {
01212           String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01213               tid)
01214               + "\n";
01215           SQLException ex = SQLExceptionFactory.getSQLException(exceptions,
01216               errorMsg);
01217           logger.error(ex.getMessage());
01218           throw ex;
01219         }
01220       }
01221     }
01222   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete DatabaseBackend  backend  )  throws SQLException [protected]
 

Waits for all writes in the blocking thread queue of the given backend to complete.

See also:
executeRequestOnBackend

Definition at line 1302 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01304   {
01305     try
01306     {
01307       backendBlockingThreadsRWLock.acquireWrite();
01308       // Lock in write to ensure that all writes are posted and we wait in the
01309       // queue, else a read lock has the priority with the implementation we are
01310       // using.
01311     }
01312     catch (InterruptedException e)
01313     {
01314       String msg = Translate.get(
01315           "loadbalancer.backendlist.acquire.writelock.failed", e);
01316       logger.error(msg);
01317       throw new SQLException(msg);
01318     }
01319 
01320     int nbOfThreads = backendBlockingThreads.size();
01321 
01322     for (int i = 0; i < nbOfThreads; i++)
01323     {
01324       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01325           .get(i);
01326       if (thread.getBackend() == backend)
01327         thread.waitForAllTasksToComplete();
01328     }
01329 
01330     backendBlockingThreadsRWLock.releaseWrite();
01331   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete DatabaseBackend  backend,
long  transactionId
throws SQLException [protected]
 

Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.

See also:
executeRequestOnBackend

Definition at line 1265 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01267   {
01268     try
01269     {
01270       backendBlockingThreadsRWLock.acquireWrite();
01271       // Lock in write to ensure that all writes are posted and we wait in the
01272       // queue, else a read lock has the priority with the implementation we are
01273       // using.
01274     }
01275     catch (InterruptedException e)
01276     {
01277       String msg = Translate.get(
01278           "loadbalancer.backendlist.acquire.writelock.failed", e);
01279       logger.error(msg);
01280       throw new SQLException(msg);
01281     }
01282 
01283     int nbOfThreads = backendBlockingThreads.size();
01284 
01285     for (int i = 0; i < nbOfThreads; i++)
01286     {
01287       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01288           .get(i);
01289       if (thread.getBackend() == backend)
01290         thread.waitForAllTasksToComplete(transactionId);
01291     }
01292 
01293     backendBlockingThreadsRWLock.releaseWrite();
01294   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForAllWritesToComplete long  transactionId  )  throws SQLException [protected]
 

Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction.

Definition at line 1228 of file RAIDb1.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01230   {
01231     try
01232     {
01233       backendBlockingThreadsRWLock.acquireWrite();
01234       // Lock in write to ensure that all writes are posted and we wait in the
01235       // queue, else a read lock has the priority with the implementation we are
01236       // using.
01237     }
01238     catch (InterruptedException e)
01239     {
01240       String msg = Translate.get(
01241           "loadbalancer.backendlist.acquire.writelock.failed", e);
01242       logger.error(msg);
01243       throw new SQLException(msg);
01244     }
01245 
01246     int nbOfThreads = backendBlockingThreads.size();
01247 
01248     for (int i = 0; i < nbOfThreads; i++)
01249     {
01250       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01251           .get(i);
01252       thread.waitForAllTasksToComplete(transactionId);
01253     }
01254 
01255     backendBlockingThreadsRWLock.releaseWrite();
01256   }


Member Data Documentation

ArrayList org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendBlockingThreads [protected]
 

List of BackendWorkerThread that executes possibly blocking queries

Definition at line 90 of file RAIDb1.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

ReadPrioritaryFIFOWriteLock org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() [protected]
 

Lock on backendBlockingThreads list

Definition at line 97 of file RAIDb1.java.

ArrayList org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendNonBlockingThreads [protected]
 

List of BackendWorkerThread that executes non-blocking queries

Definition at line 95 of file RAIDb1.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.RAIDb1().

ReadPrioritaryFIFOWriteLock org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() [protected]
 

Lock on backendNonBlockingThreads list

Definition at line 99 of file RAIDb1.java.

Trace org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.logger [static, protected]
 

Initial value:

 Trace
                                                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.RAIDb1")

Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 103 of file RAIDb1.java.

WaitForCompletionPolicy org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1.waitForCompletionPolicy [protected]
 

Should we wait for all backends to commit before returning ?

Definition at line 101 of file RAIDb1.java.


The documentation for this class was generated from the following file:
Generated on Mon Apr 11 22:04:02 2005 for C-JDBC by  doxygen 1.3.9.1