Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2 Class Reference

Inheritance diagram for org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2:

Inheritance graph
[legend]
Collaboration diagram for org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 RAIDb2 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, CreateTablePolicy createTablePolicy) throws Exception
int execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException
ControllerResultSet execWriteRequestWithKeys (AbstractWriteRequest request, MetadataCache metadataCache) throws AllBackendsFailedException, SQLException
abstract ControllerResultSet execReadRequest (SelectRequest request, MetadataCache metadataCache) throws SQLException
ControllerResultSet execReadStoredProcedure (StoredProcedure proc, MetadataCache metadataCache) throws SQLException
int execWriteStoredProcedure (StoredProcedure proc) throws SQLException
final void begin (TransactionMarkerMetaData tm) throws SQLException
void commit (TransactionMarkerMetaData tm) throws SQLException
void rollback (TransactionMarkerMetaData tm) throws SQLException
void enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException
synchronized void disableBackend (DatabaseBackend db) throws SQLException
String getXmlImpl ()
abstract String getRaidb2Xml ()

Protected Member Functions

ControllerResultSet executeRequestOnBackend (SelectRequest request, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException
ControllerResultSet executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend, MetadataCache metadataCache) throws SQLException, UnreachableBackendException
void waitForAllWritesToComplete (long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException

Protected Attributes

ArrayList backendBlockingThreads
ArrayList backendNonBlockingThreads
ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
WaitForCompletionPolicy waitForCompletionPolicy
CreateTablePolicy createTablePolicy

Static Protected Attributes

Trace logger

Detailed Description

RAIDb-2 load balancer.

This class is an abstract call because the read requests coming from the Request Manager are NOT treated here but in the subclasses. Transaction management and write requests are broadcasted to all backends owning the written table.

Author:
Emmanuel Cecchet
Version:
1.0

Definition at line 76 of file RAIDb2.java.


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.RAIDb2 VirtualDatabase  vdb,
WaitForCompletionPolicy  waitForCompletionPolicy,
CreateTablePolicy  createTablePolicy
throws Exception
 

Creates a new RAIDb-1 Round Robin request load balancer. A new backend worker thread is created for each backend.

Parameters:
vdb the virtual database this load balancer belongs to.
waitForCompletionPolicy how many backends must complete before returning the result ?
createTablePolicy the policy defining how 'create table' statements should be handled
Exceptions:
Exception if an error occurs

Definition at line 113 of file RAIDb2.java.

00116   {
00117     super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);
00118 
00119     this.waitForCompletionPolicy = waitForCompletionPolicy;
00120     backendBlockingThreads = new ArrayList();
00121     backendNonBlockingThreads = new ArrayList();
00122     this.createTablePolicy = createTablePolicy;
00123   }


Member Function Documentation

final void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.begin TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Begins a new transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 908 of file RAIDb2.java.

00909   {
00910   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.commit TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Commits a transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 918 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

00919   {
00920     long tid = tm.getTransactionId();
00921     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00922       waitForAllWritesToComplete(tid);
00923 
00924     try
00925     {
00926       backendNonBlockingThreadsRWLock.acquireWrite();
00927     }
00928     catch (InterruptedException e)
00929     {
00930       String msg = Translate.get(
00931           "loadbalancer.backendlist.acquire.writelock.failed", e);
00932       logger.error(msg);
00933       throw new SQLException(msg);
00934     }
00935 
00936     int nbOfThreads = backendNonBlockingThreads.size();
00937     ArrayList commitList = new ArrayList();
00938     Long iTid = new Long(tid);
00939 
00940     // Build the list of backend that need to commit this transaction
00941     for (int i = 0; i < nbOfThreads; i++)
00942     {
00943       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
00944           .get(i);
00945       if (thread.getBackend().isStartedTransaction(iTid))
00946         commitList.add(thread);
00947     }
00948 
00949     nbOfThreads = commitList.size();
00950     if (nbOfThreads == 0)
00951     {
00952       backendNonBlockingThreadsRWLock.releaseWrite();
00953       return;
00954     }
00955 
00956     // Create the task
00957     CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
00958         .getTimeout(), tm.getLogin(), tid);
00959 
00960     synchronized (task)
00961     {
00962       // Post the task in each backendThread tasklist and wakeup the threads
00963       for (int i = 0; i < nbOfThreads; i++)
00964       {
00965         BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
00966         synchronized (thread)
00967         {
00968           thread.addTask(task, tid);
00969           thread.notify();
00970         }
00971       }
00972 
00973       backendNonBlockingThreadsRWLock.releaseWrite();
00974 
00975       // Wait for completion (notified by the task)
00976       try
00977       {
00978         // Wait on task
00979         long timeout = tm.getTimeout();
00980         if (timeout > 0)
00981         {
00982           long start = System.currentTimeMillis();
00983           task.wait(timeout);
00984           long end = System.currentTimeMillis();
00985           long remaining = timeout - (end - start);
00986           if (remaining <= 0)
00987           {
00988             if (task.setExpiredTimeout())
00989             { // Task will be ignored by all backends
00990               String msg = Translate.get("loadbalancer.commit.timeout",
00991                   new String[]{String.valueOf(tid),
00992                       String.valueOf(task.getSuccess()),
00993                       String.valueOf(task.getFailed())});
00994               logger.warn(msg);
00995               throw new SQLException(msg);
00996             }
00997             // else task execution already started, to late to cancel
00998           }
00999         }
01000         else
01001           task.wait();
01002       }
01003       catch (InterruptedException e)
01004       {
01005         if (task.setExpiredTimeout())
01006         { // Task will be ignored by all backends
01007           String msg = Translate.get("loadbalancer.commit.timeout",
01008               new String[]{String.valueOf(tid),
01009                   String.valueOf(task.getSuccess()),
01010                   String.valueOf(task.getFailed())});
01011           logger.warn(msg);
01012           throw new SQLException(msg);
01013         }
01014         // else task execution already started, to late to cancel
01015       }
01016 
01017       if (task.getSuccess() > 0)
01018         return;
01019       else
01020       { // All tasks failed
01021         ArrayList exceptions = task.getExceptions();
01022         if (exceptions == null)
01023           throw new SQLException(Translate.get(
01024               "loadbalancer.commit.all.failed", tid));
01025         else
01026         {
01027           String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
01028               tid)
01029               + "\n";
01030           for (int i = 0; i < exceptions.size(); i++)
01031             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01032           logger.error(errorMsg);
01033           throw new SQLException(errorMsg);
01034         }
01035       }
01036     }
01037   }

synchronized void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.disableBackend DatabaseBackend  db  )  throws SQLException [virtual]
 

Disables a backend that was previously enabled.

Ask the corresponding connection manager to finalize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db the database backend to disable
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Definition at line 1353 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().

01355   {
01356     if (db.isWriteEnabled())
01357     {
01358       // Start with the backendBlockingThread list
01359       try
01360       {
01361         backendBlockingThreadsRWLock.acquireWrite();
01362       }
01363       catch (InterruptedException e)
01364       {
01365         String msg = Translate.get(
01366             "loadbalancer.backendlist.acquire.writelock.failed", e);
01367         logger.error(msg);
01368         throw new SQLException(msg);
01369       }
01370 
01371       int nbOfThreads = backendBlockingThreads.size();
01372 
01373       // Find the right blocking thread
01374       for (int i = 0; i < nbOfThreads; i++)
01375       {
01376         BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01377             .get(i);
01378         if (thread.getBackend().equals(db))
01379         {
01380           logger.info(Translate
01381               .get("loadbalancer.backend.workerthread.blocking.remove", db
01382                   .getName()));
01383 
01384           backendBlockingThreads.remove(thread);
01385 
01386           synchronized (thread)
01387           {
01388             // Kill the thread
01389             thread.addPriorityTask(new KillThreadTask(1, 1));
01390             thread.notify();
01391           }
01392           break;
01393         }
01394       }
01395 
01396       backendBlockingThreadsRWLock.releaseWrite();
01397 
01398       // Continue with the backendNonBlockingThread list
01399 
01400       try
01401       {
01402         backendNonBlockingThreadsRWLock.acquireWrite();
01403       }
01404       catch (InterruptedException e)
01405       {
01406         String msg = Translate.get(
01407             "loadbalancer.backendlist.acquire.writelock.failed", e);
01408         logger.error(msg);
01409         throw new SQLException(msg);
01410       }
01411 
01412       // Find the right non-blocking thread
01413       nbOfThreads = backendNonBlockingThreads.size();
01414       for (int i = 0; i < nbOfThreads; i++)
01415       {
01416         BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01417             .get(i);
01418         if (thread.getBackend().equals(db))
01419         {
01420           logger.info(Translate.get(
01421               "loadbalancer.backend.workerthread.non.blocking.remove", db
01422                   .getName()));
01423 
01424           backendNonBlockingThreads.remove(thread);
01425 
01426           synchronized (thread)
01427           {
01428             // Kill the thread
01429             thread.addPriorityTask(new KillThreadTask(1, 1));
01430             thread.notify();
01431           }
01432           break;
01433         }
01434       }
01435 
01436       backendNonBlockingThreadsRWLock.releaseWrite();
01437     }
01438 
01439     db.disable();
01440     if (db.isInitialized())
01441       db.finalizeConnections();
01442   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.enableBackend DatabaseBackend  db,
boolean  writeEnabled
throws SQLException [virtual]
 

Enables a Backend that was previously disabled.

Ask the corresponding connection manager to initialize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db The database backend to enable
writeEnabled True if the backend must be enabled for writes
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Definition at line 1290 of file RAIDb2.java.

01292   {
01293     if (writeEnabled)
01294     {
01295       // Create 2 worker threads
01296       BackendWorkerThread blockingThread = new BackendWorkerThread(db, this);
01297       BackendWorkerThread nonBlockingThread = new BackendWorkerThread(db, this);
01298 
01299       // Add first to the blocking thread list
01300       try
01301       {
01302         backendBlockingThreadsRWLock.acquireWrite();
01303       }
01304       catch (InterruptedException e)
01305       {
01306         String msg = Translate.get(
01307             "loadbalancer.backendlist.acquire.writelock.failed", e);
01308         logger.error(msg);
01309         throw new SQLException(msg);
01310       }
01311       backendBlockingThreads.add(blockingThread);
01312       backendBlockingThreadsRWLock.releaseWrite();
01313       blockingThread.start();
01314       logger.info(Translate.get(
01315           "loadbalancer.backend.workerthread.blocking.add", db.getName()));
01316 
01317       // Then add to the non-blocking thread list
01318       try
01319       {
01320         backendNonBlockingThreadsRWLock.acquireWrite();
01321       }
01322       catch (InterruptedException e)
01323       {
01324         String msg = Translate.get(
01325             "loadbalancer.backendlist.acquire.writelock.failed", e);
01326         logger.error(msg);
01327         throw new SQLException(msg);
01328       }
01329       backendNonBlockingThreads.add(nonBlockingThread);
01330       backendNonBlockingThreadsRWLock.releaseWrite();
01331       nonBlockingThread.start();
01332       logger.info(Translate.get(
01333           "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
01334       db.enableWrite();
01335     }
01336 
01337     if (!db.isInitialized())
01338       db.initializeConnections();
01339     db.enableRead();
01340   }

abstract ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execReadRequest SelectRequest  request,
MetadataCache  metadataCache
throws SQLException [pure virtual]
 

Implementation specific load balanced read execution.

Parameters:
request an SelectRequest
metadataCache the metadataCache if any or null
Returns:
the corresponding java.sql.ResultSet
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execReadStoredProcedure StoredProcedure  proc,
MetadataCache  metadataCache
throws SQLException [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execReadStoredProcedure(StoredProcedure, MetadataCache)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 735 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().

00737   {
00738     ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
00739         proc, true, metadataCache);
00740     return task.getResult();
00741   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.executeRequestOnBackend SelectRequest  request,
DatabaseBackend  backend,
MetadataCache  metadataCache
throws SQLException, UnreachableBackendException [protected]
 

Execute a read request on the selected backend.

Parameters:
request the request to execute
backend the backend that will execute the request
metadataCache a metadataCache if any or null
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Definition at line 442 of file RAIDb2.java.

00445   {
00446     // Handle macros
00447     handleMacros(request);
00448 
00449     // Ok, we have a backend, let's execute the request
00450     AbstractConnectionManager cm = backend.getConnectionManager(request
00451         .getLogin());
00452 
00453     // Sanity check
00454     if (cm == null)
00455     {
00456       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00457           new String[]{request.getLogin(), backend.getName()});
00458       logger.error(msg);
00459       throw new SQLException(msg);
00460     }
00461 
00462     // Execute the query
00463     if (request.isAutoCommit())
00464     {
00465       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00466         // We could do something finer grain here by waiting
00467         // only for writes that depend on the tables we need
00468         // but is that really worth the overhead ?
00469         waitForAllWritesToComplete(backend);
00470 
00471       ControllerResultSet rs = null;
00472       boolean badConnection;
00473       do
00474       {
00475         badConnection = false;
00476         // Use a connection just for this request
00477         Connection c = null;
00478         try
00479         {
00480           c = cm.getConnection();
00481         }
00482         catch (UnreachableBackendException e1)
00483         {
00484           logger.error(Translate.get(
00485               "loadbalancer.backend.disabling.unreachable", backend.getName()));
00486           disableBackend(backend);
00487           throw new UnreachableBackendException(Translate.get(
00488               "loadbalancer.backend.unreacheable", backend.getName()));
00489         }
00490 
00491         // Sanity check
00492         if (c == null)
00493           throw new UnreachableBackendException(
00494               "No more connections on backend " + backend.getName());
00495 
00496         // Execute Query
00497         try
00498         {
00499           rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00500           cm.releaseConnection(c);
00501         }
00502         catch (SQLException e)
00503         {
00504           cm.releaseConnection(c);
00505           throw new SQLException(Translate.get(
00506               "loadbalancer.request.failed.on.backend", new String[]{
00507                   request.getSQLShortForm(vdb.getSQLShortFormLength()),
00508                   backend.getName(), e.getMessage()}));
00509         }
00510         catch (BadConnectionException e)
00511         { // Get rid of the bad connection
00512           cm.deleteConnection(c);
00513           badConnection = true;
00514         }
00515       }
00516       while (badConnection);
00517       if (logger.isDebugEnabled())
00518         logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
00519             String.valueOf(request.getId()), backend.getName()}));
00520       return rs;
00521     }
00522     else
00523     { // Inside a transaction
00524       Connection c;
00525       long tid = request.getTransactionId();
00526       Long lTid = new Long(tid);
00527 
00528       // Wait for previous writes to complete
00529       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00530         waitForAllWritesToComplete(backend, request.getTransactionId());
00531 
00532       try
00533       {
00534         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00535       }
00536       catch (UnreachableBackendException e1)
00537       {
00538         logger.error(Translate.get(
00539             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00540         disableBackend(backend);
00541         throw new SQLException(Translate.get(
00542             "loadbalancer.backend.unreacheable", backend.getName()));
00543       }
00544       catch (NoTransactionStartWhenDisablingException e)
00545       {
00546         String msg = Translate.get("loadbalancer.backend.is.disabling",
00547             new String[]{request.getSQLShortForm(vdb.getSQLShortFormLength()),
00548                 backend.getName()});
00549         logger.error(msg);
00550         throw new SQLException(msg);
00551       }
00552 
00553       // Sanity check
00554       if (c == null)
00555         throw new SQLException(Translate.get(
00556             "loadbalancer.unable.retrieve.connection", new String[]{
00557                 String.valueOf(tid), backend.getName()}));
00558 
00559       // Execute Query
00560       ControllerResultSet rs = null;
00561       try
00562       {
00563         rs = executeSelectRequestOnBackend(request, backend, c, metadataCache);
00564       }
00565       catch (SQLException e)
00566       {
00567         throw new SQLException(Translate.get(
00568             "loadbalancer.request.failed.on.backend", new String[]{
00569                 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00570                 backend.getName(), e.getMessage()}));
00571       }
00572       catch (BadConnectionException e)
00573       { // Connection failed, so did the transaction
00574         // Disable the backend.
00575         cm.deleteConnection(tid);
00576         String msg = Translate.get(
00577             "loadbalancer.backend.disabling.connection.failure", backend
00578                 .getName());
00579         logger.error(msg);
00580         disableBackend(backend);
00581         throw new SQLException(msg);
00582       }
00583       if (logger.isDebugEnabled())
00584         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00585             new String[]{String.valueOf(tid), String.valueOf(request.getId()),
00586                 backend.getName()}));
00587       return rs;
00588     }
00589   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.executeStoredProcedureOnBackend StoredProcedure  proc,
DatabaseBackend  backend,
MetadataCache  metadataCache
throws SQLException, UnreachableBackendException [protected]
 

Execute a stored procedure on the selected backend.

Parameters:
proc the stored procedure to execute
backend the backend that will execute the request
metadataCache the metadataCache if any or null
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Definition at line 600 of file RAIDb2.java.

00603   {
00604     // Handle macros
00605     handleMacros(proc);
00606 
00607     // Ok, we have a backend, let's execute the request
00608     AbstractConnectionManager cm = backend
00609         .getConnectionManager(proc.getLogin());
00610 
00611     // Sanity check
00612     if (cm == null)
00613     {
00614       String msg = Translate.get("loadbalancer.connectionmanager.not.found",
00615           new String[]{proc.getLogin(), backend.getName()});
00616       logger.error(msg);
00617       throw new SQLException(msg);
00618     }
00619 
00620     // Execute the query
00621     if (proc.isAutoCommit())
00622     {
00623       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00624         // We could do something finer grain here by waiting
00625         // only for writes that depend on the tables we need
00626         // but is that really worth the overhead ?
00627         waitForAllWritesToComplete(backend);
00628 
00629       // Use a connection just for this request
00630       Connection c = null;
00631       try
00632       {
00633         c = cm.getConnection();
00634       }
00635       catch (UnreachableBackendException e1)
00636       {
00637         logger.error(Translate.get(
00638             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00639         disableBackend(backend);
00640         throw new UnreachableBackendException(Translate.get(
00641             "loadbalancer.backend.unreacheable", backend.getName()));
00642       }
00643 
00644       // Sanity check
00645       if (c == null)
00646         throw new SQLException(Translate.get(
00647             "loadbalancer.backend.no.connection", backend.getName()));
00648 
00649       // Execute Query
00650       ControllerResultSet rs = null;
00651       try
00652       {
00653         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00654             backend, c, metadataCache);
00655       }
00656       catch (Exception e)
00657       {
00658         throw new SQLException(Translate.get(
00659             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00660                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00661                 backend.getName(), e.getMessage()}));
00662       }
00663       finally
00664       {
00665         cm.releaseConnection(c);
00666       }
00667       if (logger.isDebugEnabled())
00668         logger.debug(Translate.get("loadbalancer.storedprocedure.on",
00669             new String[]{String.valueOf(proc.getId()), backend.getName()}));
00670       return rs;
00671     }
00672     else
00673     { // Inside a transaction
00674       Connection c;
00675       long tid = proc.getTransactionId();
00676       Long lTid = new Long(tid);
00677 
00678       // Wait for previous writes to complete
00679       if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
00680         waitForAllWritesToComplete(backend, proc.getTransactionId());
00681 
00682       try
00683       {
00684         c = backend.getConnectionForTransactionAndLazyBeginIfNeeded(lTid, cm);
00685       }
00686       catch (UnreachableBackendException e1)
00687       {
00688         logger.error(Translate.get(
00689             "loadbalancer.backend.disabling.unreachable", backend.getName()));
00690         disableBackend(backend);
00691         throw new SQLException(Translate.get(
00692             "loadbalancer.backend.unreacheable", backend.getName()));
00693       }
00694       catch (NoTransactionStartWhenDisablingException e)
00695       {
00696         String msg = Translate.get("loadbalancer.backend.is.disabling",
00697             new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00698                 backend.getName()});
00699         logger.error(msg);
00700         throw new SQLException(msg);
00701       }
00702 
00703       // Sanity check
00704       if (c == null)
00705         throw new SQLException(Translate.get(
00706             "loadbalancer.unable.retrieve.connection", new String[]{
00707                 String.valueOf(tid), backend.getName()}));
00708 
00709       // Execute Query
00710       ControllerResultSet rs;
00711       try
00712       {
00713         rs = AbstractLoadBalancer.executeReadStoredProcedureOnBackend(proc,
00714             backend, c, metadataCache);
00715       }
00716       catch (Exception e)
00717       {
00718         throw new SQLException(Translate.get(
00719             "loadbalancer.storedprocedure.failed.on.backend", new String[]{
00720                 proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00721                 backend.getName(), e.getMessage()}));
00722       }
00723       if (logger.isDebugEnabled())
00724         logger.debug(Translate.get("loadbalancer.execute.transaction.on",
00725             new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
00726                 backend.getName()}));
00727       return rs;
00728     }
00729   }

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest AbstractWriteRequest  request  )  throws AllBackendsFailedException, SQLException [virtual]
 

Performs a write request. This request is broadcasted to all nodes that owns the table to be written.

Parameters:
request an AbstractWriteRequest
Returns:
number of rows affected by the request
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 167 of file RAIDb2.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().

00169   {
00170     return ((WriteRequestTask) execWriteRequest(request, false, null))
00171         .getResult();
00172   }

ControllerResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys AbstractWriteRequest  request,
MetadataCache  metadataCache
throws AllBackendsFailedException, SQLException [virtual]
 

Perform a write request and return the auto generated keys.

Parameters:
request the request to execute
metadataCache the metadataCache if any or null
Returns:
auto generated keys.
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 184 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().

00187   {
00188     return ((WriteRequestWithKeysTask) execWriteRequest(request, true,
00189         metadataCache)).getResult();
00190   }

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteStoredProcedure StoredProcedure  proc  )  throws SQLException [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execWriteStoredProcedure(StoredProcedure)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 746 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().

00747   {
00748     WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
00749         proc, false, null);
00750     return task.getResult();
00751   }

abstract String org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.getRaidb2Xml  )  [pure virtual]
 

return xml formatted information about this raidb2 load balancer

Returns:
xml formatted string

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.

String org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.getXmlImpl  )  [virtual]
 

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Definition at line 1447 of file RAIDb2.java.

01448   {
01449     StringBuffer info = new StringBuffer();
01450     info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01451     if (createTablePolicy != null)
01452       info.append(createTablePolicy.getXml());
01453     if (waitForCompletionPolicy != null)
01454       info.append(waitForCompletionPolicy.getXml());
01455     if (macroHandler != null)
01456       info.append(macroHandler.getXml());
01457     this.getRaidb2Xml();
01458     info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
01459     return info.toString();
01460   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.rollback TransactionMarkerMetaData  tm  )  throws SQLException [virtual]
 

Rollbacks a transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 1045 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setExpiredTimeout().

01046   {
01047     long tid = tm.getTransactionId();
01048     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01049       waitForAllWritesToComplete(tid);
01050 
01051     try
01052     {
01053       backendNonBlockingThreadsRWLock.acquireWrite();
01054     }
01055     catch (InterruptedException e)
01056     {
01057       String msg = Translate.get(
01058           "loadbalancer.backendlist.acquire.writelock.failed", e);
01059       logger.error(msg);
01060       throw new SQLException(msg);
01061     }
01062     int nbOfThreads = backendNonBlockingThreads.size();
01063     ArrayList rollbackList = new ArrayList();
01064     Long iTid = new Long(tid);
01065 
01066     // Build the list of backend that need to rollback this transaction
01067     for (int i = 0; i < nbOfThreads; i++)
01068     {
01069       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01070           .get(i);
01071       if (thread.getBackend().isStartedTransaction(iTid))
01072         rollbackList.add(thread);
01073     }
01074 
01075     nbOfThreads = rollbackList.size();
01076     if (nbOfThreads == 0)
01077     {
01078       backendNonBlockingThreadsRWLock.releaseWrite();
01079       return;
01080     }
01081 
01082     // Create the task
01083     RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01084         tm.getTimeout(), tm.getLogin(), tid);
01085 
01086     synchronized (task)
01087     {
01088       // Post the task in each backendThread tasklist and wakeup the threads
01089       for (int i = 0; i < nbOfThreads; i++)
01090       {
01091         BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
01092         synchronized (thread)
01093         {
01094           thread.addTask(task, tid);
01095           thread.notify();
01096         }
01097       }
01098 
01099       backendNonBlockingThreadsRWLock.releaseWrite();
01100 
01101       // Wait for completion (notified by the task)
01102       try
01103       {
01104         // Wait on task
01105         long timeout = tm.getTimeout();
01106         if (timeout > 0)
01107         {
01108           long start = System.currentTimeMillis();
01109           task.wait(timeout);
01110           long end = System.currentTimeMillis();
01111           long remaining = timeout - (end - start);
01112           if (remaining <= 0)
01113           {
01114             if (task.setExpiredTimeout())
01115             { // Task will be ignored by all backends
01116               String msg = Translate.get("loadbalancer.rollback.timeout",
01117                   new String[]{String.valueOf(tid),
01118                       String.valueOf(task.getSuccess()),
01119                       String.valueOf(task.getFailed())});
01120               logger.warn(msg);
01121               throw new SQLException(msg);
01122             }
01123             // else task execution already started, to late to cancel
01124           }
01125         }
01126         else
01127           task.wait();
01128       }
01129       catch (InterruptedException e)
01130       {
01131         if (task.setExpiredTimeout())
01132         { // Task will be ignored by all backends
01133           String msg = Translate.get("loadbalancer.rollback.timeout",
01134               new String[]{String.valueOf(tid),
01135                   String.valueOf(task.getSuccess()),
01136                   String.valueOf(task.getFailed())});
01137           logger.warn(msg);
01138           throw new SQLException(msg);
01139         }
01140         // else task execution already started, to late to cancel
01141       }
01142 
01143       if (task.getSuccess() > 0)
01144         return;
01145       else
01146       { // All tasks failed
01147         ArrayList exceptions = task.getExceptions();
01148         if (exceptions == null)
01149           throw new SQLException(Translate.get(
01150               "loadbalancer.rollback.all.failed", tid));
01151         else
01152         {
01153           String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
01154               tid)
01155               + "\n";
01156           for (int i = 0; i < exceptions.size(); i++)
01157             errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
01158           logger.error(errorMsg);
01159           throw new SQLException(errorMsg);
01160         }
01161       }
01162     }
01163   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete DatabaseBackend  backend  )  throws SQLException [protected]
 

Waits for all writes in the blocking thread queue of the given backend to complete.

See also:
executeRequestOnBackend

Definition at line 1243 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01245   {
01246     try
01247     {
01248       backendBlockingThreadsRWLock.acquireWrite();
01249       // Lock in write to ensure that all writes are posted and we wait in the
01250       // queue, else a read lock has the priority with the implementation we are
01251       // using.
01252     }
01253     catch (InterruptedException e)
01254     {
01255       String msg = Translate.get(
01256           "loadbalancer.backendlist.acquire.writelock.failed", e);
01257       logger.error(msg);
01258       throw new SQLException(msg);
01259     }
01260 
01261     int nbOfThreads = backendBlockingThreads.size();
01262 
01263     for (int i = 0; i < nbOfThreads; i++)
01264     {
01265       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01266           .get(i);
01267       if (thread.getBackend() == backend)
01268         thread.waitForAllTasksToComplete();
01269     }
01270 
01271     backendBlockingThreadsRWLock.releaseWrite();
01272   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete DatabaseBackend  backend,
long  transactionId
throws SQLException [protected]
 

Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before being able to complete the transaction.

See also:
executeRequestOnBackend

Definition at line 1206 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01208   {
01209     try
01210     {
01211       backendBlockingThreadsRWLock.acquireWrite();
01212       // Lock in write to ensure that all writes are posted and we wait in the
01213       // queue, else a read lock has the priority with the implementation we are
01214       // using.
01215     }
01216     catch (InterruptedException e)
01217     {
01218       String msg = Translate.get(
01219           "loadbalancer.backendlist.acquire.writelock.failed", e);
01220       logger.error(msg);
01221       throw new SQLException(msg);
01222     }
01223 
01224     int nbOfThreads = backendBlockingThreads.size();
01225 
01226     for (int i = 0; i < nbOfThreads; i++)
01227     {
01228       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01229           .get(i);
01230       if (thread.getBackend() == backend)
01231         thread.waitForAllTasksToComplete(transactionId);
01232     }
01233 
01234     backendBlockingThreadsRWLock.releaseWrite();
01235   }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete long  transactionId  )  throws SQLException [protected]
 

Waits for all writes of the given transaction in the blocking thread queue to complete before being able to complete the transaction.

Definition at line 1169 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

01171   {
01172     try
01173     {
01174       backendBlockingThreadsRWLock.acquireWrite();
01175       // Lock in write to ensure that all writes are posted and we wait in the
01176       // queue, else a read lock has the priority with the implementation we are
01177       // using.
01178     }
01179     catch (InterruptedException e)
01180     {
01181       String msg = Translate.get(
01182           "loadbalancer.backendlist.acquire.writelock.failed", e);
01183       logger.error(msg);
01184       throw new SQLException(msg);
01185     }
01186 
01187     int nbOfThreads = backendBlockingThreads.size();
01188 
01189     for (int i = 0; i < nbOfThreads; i++)
01190     {
01191       BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
01192           .get(i);
01193       thread.waitForAllTasksToComplete(transactionId);
01194     }
01195 
01196     backendBlockingThreadsRWLock.releaseWrite();
01197   }


Member Data Documentation

Trace org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.logger [static, protected]
 

Initial value:

 Trace
                                                                            .getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")

Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Definition at line 95 of file RAIDb2.java.


The documentation for this class was generated from the following file:
Generated on Mon Apr 11 22:04:08 2005 for C-JDBC by  doxygen 1.3.9.1