Class org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2

[Inheritance graph for org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2]

[Collaboration diagram for org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2]

Description

RAIDb-2 load balancer.

This class is abstract because read requests coming from the Request Manager are NOT handled here but in the subclasses. Transaction management and write requests are broadcast to all backends owning the written table.
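As an illustration of the write path described above, here is a minimal sketch, not C-JDBC code. The `Backend` class and the `backendsOwning` helper are hypothetical stand-ins, and each backend is assumed to expose the set of table names it hosts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: the names below are illustrative, not part of C-JDBC.
final class WriteBroadcastSketch {

  /** Stand-in for a backend that hosts only a subset of the tables. */
  static final class Backend {
    final String name;
    final Set<String> tables;

    Backend(String name, String... tables) {
      this.name = name;
      this.tables = new HashSet<>(Arrays.asList(tables));
    }
  }

  /** Returns the names of the backends that own the written table. */
  static List<String> backendsOwning(String table, List<Backend> backends) {
    List<String> targets = new ArrayList<>();
    for (Backend b : backends) {
      if (b.tables.contains(table)) {
        targets.add(b.name);
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    List<Backend> backends = Arrays.asList(
        new Backend("db1", "orders", "customers"),
        new Backend("db2", "orders"),
        new Backend("db3", "customers"));
    // Only the backends hosting "orders" are selected for the write.
    System.out.println(backendsOwning("orders", backends));
  }
}
```

With partial replication, a write touches only the backends that host the table, which is why the selection step comes before the broadcast.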

Author:
Emmanuel Cecchet
Version:
1.0

Defined at line 76 of file RAIDb2.java.

Public Methods

 RAIDb2 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, CreateTablePolicy createTablePolicy, long timestampResolution) throws SQLException
int execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException
ResultSet execWriteRequestWithKeys (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException
abstract java.sql.ResultSet execReadRequest (SelectRequest request) throws SQLException
ResultSet execReadStoredProcedure (StoredProcedure proc) throws SQLException
int execWriteStoredProcedure (StoredProcedure proc) throws SQLException
final void begin (TransactionMarkerMetaData tm) throws SQLException
void commit (TransactionMarkerMetaData tm) throws SQLException
void rollback (TransactionMarkerMetaData tm) throws SQLException
void enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException
synchronized void disableBackend (DatabaseBackend db) throws SQLException
String getXmlImpl ()
abstract String getRaidb2Xml ()
int getRAIDbLevel ()
void setRAIDbLevel (int raidbLevel)
int getParsingGranularity ()
void setParsingGranularity (int parsingGranularity)
abstract ResultSet execReadOnlyReadStoredProcedure (StoredProcedure proc) throws SQLException
void setWeight (String name, int w) throws SQLException
abstract String getInformation ()
String getXml ()

Protected Methods

java.sql.ResultSet executeRequestOnBackend (SelectRequest request, DatabaseBackend backend) throws SQLException, UnreachableBackendException
java.sql.ResultSet executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend) throws SQLException, UnreachableBackendException
void waitForAllWritesToComplete (long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException
void waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException
ResultSet executeStatementOnBackend (SelectRequest request, DatabaseBackend backend, Connection c) throws SQLException, BadConnectionException

Protected Attributes

ArrayList backendBlockingThreads
ArrayList backendNonBlockingThreads
ReadPrioritaryFIFOWriteLock backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
ReadPrioritaryFIFOWriteLock backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock()
WaitForCompletionPolicy waitForCompletionPolicy
CreateTablePolicy createTablePolicy
VirtualDatabase vdb
int raidbLevel
int parsingGranularity

Static Protected Attributes

Trace logger

Private Methods

int getNbToWait (int nbOfThreads)
void execWriteRequest (AbstractWriteRequest request, boolean useKeys) throws AllBackendsFailedException, SQLException
AbstractTask callStoredProcedure (StoredProcedure proc, boolean isRead) throws SQLException

Private Attributes

long timestampResolution
int execWriteRequestResult = 0
ResultSet execWriteRequestWithKeysResult = null


Constructor & Destructor Documentation

org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.RAIDb2 (VirtualDatabase vdb, WaitForCompletionPolicy waitForCompletionPolicy, CreateTablePolicy createTablePolicy, long timestampResolution) throws SQLException

Creates a new RAIDb-2 request load balancer. Backend worker threads are created later, when a backend is enabled for writes (see enableBackend).

Parameters:
vdb the virtual database this load balancer belongs to
waitForCompletionPolicy how many backends must complete before the result is returned
createTablePolicy the policy defining how 'create table' statements should be handled
timestampResolution timestamp resolution for NOW macros
Exceptions:
SQLException if an error occurs

Defined at line 115 of file RAIDb2.java.

  {
    super(vdb, RAIDbLevels.RAIDb2, ParsingGranularities.TABLE);

    this.waitForCompletionPolicy = waitForCompletionPolicy;
    backendBlockingThreads = new ArrayList();
    backendNonBlockingThreads = new ArrayList();
    this.createTablePolicy = createTablePolicy;
    this.timestampResolution = timestampResolution;
  }
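The `waitForCompletionPolicy` parameter decides how many backends must finish before a result is returned. The body of `getNbToWait` is not shown on this page, so the following is only a sketch of how such a policy could map to a count. The `Policy` enum and the `FIRST` and `MAJORITY` values are assumptions made for illustration; only `ALL` appears in the listings here.

```java
// Hypothetical sketch: this enum and method are illustrative,
// not the C-JDBC implementation of getNbToWait.
final class WaitPolicySketch {

  enum Policy { FIRST, MAJORITY, ALL }

  /** Number of backend threads that must complete before returning. */
  static int nbToWait(Policy policy, int nbOfThreads) {
    if (nbOfThreads <= 0) {
      return 0; // nothing to wait for
    }
    switch (policy) {
      case FIRST:
        return 1;                   // first answer is enough
      case MAJORITY:
        return nbOfThreads / 2 + 1; // strict majority
      default:
        return nbOfThreads;         // ALL: every backend
    }
  }

  public static void main(String[] args) {
    for (Policy p : Policy.values()) {
      System.out.println(p + " with 5 backends: " + nbToWait(p, 5));
    }
  }
}
```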


Method Documentation

final void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.begin (TransactionMarkerMetaData tm) throws SQLException [virtual]

Begins a new transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 1005 of file RAIDb2.java.

  {
  }

AbstractTask org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.callStoredProcedure (StoredProcedure proc, boolean isRead) throws SQLException [private]

Posts the stored procedure call in the threads' task lists.

Parameters:
proc the stored procedure to call
isRead true if the call returns a ResultSet
Returns:
the task that has been executed (the caller can get the result by calling getResult())
Exceptions:
SQLException if an error occurs

Defined at line 843 of file RAIDb2.java.

References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasStoredProcedure(), org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.releaseRead(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.removeTask(), and org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.setTotalNb().

  {
    ArrayList backendThreads = backendBlockingThreads;
    ReadPrioritaryFIFOWriteLock lock = backendBlockingThreadsRWLock;

    try
    {
      lock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendThreads.size();

    // Create the task
    AbstractTask task;
    if (isRead)
      task = new ReadStoredProcedureTask(getNbToWait(nbOfThreads), nbOfThreads,
          proc);
    else
      task = new WriteStoredProcedureTask(getNbToWait(nbOfThreads),
          nbOfThreads, proc);

    synchronized (task)
    {
      int nbOfBackends = 0;

      // Post the task in each backendThread tasklist and wakeup the threads
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        if (thread.getBackend().hasStoredProcedure(proc.getProcedureName()))
        {
          nbOfBackends++;
          synchronized (thread)
          {
            if ((waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL))
              thread.addTask(task);
            else
              thread.addTask(task, proc.getTransactionId());
            thread.notify();
          }
        }
        if (nbOfBackends == 0)
          throw new SQLException(Translate.get(
              "loadbalancer.backend.no.required.storedprocedure", proc
                  .getProcedureName()));
        else
          task.setTotalNb(nbOfBackends);
      }

      lock.releaseRead();

      // Wait for completion (notified by the task)
      try
      {
        // Wait on task
        long timeout = proc.getTimeout() * 1000;
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          task.wait(timeout);
          long end = System.currentTimeMillis();
          long remaining = timeout - (end - start);
          if (remaining <= 0)
          {
            String msg = Translate.get("loadbalancer.storedprocedure.timeout",
                new String[]{String.valueOf(proc.getId()),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});

            // Try to remove the request from the task list
            lock.acquireRead();
            nbOfThreads = backendThreads.size();
            for (int i = 0; i < nbOfThreads; i++)
            {
              BackendWorkerThread thread = (BackendWorkerThread) backendThreads
                  .get(i);
              synchronized (thread)
              {
                thread.removeTask(task);
              }
            }
            lock.releaseRead();

            logger.warn(msg);
            throw new SQLException(msg);
          }
          // No need to update request timeout since the execution is finished
        }
        else
          task.wait();
      }
      catch (InterruptedException e)
      {
        // Try to remove the request from the task list
        try
        {
          lock.acquireRead();
          nbOfThreads = backendThreads.size();
        }
        catch (InterruptedException ignore)
        {
          nbOfThreads = 0; // Give up
        }
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendThreads
              .get(i);
          synchronized (thread)
          {
            thread.removeTask(task);
          }
        }
        lock.releaseRead();

        throw new SQLException(Translate.get(
            "loadbalancer.storedprocedure.timeout", new String[]{
                String.valueOf(proc.getId()),
                String.valueOf(task.getSuccess()),
                String.valueOf(task.getFailed())}));
      }

      if (task.getSuccess() > 0)
        return task;
      else
      { // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new SQLException(Translate.get(
              "loadbalancer.storedprocedure.all.failed", proc.getId()));
        else
        {
          String errorMsg = Translate.get(
              "loadbalancer.storedprocedure.failed.stack", proc.getId())
              + "\n";
          for (int i = 0; i < exceptions.size(); i++)
            errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
          logger.error(errorMsg);
          throw new SQLException(errorMsg);
        }
      }
    }
  }
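The listing above waits on the task monitor and decides afterwards, from the elapsed time, whether the wait timed out. A common variant of this idea is sketched below. It is not the method used in RAIDb2: it swaps in an explicit completion flag and a deadline loop, so a wakeup that arrives before the task has completed does not end the wait early. The `Task` class and its methods are hypothetical.

```java
// Hypothetical sketch: Task is an illustrative stand-in, not a C-JDBC class.
final class TimedWaitSketch {

  static final class Task {
    private boolean done;

    /** Called by a worker when the task has finished. */
    synchronized void complete() {
      done = true;
      notifyAll();
    }

    /**
     * Waits until complete() is called or timeoutMs elapses.
     * A timeout of 0 or less waits indefinitely.
     * Returns true if the task completed, false on timeout or interruption.
     */
    synchronized boolean await(long timeoutMs) {
      long deadline = System.currentTimeMillis() + timeoutMs;
      try {
        while (!done) {
          if (timeoutMs <= 0) {
            wait();
          } else {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
              return false; // timed out
            }
            wait(remaining);
          }
        }
        return true;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
  }

  public static void main(String[] args) {
    Task task = new Task();
    new Thread(task::complete).start();
    System.out.println(task.await(2000));     // a worker completes this task
    System.out.println(new Task().await(50)); // nobody completes this one
  }
}
```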

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.commit (TransactionMarkerMetaData tm) throws SQLException [virtual]

Commits a transaction.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 1015 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().

  {
    if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
      waitForAllWritesToComplete(tm.getTransactionId());

    try
    {
      backendNonBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendNonBlockingThreads.size();
    ArrayList commitList = new ArrayList();
    Long iTid = new Long(tm.getTransactionId());

    // Build the list of backend that need to commit this transaction
    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
          .get(i);
      if (thread.getBackend().isStartedTransaction(iTid))
        commitList.add(thread);
    }

    nbOfThreads = commitList.size();
    if (nbOfThreads == 0)
    {
      backendNonBlockingThreadsRWLock.releaseRead();
      return;
    }

    // Create the task
    CommitTask task = new CommitTask(getNbToWait(nbOfThreads), nbOfThreads, tm
        .getTimeout(), tm.getLogin(), tm.getTransactionId());

    synchronized (task)
    {
      // Post the task in each backendThread tasklist and wakeup the threads
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) commitList.get(i);
        synchronized (thread)
        {
          thread.addTask(task);
          thread.notify();
        }
      }

      backendNonBlockingThreadsRWLock.releaseRead();

      // Wait for completion (notified by the task)
      try
      {
        // Wait on task
        long timeout = tm.getTimeout();
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          task.wait(timeout);
          long end = System.currentTimeMillis();
          long remaining = timeout - (end - start);
          if (remaining <= 0)
          {
            String msg = Translate.get("loadbalancer.commit.timeout",
                new String[]{String.valueOf(tm.getTransactionId()),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});
            logger.warn(msg);
            throw new SQLException(msg);
          }
        }
        else
          task.wait();
      }
      catch (InterruptedException e)
      {
        throw new SQLException(Translate.get("loadbalancer.commit.timeout",
            new String[]{String.valueOf(tm.getTransactionId()),
                String.valueOf(task.getSuccess()),
                String.valueOf(task.getFailed())}));
      }

      if (task.getSuccess() > 0)
        return;
      else
      { // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new SQLException(Translate.get(
              "loadbalancer.commit.all.failed", tm.getTransactionId()));
        else
        {
          String errorMsg = Translate.get("loadbalancer.commit.failed.stack",
              tm.getTransactionId())
              + "\n";
          for (int i = 0; i < exceptions.size(); i++)
            errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
          logger.error(errorMsg);
          throw new SQLException(errorMsg);
        }
      }
    }
  }
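The first half of the commit listing builds the list of backends that actually started the transaction, and returns early when that list is empty. That selection step can be sketched in isolation as follows. This is a simplified illustration with hypothetical classes: `Backend` here is a stand-in that only records started transaction ids, and `commitList` returns backend names rather than worker threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: these classes are illustrative stand-ins, not C-JDBC types.
final class CommitListSketch {

  /** Stand-in backend that remembers which transactions it has started. */
  static final class Backend {
    final String name;
    private final Set<Long> started = new HashSet<>();

    Backend(String name, long... startedTids) {
      this.name = name;
      for (long tid : startedTids) {
        started.add(tid);
      }
    }

    boolean isStartedTransaction(long tid) {
      return started.contains(tid);
    }
  }

  /** Names of the backends that must receive the commit for tid. */
  static List<String> commitList(List<Backend> backends, long tid) {
    List<String> targets = new ArrayList<>();
    for (Backend b : backends) {
      if (b.isStartedTransaction(tid)) {
        targets.add(b.name);
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    List<Backend> backends = Arrays.asList(
        new Backend("db1", 7L), new Backend("db2"), new Backend("db3", 7L, 9L));
    System.out.println(commitList(backends, 7L));  // backends that started tid 7
    System.out.println(commitList(backends, 42L)); // empty: nothing to commit
  }
}
```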

synchronized void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.disableBackend (DatabaseBackend db) throws SQLException [virtual]

Disables a backend that was previously enabled.

Asks the corresponding connection manager to finalize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db the database backend to disable
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Defined at line 1421 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addPriorityTask(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.equals(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend().

  {
    if (db.isWriteEnabled())
    {
      // Start with the backendBlockingThread list
      try
      {
        backendBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      int nbOfThreads = backendBlockingThreads.size();

      // Find the right blocking thread
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
            .get(i);
        if (thread.getBackend().equals(db))
        {
          logger.info(Translate
              .get("loadbalancer.backend.workerthread.blocking.remove", db
                  .getName()));

          backendBlockingThreads.remove(thread);

          synchronized (thread)
          {
            // Kill the thread
            thread.addPriorityTask(new KillThreadTask(1, 1));
            thread.notify();
          }
          break;
        }
      }

      backendBlockingThreadsRWLock.releaseWrite();

      // Continue with the backendNonBlockingThread list

      try
      {
        backendNonBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }

      // Find the right non-blocking thread
      nbOfThreads = backendNonBlockingThreads.size();
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
            .get(i);
        if (thread.getBackend().equals(db))
        {
          logger.info(Translate.get(
              "loadbalancer.backend.workerthread.non.blocking.remove", db
                  .getName()));

          backendNonBlockingThreads.remove(thread);

          synchronized (thread)
          {
            // Kill the thread
            thread.addPriorityTask(new KillThreadTask(1, 1));
            thread.notify();
          }
          break;
        }
      }

      backendNonBlockingThreadsRWLock.releaseWrite();
    }

    db.disable();
    if (db.isInitialized())
      db.finalizeConnections();
  }
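The listing stops each worker by posting a `KillThreadTask` through `addPriorityTask`. How BackendWorkerThread orders its tasks is not shown on this page, so the sketch below only illustrates the general "poison pill" idea, under the assumption that a priority task is placed ahead of the tasks already queued. The `Worker` class, the `KILL` marker and the use of `LinkedBlockingDeque` are all choices made for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch: Worker and KILL are illustrative, not C-JDBC classes.
final class PoisonPillSketch {

  /** Marker task: a worker that dequeues it stops its loop. */
  static final Runnable KILL = () -> { };

  static final class Worker extends Thread {
    private final LinkedBlockingDeque<Runnable> tasks = new LinkedBlockingDeque<>();

    void addTask(Runnable r) {
      tasks.addLast(r);   // normal tasks queue at the tail
    }

    void addPriorityTask(Runnable r) {
      tasks.addFirst(r);  // priority tasks jump to the head
    }

    @Override
    public void run() {
      try {
        while (true) {
          Runnable r = tasks.takeFirst();
          if (r == KILL) {
            return;       // stop without running the remaining tasks
          }
          r.run();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Queues a normal task, then a priority kill, and returns what actually ran. */
  static List<String> demo() {
    List<String> ran = new CopyOnWriteArrayList<>();
    Worker w = new Worker();
    w.addTask(() -> ran.add("normal"));
    w.addPriorityTask(KILL); // placed ahead of "normal"
    w.start();
    try {
      w.join(2000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return ran;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because both tasks are queued before the worker starts, the kill marker is dequeued first and the normal task never runs.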

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.enableBackend (DatabaseBackend db, boolean writeEnabled) throws SQLException [virtual]

Enables a backend that was previously disabled.

Asks the corresponding connection manager to initialize the connections if needed.

No sanity checks are performed by this function.

Parameters:
db the database backend to enable
writeEnabled true if the backend must be enabled for writes
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Defined at line 1356 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.backend.DatabaseBackend.enableWrite(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.getName().

  {
    if (writeEnabled)
    {
      // Create 2 worker threads
      BackendWorkerThread blockingThread = new BackendWorkerThread(
          (DatabaseBackend) db, this);
      BackendWorkerThread nonBlockingThread = new BackendWorkerThread(
          (DatabaseBackend) db, this);

      // Add first to the blocking thread list
      try
      {
        backendBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      backendBlockingThreads.add(blockingThread);
      backendBlockingThreadsRWLock.releaseWrite();
      blockingThread.start();
      logger.info(Translate.get(
          "loadbalancer.backend.workerthread.blocking.add", db.getName()));

      // Then add to the non-blocking thread list
      try
      {
        backendNonBlockingThreadsRWLock.acquireWrite();
      }
      catch (InterruptedException e)
      {
        String msg = Translate.get(
            "loadbalancer.backendlist.acquire.writelock.failed", e);
        logger.error(msg);
        throw new SQLException(msg);
      }
      backendNonBlockingThreads.add(nonBlockingThread);
      backendNonBlockingThreadsRWLock.releaseWrite();
      nonBlockingThread.start();
      logger.info(Translate.get(
          "loadbalancer.backend.workerthread.non.blocking.add", db.getName()));
      db.enableWrite();
    }

    if (!db.isInitialized())
      db.initializeConnections();
    db.enableRead();
  }
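Both enableBackend and disableBackend change the worker lists under a write lock, while request paths such as commit only take the read lock. The sketch below shows that locking discipline in a generic form. It uses the JDK's `ReentrantReadWriteLock` as a stand-in for `ReadPrioritaryFIFOWriteLock` (whose fairness rules may differ), and the `WorkerRegistrySketch` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a JDK read/write lock stands in for the C-JDBC lock class.
final class WorkerRegistrySketch {

  private final List<String> workers = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /** Registers a worker for the named backend (write lock). */
  void enable(String backendName) {
    lock.writeLock().lock();
    try {
      workers.add(backendName);
    } finally {
      lock.writeLock().unlock(); // released even if add() throws
    }
  }

  /** Removes the worker for the named backend; returns false if it was absent. */
  boolean disable(String backendName) {
    lock.writeLock().lock();
    try {
      return workers.remove(backendName);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Readers may run concurrently with each other (read lock). */
  int size() {
    lock.readLock().lock();
    try {
      return workers.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    WorkerRegistrySketch registry = new WorkerRegistrySketch();
    registry.enable("db1");
    registry.enable("db2");
    registry.disable("db1");
    System.out.println(registry.size());
  }
}
```

Releasing in a `finally` block is a design choice of this sketch: it keeps the lock from leaking when an exception is thrown between acquire and release.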

abstract ResultSet org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execReadOnlyReadStoredProcedure (StoredProcedure proc) throws SQLException [pure virtual, inherited]

Calls a read-only stored procedure that returns a ResultSet. The stored procedure is executed by one node only.

Parameters:
proc the stored procedure call
Returns:
a java.sql.ResultSet value
Exceptions:
SQLException if an error occurs

Implemented in org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB, org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.

abstract java.sql.ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execReadRequest (SelectRequest request) throws SQLException [pure virtual]

Implementation-specific load-balanced read execution.

Parameters:
request a SelectRequest
Returns:
the corresponding java.sql.ResultSet
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.

ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execReadStoredProcedure (StoredProcedure proc) throws SQLException [virtual]

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execReadStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 816 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask.getResult().

  {
    ReadStoredProcedureTask task = (ReadStoredProcedureTask) callStoredProcedure(
        proc, true);
    return task.getResult();
  }

java.sql.ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.executeRequestOnBackend (SelectRequest request, DatabaseBackend backend) throws SQLException, UnreachableBackendException [protected]

Executes a read request on the selected backend.

Parameters:
request the request to execute
backend the backend that will execute the request
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Defined at line 492 of file RAIDb2.java.

  {
    // Handle macros
    request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
        timestampResolution, false));

    // Ok, we have a backend, let's execute the request
    AbstractConnectionManager cm = backend.getConnectionManager(request
        .getLogin());

    // Sanity check
    if (cm == null)
    {
      String msg = Translate.get("loadbalancer.connectionmanager.not.found",
          new String[]{request.getLogin(), backend.getName()});
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Execute the query
    if (request.isAutoCommit())
    {
      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        // We could do something finer grain here by waiting
        // only for writes that depend on the tables we need
        // but is that really worth the overhead ?
        waitForAllWritesToComplete(backend);

      ResultSet rs = null;
      boolean badConnection;
      do
      {
        badConnection = false;
        // Use a connection just for this request
        Connection c = null;
        try
        {
          c = cm.getConnection();
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          backend.disable();
          throw new UnreachableBackendException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }

        // Sanity check
        if (c == null)
          throw new UnreachableBackendException(
              "No more connections on backend " + backend.getName());

        // Execute Query
        try
        {
          rs = executeStatementOnBackend(request, backend, c);
          cm.releaseConnection(c);
        }
        catch (SQLException e)
        {
          cm.releaseConnection(c);
          throw new SQLException(Translate.get(
              "loadbalancer.request.failed.on.backend", new String[]{
                  request.getSQLShortForm(vdb.getSQLShortFormLength()),
                  backend.getName(), e.getMessage()}));
        }
        catch (BadConnectionException e)
        { // Get rid of the bad connection
          cm.deleteConnection(c);
          badConnection = true;
        }
      }
      while (badConnection);
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.execute.on", new String[]{
            String.valueOf(request.getId()), backend.getName()}));
      return rs;
    }
    else
    { // Inside a transaction
      Connection c;
      long tid = request.getTransactionId();
      Long lTid = new Long(tid);

      // Wait for previous writes to complete
      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        waitForAllWritesToComplete(backend, request.getTransactionId());

      if (!backend.isStartedTransaction(lTid))
      { // transaction has not been started yet on this backend
        try
        {
          c = cm.getConnection(tid);
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          backend.disable();
          throw new UnreachableBackendException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.get.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));

        // begin transaction
        backend.startTransaction(lTid);
        c.setAutoCommit(false);
      }
      else
      { // Re-use the connection used by this transaction
        c = cm.retrieveConnection(tid);

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.retrieve.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));
      }

      // Execute Query
      ResultSet rs = null;
      try
      {
        rs = executeStatementOnBackend(request, backend, c);
      }
      catch (SQLException e)
      {
        throw new SQLException(Translate.get(
            "loadbalancer.request.failed.on.backend", new String[]{
                request.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      catch (BadConnectionException e)
      { // Connection failed, so did the transaction
        // Disable the backend.
        cm.deleteConnection(tid);
        String msg = Translate.get(
            "loadbalancer.backend.disabling.connection.failure", backend
                .getName());
        logger.error(msg);
        backend.disable();
        throw new SQLException(msg);
      }
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
            new String[]{String.valueOf(tid), String.valueOf(request.getId()),
                backend.getName()}));
      return rs;
    }
  }
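In auto-commit mode the listing keeps fetching connections until one works: a bad connection is deleted and the request is retried, and running out of connections ends the loop with an exception. The sketch below isolates that retry loop. It is a simplified illustration with a hypothetical `Pool` class in which each connection is modeled as a boolean (true meaning healthy); it does not use JDBC or any C-JDBC class.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch: Pool models connections as booleans (true = healthy).
final class BadConnectionRetrySketch {

  static final class Pool {
    private final Deque<Boolean> connections;
    int discarded = 0; // number of bad connections thrown away

    Pool(Boolean... healthy) {
      connections = new ArrayDeque<>(Arrays.asList(healthy));
    }

    /** Returns the next connection, or null when the pool is exhausted. */
    Boolean get() {
      return connections.pollFirst();
    }
  }

  /** Retries on a fresh connection until one is healthy or none are left. */
  static String execute(Pool pool, String sql) {
    while (true) {
      Boolean c = pool.get();
      if (c == null) {
        throw new IllegalStateException("No more connections");
      }
      if (c) {
        return "ok: " + sql; // the request ran on a healthy connection
      }
      pool.discarded++;      // get rid of the bad connection and retry
    }
  }

  public static void main(String[] args) {
    Pool pool = new Pool(false, false, true);
    System.out.println(execute(pool, "SELECT 1"));
    System.out.println("discarded: " + pool.discarded);
  }
}
```

Inside a transaction the listing does not retry: a bad connection there disables the backend, since the transaction state tied to that connection is lost.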

ResultSet org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.executeStatementOnBackend (SelectRequest request, DatabaseBackend backend, Connection c) throws SQLException, BadConnectionException [protected, inherited]

Executes a statement on a backend. If the execution fails, the connection is checked for validity. If the connection was not valid, the query is automatically retried on a new connection.

Parameters:
request the request to execute
backend the backend on which the request is executed
c connection used to create the statement
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs

Defined at line 251 of file AbstractLoadBalancer.java.

References java.sql.Statement.executeQuery(), java.sql.Statement.setCursorName(), java.sql.Statement.setFetchSize(), java.sql.Statement.setMaxRows(), java.sql.Statement.setQueryTimeout(), org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetCursorName, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetFetchSize, org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetMaxRows, and org.objectweb.cjdbc.controller.backend.DriverCompliance.supportSetQueryTimeout.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.execReadRequest().

  {
    ResultSet rs = null;
    try
    {
      backend.addPendingReadRequest(request);
      String sql = request.getSQL();
      // Rewrite the query if needed
      sql = backend.rewriteQuery(sql);
      // Execute the query
      Statement s = c.createStatement();
      DriverCompliance driverCompliance = backend.getDriverCompliance();
      if (driverCompliance.supportSetQueryTimeout())
        s.setQueryTimeout(request.getTimeout());
      if ((request.getCursorName() != null)
          && (driverCompliance.supportSetCursorName()))
        s.setCursorName(request.getCursorName());
      if ((request.getFetchSize() != 0)
          && driverCompliance.supportSetFetchSize())
        s.setFetchSize(request.getFetchSize());
      if ((request.getMaxRows() > 0) && driverCompliance.supportSetMaxRows())
        s.setMaxRows(request.getMaxRows());
      rs = s.executeQuery(sql);
    }
    catch (SQLException e)
    { // Something bad happened
      if (backend.isValidConnection(c))
        throw e; // Connection is valid, throw the exception
      else
        throw new BadConnectionException();
    }
    finally
    {
      backend.removePendingRequest(request);
    }
    return rs;
  }
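
The error handling in the listing above comes down to one decision: when a query fails, check the connection and either rethrow the original error or report the connection as broken. The following is a minimal, self-contained sketch of that decision. `ErrorClassificationSketch`, `BadConnection` and `ConnectionCheck` are hypothetical stand-ins introduced for illustration, not C-JDBC classes.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

final class ErrorClassificationSketch {
  /** Hypothetical stand-in for C-JDBC's BadConnectionException. */
  static final class BadConnection extends Exception {
    BadConnection(Throwable cause) {
      super("connection is no longer valid", cause);
    }
  }

  /** Hypothetical stand-in for backend.isValidConnection(c). */
  interface ConnectionCheck {
    boolean isValid();
  }

  /** Runs the query; on SQL failure, classifies the error by probing the connection. */
  static <T> T run(Callable<T> query, ConnectionCheck check)
      throws SQLException, BadConnection {
    try {
      return query.call();
    } catch (SQLException e) {
      if (check.isValid())
        throw e; // the connection is fine, so the statement itself failed
      throw new BadConnection(e); // the connection is broken
    } catch (Exception e) {
      throw new SQLException(e); // anything else is wrapped (assumption of this sketch)
    }
  }

  public static void main(String[] args) throws Exception {
    // A successful query simply returns its result.
    System.out.println(run(() -> "one row", () -> true));
  }
}
```

A caller can then treat the two failures differently, for instance by disabling the backend only when the connection itself is broken.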

java.sql.ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.executeStoredProcedureOnBackend (StoredProcedure proc, DatabaseBackend backend) throws SQLException, UnreachableBackendException [protected]

Execute a stored procedure on the selected backend.

Parameters:
proc the stored procedure to execute
backend the backend that will execute the request
Returns:
the ResultSet
Exceptions:
SQLException if an error occurs
UnreachableBackendException if the backend cannot be reached

Defined at line 658 of file RAIDb2.java.

  {
    // Handle macros
    proc.setSQL(RequestManager.handleSQLMacros(proc.getSQL(),
        timestampResolution, false));

    // Ok, we have a backend, let's execute the request
    AbstractConnectionManager cm = backend
        .getConnectionManager(proc.getLogin());

    // Sanity check
    if (cm == null)
    {
      String msg = Translate.get("loadbalancer.connectionmanager.not.found",
          new String[]{proc.getLogin(), backend.getName()});
      logger.error(msg);
      throw new SQLException(msg);
    }

    // Execute the query
    if (proc.isAutoCommit())
    {
      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        // We could do something finer grain here by waiting
        // only for writes that depend on the tables we need
        // but is that really worth the overhead ?
        waitForAllWritesToComplete(backend);

      // Use a connection just for this request
      Connection c = null;
      try
      {
        c = cm.getConnection();
      }
      catch (UnreachableBackendException e1)
      {
        logger.error(Translate.get(
            "loadbalancer.backend.disabling.unreachable", backend.getName()));
        backend.disable();
        throw new UnreachableBackendException(Translate.get(
            "loadbalancer.backend.unreacheable", backend.getName()));
      }

      // Sanity check
      if (c == null)
        throw new SQLException(Translate.get(
            "loadbalancer.backend.no.connection", backend.getName()));

      // Execute Query
      ResultSet rs = null;
      try
      {
        // We suppose here that the request does not modify the schema since
        // it is a read-only query.
        CallableStatement cs = c.prepareCall(proc.getSQL());
        if (backend.getDriverCompliance().supportSetQueryTimeout())
          cs.setQueryTimeout(proc.getTimeout());
        if ((proc.getMaxRows() > 0)
            && backend.getDriverCompliance().supportSetMaxRows())
          cs.setMaxRows(proc.getMaxRows());
        rs = cs.executeQuery();
      }
      catch (SQLException e)
      {
        throw new SQLException(Translate.get(
            "loadbalancer.storedprocedure.failed.on.backend", new String[]{
                proc.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      finally
      {
        cm.releaseConnection(c);
      }
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.storedprocedure.on",
            new String[]{String.valueOf(proc.getId()), backend.getName()}));
      return rs;
    }
    else
    { // Inside a transaction
      Connection c;
      long tid = proc.getTransactionId();
      Long lTid = new Long(tid);

      // Wait for previous writes to complete
      if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
        waitForAllWritesToComplete(backend, proc.getTransactionId());

      if (!backend.isStartedTransaction(lTid))
      { // transaction has not been started yet on this backend
        try
        {
          c = cm.getConnection(tid);
        }
        catch (UnreachableBackendException e1)
        {
          logger.error(Translate.get(
              "loadbalancer.backend.disabling.unreachable", backend.getName()));
          backend.disable();
          throw new SQLException(Translate.get(
              "loadbalancer.backend.unreacheable", backend.getName()));
        }

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.get.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));

        // begin transaction
        backend.startTransaction(lTid);
        c.setAutoCommit(false);
      }
      else
      { // Re-use the connection used by this transaction
        c = cm.retrieveConnection(tid);

        // Sanity check
        if (c == null)
          throw new SQLException(Translate.get(
              "loadbalancer.unable.retrieve.connection", new String[]{
                  String.valueOf(tid), backend.getName()}));
      }

      // Execute Query
      ResultSet rs;
      try
      {
        // We suppose here that the request does not modify the schema since
        // it is a read-only query.
        CallableStatement cs = c.prepareCall(proc.getSQL());
        if (backend.getDriverCompliance().supportSetQueryTimeout())
          cs.setQueryTimeout(proc.getTimeout());
        if ((proc.getMaxRows() > 0)
            && backend.getDriverCompliance().supportSetMaxRows())
          cs.setMaxRows(proc.getMaxRows());
        rs = cs.executeQuery();
      }
      catch (SQLException e)
      {
        throw new SQLException(Translate.get(
            "loadbalancer.storedprocedure.failed.on.backend", new String[]{
                proc.getSQLShortForm(vdb.getSQLShortFormLength()),
                backend.getName(), e.getMessage()}));
      }
      if (logger.isDebugEnabled())
        logger.debug(Translate.get("loadbalancer.execute.transaction.on",
            new String[]{String.valueOf(tid), String.valueOf(proc.getId()),
                backend.getName()}));
      return rs;
    }
  }
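
The transactional branch above starts a transaction lazily: the first request of a transaction on a backend obtains a connection and marks the transaction as started, and later requests reuse that connection. A minimal sketch of this bookkeeping follows. `TransactionConnectionSketch` is a hypothetical class, and plain strings stand in for `java.sql.Connection` objects so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;

final class TransactionConnectionSketch {
  // Transaction id -> connection bound to that transaction (a String stands in
  // for a real java.sql.Connection in this sketch).
  private final Map<Long, String> connections = new HashMap<>();
  private int opened;

  /** Returns the connection bound to tid, opening one on first use. */
  String connectionFor(long tid) {
    String c = connections.get(tid);
    if (c == null) {
      // First request of this transaction on this backend. Real code would
      // also call setAutoCommit(false) on the new connection here.
      c = "conn-" + (++opened);
      connections.put(tid, c);
    }
    return c;
  }

  /** Forgets the connection once the transaction commits or rolls back. */
  void end(long tid) {
    connections.remove(tid);
  }

  int openedCount() {
    return opened;
  }

  public static void main(String[] args) {
    TransactionConnectionSketch backend = new TransactionConnectionSketch();
    System.out.println(backend.connectionFor(7L)); // opens a connection
    System.out.println(backend.connectionFor(7L)); // reuses the same one
    System.out.println(backend.connectionFor(8L)); // different transaction
  }
}
```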

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest (AbstractWriteRequest request, boolean useKeys) throws AllBackendsFailedException, SQLException [private]

Common code for execWriteRequest(AbstractWriteRequest) and execWriteRequestWithKeys(AbstractWriteRequest). The result is returned through the member variables execWriteRequestResult and execWriteRequestWithKeysResult.

Parameters:
request the request to execute
useKeys true if this must give an auto generated keys ResultSet
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Defined at line 209 of file RAIDb2.java.

References org.objectweb.cjdbc.common.util.ReadPrioritaryFIFOWriteLock.acquireRead(), org.objectweb.cjdbc.common.log.Trace.debug(), org.objectweb.cjdbc.common.log.Trace.error(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getBackends(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTableRule.getBackends(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy.getDefaultRule(), org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getPolicy(), org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase.getSQLShortFormLength(), org.objectweb.cjdbc.controller.loadbalancer.policies.createtable.CreateTablePolicy.getTableRule(), org.objectweb.cjdbc.controller.backend.DatabaseBackend.hasTable(), org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete(), and org.objectweb.cjdbc.common.log.Trace.warn().

  {
    ArrayList backendThreads;
    ReadPrioritaryFIFOWriteLock lock;

    // Handle macros
    request.setSQL(RequestManager.handleSQLMacros(request.getSQL(),
        timestampResolution, false));

    // Determine which list (blocking or not) to use
    if (request.mightBlock())
    { // Blocking
      backendThreads = backendBlockingThreads;
      lock = backendBlockingThreadsRWLock;
    }
    else
    { // Non-blocking
      backendThreads = backendNonBlockingThreads;
      lock = backendNonBlockingThreadsRWLock;
      if ((waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
          && (request.getTransactionId() != 0))
        waitForAllWritesToComplete(request.getTransactionId());
    }

    try
    {
      lock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendThreads.size();
    ArrayList writeList = new ArrayList();
    String tableName = request.getTableName();

    if (request.isCreate())
    { // Choose the backend according to the defined policy
      CreateTableRule rule = createTablePolicy.getTableRule(request
          .getTableName());
      if (rule == null)
        rule = createTablePolicy.getDefaultRule();

      // Ask the rule to pickup the backends
      ArrayList chosen;
      try
      {
        chosen = rule.getBackends(vdb.getBackends());
      }
      catch (CreateTableException e)
      {
        lock.releaseRead(); // Do not leak the read lock on failure
        throw new SQLException(Translate.get(
            "loadbalancer.create.table.rule.failed", e.getMessage()));
      }

      // Build the thread list from the backend list
      if (chosen != null)
      {
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) backendThreads
              .get(i);
          if (chosen.contains(thread.getBackend()))
            writeList.add(thread);
        }
      }
    }
    else
    { // Build the list of backends that need to execute this request
      for (int i = 0; i < nbOfThreads; i++)
      {
        BackendWorkerThread thread = (BackendWorkerThread) backendThreads
            .get(i);
        if (thread.getBackend().hasTable(tableName))
          writeList.add(thread);
      }
    }

    nbOfThreads = writeList.size();
    if (nbOfThreads == 0)
    {
      lock.releaseRead(); // Do not leak the read lock on failure
      String msg = Translate.get("loadbalancer.execute.no.backend.found",
          request.getSQLShortForm(vdb.getSQLShortFormLength()));
      logger.warn(msg);
      throw new SQLException(msg);
    }
    else
      logger.debug(Translate.get("loadbalancer.execute.on.several",
          new String[]{String.valueOf(request.getId()),
              String.valueOf(nbOfThreads)}));

    // Create the task
    AbstractTask task;
    if (useKeys)
      task = new WriteRequestWithKeysTask(getNbToWait(nbOfThreads),
          nbOfThreads, request);
    else
      task = new WriteRequestTask(getNbToWait(nbOfThreads), nbOfThreads,
          request);

    synchronized (task)
    {
      // Note: the task is posted to the threads of writeList (the backends
      // that own the table), not to the first entries of backendThreads.
      if (waitForCompletionPolicy.getPolicy() == WaitForCompletionPolicy.ALL)
      { // Post the task in each backendThread tasklist and wakeup the threads
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
          synchronized (thread)
          {
            thread.addTask(task);
            thread.notify();
          }
        }
      }
      else
      {
        // We have to first post the request on each backend before letting the
        // first backend to execute the request. Therefore we have 2 phases:
        //   1. post the task in each thread queue
        //   2. notify each thread to execute the query

        // 1. Post the task
        if (request.mightBlock())
        {
          for (int i = 0; i < nbOfThreads; i++)
          {
            BackendWorkerThread thread = (BackendWorkerThread) writeList
                .get(i);
            synchronized (thread)
            {
              thread.addTask(task, request.getTransactionId());
            }
          }
        }
        else
        {
          for (int i = 0; i < nbOfThreads; i++)
          {
            BackendWorkerThread thread = (BackendWorkerThread) writeList
                .get(i);
            synchronized (thread)
            {
              thread.addTask(task);
            }
          }
        }

        // 2. Start the task execution on each backend
        for (int i = 0; i < nbOfThreads; i++)
        {
          BackendWorkerThread thread = (BackendWorkerThread) writeList.get(i);
          synchronized (thread)
          {
            thread.notify();
          }
        }
      }

      lock.releaseRead();

      // Wait for completion (notified by the task)
      try
      {
        // Wait on task
        long timeout = request.getTimeout() * 1000;
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          task.wait(timeout);
          long end = System.currentTimeMillis();
          long remaining = timeout - (end - start);
          if (remaining <= 0)
          {
            String msg = Translate.get("loadbalancer.request.timeout",
                new String[]{String.valueOf(request.getId()),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});

            // Try to remove the request from the task list
            lock.acquireRead();
            nbOfThreads = backendThreads.size();
            for (int i = 0; i < nbOfThreads; i++)
            {
              BackendWorkerThread thread = (BackendWorkerThread) backendThreads
                  .get(i);
              synchronized (thread)
              {
                thread.removeTask(task);
              }
            }
            lock.releaseRead();

            logger.warn(msg);
            throw new SQLException(msg);
          }
          // No need to update request timeout since the execution is finished
        }
        else
          task.wait();
      }
      catch (InterruptedException e)
      {
        // Try to remove the request from the task list
        try
        {
          lock.acquireRead();
          nbOfThreads = backendThreads.size();
          for (int i = 0; i < nbOfThreads; i++)
          {
            BackendWorkerThread thread = (BackendWorkerThread) backendThreads
                .get(i);
            synchronized (thread)
            {
              thread.removeTask(task);
            }
          }
          lock.releaseRead();
        }
        catch (InterruptedException ignore)
        {
          // Give up: the read lock was not acquired, so there is nothing to
          // clean up or release
        }

        throw new SQLException(Translate.get("loadbalancer.request.timeout",
            new String[]{String.valueOf(request.getId()),
                String.valueOf(task.getSuccess()),
                String.valueOf(task.getFailed())}));
      }

      if (task.getSuccess() > 0)
      {
        if (useKeys)
          execWriteRequestWithKeysResult = ((WriteRequestWithKeysTask) task)
              .getResult();
        else
          execWriteRequestResult = ((WriteRequestTask) task).getResult();

      }
      else
      { // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new AllBackendsFailedException(Translate.get(
              "loadbalancer.request.failed.all", request.getId()));
        else
        {
          String errorMsg = Translate.get("loadbalancer.request.failed.stack",
              request.getId())
              + "\n";
          for (int i = 0; i < exceptions.size(); i++)
            errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
          logger.error(errorMsg);
          throw new SQLException(errorMsg);
        }
      }
    }
  }
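
The completion wait in the listing above (the caller blocks on the task until enough backends have reported, or a timeout expires) can be sketched in isolation as follows. `CompletionWaitSketch` is a hypothetical class, not the C-JDBC `AbstractTask`. Unlike the listing, this variant re-checks its condition in a loop so that a spurious wakeup is not mistaken for completion.

```java
final class CompletionWaitSketch {
  private final int nbToWait; // completions required before the caller resumes
  private int done;           // completions reported so far

  CompletionWaitSketch(int nbToWait) {
    this.nbToWait = nbToWait;
  }

  /** Called by a worker thread when it has finished its share of the task. */
  synchronized void notifyCompletion() {
    done++;
    if (done >= nbToWait)
      notifyAll();
  }

  /**
   * Waits until enough workers have completed.
   *
   * @param timeoutMs maximum wait in milliseconds; 0 or less waits forever
   * @return true if the threshold was reached, false on timeout
   */
  synchronized boolean await(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (done < nbToWait) {
      if (timeoutMs <= 0) {
        wait();
        continue;
      }
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0)
        return false; // timed out before enough completions arrived
      wait(remaining);
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    CompletionWaitSketch task = new CompletionWaitSketch(2);
    new Thread(task::notifyCompletion).start();
    new Thread(task::notifyCompletion).start();
    System.out.println("completed: " + task.await(5000));
  }
}
```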

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException [virtual]

Performs a write request. The request is broadcast to all nodes that own the table to be written.

Parameters:
request an AbstractWriteRequest
Returns:
number of rows affected by the request
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 174 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().

  {
    execWriteRequest(request, false);
    return execWriteRequestResult;
  }
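
To illustrate what "all nodes that own the table" means under partial replication, the sketch below selects the write targets for a table from a list of backends. `WriteRoutingSketch` and its `Backend` class are hypothetical stand-ins for illustration. The real load balancer works on `BackendWorkerThread` and `DatabaseBackend` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class WriteRoutingSketch {
  /** Hypothetical stand-in for DatabaseBackend: a name plus the tables it hosts. */
  static final class Backend {
    final String name;
    final Set<String> tables;

    Backend(String name, String... tables) {
      this.name = name;
      this.tables = new HashSet<>(Arrays.asList(tables));
    }

    boolean hasTable(String table) {
      return tables.contains(table);
    }
  }

  /** Returns the names of the backends that must execute a write on the table. */
  static List<String> writeTargets(List<Backend> backends, String table) {
    List<String> targets = new ArrayList<>();
    for (Backend b : backends)
      if (b.hasTable(table))
        targets.add(b.name);
    return targets;
  }

  public static void main(String[] args) {
    List<Backend> backends = Arrays.asList(
        new Backend("db1", "orders", "items"),
        new Backend("db2", "orders"),
        new Backend("db3", "users"));
    System.out.println(writeTargets(backends, "orders")); // prints [db1, db2]
  }
}
```

An empty result corresponds to the case where the load balancer reports that no backend was found for the request.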

ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys (AbstractWriteRequest request) throws AllBackendsFailedException, SQLException [virtual]

Performs a write request and returns the auto-generated keys.

Parameters:
request the request to execute
Returns:
the auto-generated keys
Exceptions:
AllBackendsFailedException if all backends failed to execute the request
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 190 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest(), and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult.

  {
    execWriteRequest(request, true);
    return execWriteRequestWithKeysResult;
  }

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteStoredProcedure (StoredProcedure proc) throws SQLException [virtual]

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.execWriteStoredProcedure(org.objectweb.cjdbc.common.sql.StoredProcedure)

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 827 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask.getResult().

  {
    WriteStoredProcedureTask task = (WriteStoredProcedureTask) callStoredProcedure(
        proc, false);
    return task.getResult();
  }

abstract String org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getInformation () [pure virtual, inherited]

Get information about the Request Load Balancer.

Returns:
String containing information

Implemented in org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_LPRF, org.objectweb.cjdbc.controller.loadbalancer.paralleldb.ParallelDB_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.getNbToWait (int nbOfThreads) [private]

Returns the number of nodes to wait for according to the defined waitForCompletion policy.

Parameters:
nbOfThreads total number of threads
Returns:
int number of threads to wait for

Defined at line 140 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.policies.WaitForCompletionPolicy.getPolicy(), and org.objectweb.cjdbc.common.log.Trace.warn().

  {
    int nbToWait;
    switch (waitForCompletionPolicy.getPolicy())
    {
      case WaitForCompletionPolicy.FIRST :
        nbToWait = 1;
        break;
      case WaitForCompletionPolicy.MAJORITY :
        nbToWait = nbOfThreads / 2 + 1;
        break;
      default :
        logger
            .warn(Translate.get("loadbalancer.waitforcompletion.unsupported"));
        // Intentional fall-through: unsupported policies are treated as ALL
      case WaitForCompletionPolicy.ALL :
        nbToWait = nbOfThreads;
        break;
    }
    return nbToWait;
  }
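
As a worked example of the arithmetic above, the sketch below computes the number of backends to wait for under each policy. It is an illustration only: the `Policy` enum and the `WaitPolicySketch` class are hypothetical, whereas the listing switches on integer constants of `WaitForCompletionPolicy`.

```java
final class WaitPolicySketch {
  /** Hypothetical enum mirroring the FIRST, MAJORITY and ALL policy constants. */
  enum Policy { FIRST, MAJORITY, ALL }

  /** Number of backends that must complete before the caller is released. */
  static int nbToWait(Policy policy, int nbOfThreads) {
    switch (policy) {
      case FIRST:
        return 1;
      case MAJORITY:
        return nbOfThreads / 2 + 1; // integer division: 4 -> 3, 5 -> 3
      case ALL:
      default:
        return nbOfThreads;
    }
  }

  public static void main(String[] args) {
    for (Policy p : Policy.values())
      System.out.println(p + " with 5 backends -> " + nbToWait(p, 5));
  }
}
```

With five backends this gives 1, 3 and 5 for FIRST, MAJORITY and ALL respectively. With four backends MAJORITY still requires 3.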

int org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getParsingGranularity () [inherited]

Get the needed query parsing granularity.

Returns:
needed query parsing granularity

Defined at line 151 of file AbstractLoadBalancer.java.

Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.setLoadBalancer().

  {
    return parsingGranularity;
  }

abstract String org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.getRaidb2Xml () [pure virtual]

Returns XML-formatted information about this RAIDb-2 load balancer.

Returns:
XML-formatted string

Implemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_LPRF, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_RR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_RR, and org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR.

int org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel () [inherited]

Returns the RAIDb level.

Returns:
int the RAIDb level

Defined at line 131 of file AbstractLoadBalancer.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel.

Referenced by org.objectweb.cjdbc.controller.requestmanager.RequestManager.assignAndCheckSchedulerLoadBalancerValidity().

  {
    return raidbLevel;
  }

String org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXml () [inherited]

See also:
org.objectweb.cjdbc.common.xml.XmlComponent.getXml()

Defined at line 385 of file AbstractLoadBalancer.java.

References org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl().

  {
    StringBuffer info = new StringBuffer();
    info.append("<" + DatabasesXmlTags.ELT_LoadBalancer + ">");
    info.append(getXmlImpl());
    info.append("</" + DatabasesXmlTags.ELT_LoadBalancer + ">");
    return info.toString();
  }

String org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.getXmlImpl () [virtual]

See also:
org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getXmlImpl

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Defined at line 1515 of file RAIDb2.java.

  {
    StringBuffer info = new StringBuffer();
    info.append("<" + DatabasesXmlTags.ELT_RAIDb_2 + " "
        + DatabasesXmlTags.ATT_timestampResolution + "=\""
        + timestampResolution + "\" >");
    if (createTablePolicy != null)
      info.append(createTablePolicy.getXml());
    if (waitForCompletionPolicy != null)
      info.append(waitForCompletionPolicy.getXml());
    // Append the subclass-specific XML (the result must not be discarded)
    info.append(this.getRaidb2Xml());
    info.append("</" + DatabasesXmlTags.ELT_RAIDb_2 + ">");
    return info.toString();
  }
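
The way getXml(), getXmlImpl() and getRaidb2Xml() nest is a template-method arrangement: each level wraps the output of the level below it. A minimal sketch, assuming hypothetical element names (`LoadBalancer`, `RAIDb-2`, `RoundRobin`) rather than the actual values of the `DatabasesXmlTags` constants:

```java
abstract class XmlNestingSketch {
  /** Outer wrapper, analogous to AbstractLoadBalancer.getXml(). */
  final String getXml() {
    return "<LoadBalancer>" + getXmlImpl() + "</LoadBalancer>";
  }

  /** Level-specific wrapper, analogous to RAIDb2.getXmlImpl(). */
  String getXmlImpl() {
    return "<RAIDb-2>" + getPolicyXml() + "</RAIDb-2>";
  }

  /** Innermost part supplied by each concrete balancer, analogous to getRaidb2Xml(). */
  abstract String getPolicyXml();

  public static void main(String[] args) {
    XmlNestingSketch roundRobin = new XmlNestingSketch() {
      String getPolicyXml() {
        return "<RoundRobin/>";
      }
    };
    // prints <LoadBalancer><RAIDb-2><RoundRobin/></RAIDb-2></LoadBalancer>
    System.out.println(roundRobin.getXml());
  }
}
```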

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.rollback (TransactionMarkerMetaData tm) throws SQLException [virtual]

Rolls back a transaction. The rollback is sent only to the backends on which the transaction has been started.

Parameters:
tm the transaction marker metadata
Exceptions:
SQLException if an error occurs

Implements org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Defined at line 1131 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.addTask(), org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getFailed(), org.objectweb.cjdbc.controller.loadbalancer.tasks.AbstractTask.getSuccess(), and org.objectweb.cjdbc.controller.backend.DatabaseBackend.isStartedTransaction().

01132   {
01133     if (waitForCompletionPolicy.getPolicy() != WaitForCompletionPolicy.ALL)
01134       waitForAllWritesToComplete(tm.getTransactionId());
01135 
01136     try
01137     {
01138       backendNonBlockingThreadsRWLock.acquireRead();
01139     }
01140     catch (InterruptedException e)
01141     {
01142       String msg = Translate.get(
01143           "loadbalancer.backendlist.acquire.readlock.failed", e);
01144       logger.error(msg);
01145       throw new SQLException(msg);
01146     }
01147     int nbOfThreads = backendNonBlockingThreads.size();
01148     ArrayList rollbackList = new ArrayList();
01149     Long iTid = new Long(tm.getTransactionId());
01150 
01151     // Build the list of backend that need to rollback this transaction
01152     for (int i = 0; i < nbOfThreads; i++)
01153     {
01154       BackendWorkerThread thread = (BackendWorkerThread) backendNonBlockingThreads
01155           .get(i);
01156       if (thread.getBackend().isStartedTransaction(iTid))
01157         rollbackList.add(thread);
01158     }
01159 
01160     nbOfThreads = rollbackList.size();
01161     if (nbOfThreads == 0)
01162     {
01163       backendNonBlockingThreadsRWLock.releaseRead();
01164       return;
01165     }
01166 
01167     // Create the task
01168     RollbackTask task = new RollbackTask(getNbToWait(nbOfThreads), nbOfThreads,
01169         tm.getTimeout(), tm.getLogin(), tm.getTransactionId());
01170 
01171     synchronized (task)
01172     {
01173       // Post the task in each backendThread tasklist and wakeup the threads
01174       for (int i = 0; i < nbOfThreads; i++)
01175       {
        BackendWorkerThread thread = (BackendWorkerThread) rollbackList.get(i);
        synchronized (thread)
        {
          thread.addTask(task);
          thread.notify();
        }
      }

      backendNonBlockingThreadsRWLock.releaseRead();

      // Wait for completion (notified by the task)
      try
      {
        // Wait on task
        long timeout = tm.getTimeout();
        if (timeout > 0)
        {
          long start = System.currentTimeMillis();
          task.wait(timeout);
          long end = System.currentTimeMillis();
          long remaining = timeout - (end - start);
          if (remaining <= 0)
          {
            String msg = Translate.get("loadbalancer.rollback.timeout",
                new String[]{String.valueOf(tm.getTransactionId()),
                    String.valueOf(task.getSuccess()),
                    String.valueOf(task.getFailed())});
            logger.warn(msg);
            throw new SQLException(msg);
          }
        }
        else
          task.wait();
      }
      catch (InterruptedException e)
      {
        throw new SQLException(Translate.get("loadbalancer.rollback.timeout",
            new String[]{String.valueOf(tm.getTransactionId()),
                String.valueOf(task.getSuccess()),
                String.valueOf(task.getFailed())}));
      }

      if (task.getSuccess() > 0)
        return;
      else
      { // All tasks failed
        ArrayList exceptions = task.getExceptions();
        if (exceptions == null)
          throw new SQLException(Translate.get(
              "loadbalancer.rollback.all.failed", tm.getTransactionId()));
        else
        {
          String errorMsg = Translate.get("loadbalancer.rollback.failed.stack",
              tm.getTransactionId())
              + "\n";
          for (int i = 0; i < exceptions.size(); i++)
            errorMsg += ((SQLException) exceptions.get(i)).getMessage() + "\n";
          logger.error(errorMsg);
          throw new SQLException(errorMsg);
        }
      }
    }
  }
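
The end of the listing above waits on the task monitor with an optional timeout and reports an elapsed timeout as a failure. The wait pattern can be sketched as follows, under stated assumptions: `CompletionTask`, `markDone` and `awaitDone` are hypothetical names for illustration and are not C-JDBC classes or methods. The loop on a completion flag is an addition of this sketch that guards against spurious wakeups; the listing itself does not show one.

```java
/** Hypothetical stand-in for a task whose completion other threads wait on. */
class CompletionTask {
  private boolean done = false;

  /** Called by a worker once the task has finished; wakes up all waiters. */
  public synchronized void markDone() {
    done = true;
    notifyAll();
  }

  /**
   * Waits until the task is done. A timeout of 0 or less means "wait
   * without a limit", mirroring the timeout > 0 test in the listing.
   *
   * @return true if the task completed, false if the timeout elapsed first
   */
  public synchronized boolean awaitDone(long timeoutMs)
      throws InterruptedException {
    if (timeoutMs <= 0) {
      while (!done)
        wait();
      return true;
    }
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!done) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0)
        return false; // timed out before completion
      wait(remaining); // may return early; the loop re-checks the flag
    }
    return true;
  }
}

class CompletionTaskDemo {
  public static void main(String[] args) throws InterruptedException {
    CompletionTask task = new CompletionTask();
    Thread worker = new Thread(task::markDone);
    worker.start();
    System.out.println("completed: " + task.awaitDone(1000));
    worker.join();
  }
}
```

Recomputing the remaining time on every pass keeps the total wait bounded by the original timeout even when the thread is woken several times before the task completes.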

void org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.setParsingGranularity (int parsingGranularity) [inherited]


Sets the needed query parsing granularity.

Parameters:
parsingGranularity the granularity to set

Definition at line 161 of file AbstractLoadBalancer.java.

  {
    this.parsingGranularity = parsingGranularity;
  }

void org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.setRAIDbLevel (int raidbLevel) [inherited]


Sets the RAIDbLevel.

Parameters:
raidbLevel The RAIDb level to set

Definition at line 141 of file AbstractLoadBalancer.java.

  {
    this.raidbLevel = raidbLevel;
  }

void org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.setWeight (String name, int w) throws SQLException [inherited]


Associates a weight with a backend identified by its logical name.

Parameters:
name the backend name
w the weight
Exceptions:
SQLException if an error occurs

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb0.RAIDb0, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb1.RAIDb1ec_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2_WRR, org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec_WRR, and org.objectweb.cjdbc.controller.loadbalancer.singledb.SingleDB.

Definition at line 359 of file AbstractLoadBalancer.java.

  {
    throw new SQLException("Weight is not supported by this load balancer");
  }
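
The base implementation above rejects weights, and the listed subclasses redefine the method. The following sketch illustrates that pattern of a default that throws and an override where weights are supported. `BaseBalancer`, `WeightedBalancer` and `getWeight` are hypothetical names for illustration; they are not the C-JDBC classes, and how the real subclasses store or validate weights is an assumption.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical base class: weights are unsupported unless overridden. */
class BaseBalancer {
  public void setWeight(String name, int w) throws SQLException {
    throw new SQLException("Weight is not supported by this load balancer");
  }
}

/** Hypothetical weighted balancer that accepts and stores weights. */
class WeightedBalancer extends BaseBalancer {
  // Backend logical name -> weight (storage layout is an assumption)
  private final Map<String, Integer> weights = new HashMap<>();

  @Override
  public void setWeight(String name, int w) throws SQLException {
    if (w < 0)
      throw new SQLException("Negative weight for backend " + name);
    weights.put(name, w);
  }

  /** Returns the stored weight, or 0 if none was set for this backend. */
  public int getWeight(String name) {
    return weights.getOrDefault(name, 0);
  }
}
```

Callers that configure weights generically need to be prepared for the `SQLException` raised by balancers that keep the default behaviour.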

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete (DatabaseBackend backend) throws SQLException [protected]


Waits for all writes in the blocking thread queue of the given backend to complete.

See also:
executeRequestOnBackend

Definition at line 1312 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete();
    }

    backendBlockingThreadsRWLock.releaseRead();
  }
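
The method above takes a read lock on the worker list, scans it, and waits on every worker that serves the given backend. A minimal sketch of that scan follows, under stated assumptions: `Worker` and `WorkerRegistry` are hypothetical stand-ins, `java.util.concurrent.locks.ReentrantReadWriteLock` is swapped in for C-JDBC's `ReadPrioritaryFIFOWriteLock`, and backends are plain strings compared with `equals`, whereas the listing compares `DatabaseBackend` references with `==`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for a per-backend worker thread. */
class Worker {
  private final String backend;
  private int pending; // number of queued tasks not yet finished

  Worker(String backend, int pending) {
    this.backend = backend;
    this.pending = pending;
  }

  String getBackend() {
    return backend;
  }

  /** Marks one task as finished and wakes up any waiter. */
  synchronized void taskFinished() {
    if (pending > 0)
      pending--;
    notifyAll();
  }

  /** Blocks until no task is pending on this worker. */
  synchronized void waitForAllTasksToComplete() throws InterruptedException {
    while (pending > 0)
      wait();
  }
}

/** Hypothetical holder of the worker list and its read/write lock. */
class WorkerRegistry {
  private final List<Worker> workers = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void add(Worker w) {
    lock.writeLock().lock();
    try {
      workers.add(w);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Waits on every worker of the backend; returns how many were waited on. */
  int waitForAllWritesToComplete(String backend) throws InterruptedException {
    int waited = 0;
    lock.readLock().lock();
    try {
      for (Worker w : workers) {
        if (w.getBackend().equals(backend)) {
          w.waitForAllTasksToComplete();
          waited++;
        }
      }
    } finally {
      lock.readLock().unlock(); // released even if the wait is interrupted
    }
    return waited;
  }
}
```

Releasing the read lock in a `finally` block is a design choice of this sketch; the listing releases it after the loop on the normal path.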

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete (DatabaseBackend backend, long transactionId) throws SQLException [protected]


Waits for all writes of the given transaction in the blocking thread queue of the given backend to complete before the transaction itself can complete.

See also:
executeRequestOnBackend

Definition at line 1278 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.getBackend(), and org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      if (thread.getBackend() == backend)
        thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseRead();
  }

void org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForAllWritesToComplete (long transactionId) throws SQLException [protected]


Waits for all writes of the given transaction in the blocking thread queue to complete before the transaction itself can complete.

Definition at line 1244 of file RAIDb2.java.

References org.objectweb.cjdbc.controller.loadbalancer.BackendWorkerThread.waitForAllTasksToComplete().

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().

  {
    try
    {
      backendBlockingThreadsRWLock.acquireRead();
    }
    catch (InterruptedException e)
    {
      String msg = Translate.get(
          "loadbalancer.backendlist.acquire.readlock.failed", e);
      logger.error(msg);
      throw new SQLException(msg);
    }

    int nbOfThreads = backendBlockingThreads.size();

    for (int i = 0; i < nbOfThreads; i++)
    {
      BackendWorkerThread thread = (BackendWorkerThread) backendBlockingThreads
          .get(i);
      thread.waitForAllTasksToComplete(transactionId);
    }

    backendBlockingThreadsRWLock.releaseRead();
  }
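
The transaction-scoped overloads delegate to `BackendWorkerThread.waitForAllTasksToComplete(transactionId)`, whose source is not shown on this page. One way such a per-transaction wait could be kept is sketched below. This is an illustration only: `PendingWrites` and its methods are hypothetical, and nothing here is taken from the actual `BackendWorkerThread` implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping: counts unfinished write tasks per transaction. */
class PendingWrites {
  private final Map<Long, Integer> pendingByTransaction = new HashMap<>();

  /** Records that a write task of the transaction was queued. */
  synchronized void taskAdded(long transactionId) {
    pendingByTransaction.merge(transactionId, 1, Integer::sum);
  }

  /** Records that a write task finished and wakes up waiters. */
  synchronized void taskCompleted(long transactionId) {
    Integer count = pendingByTransaction.get(transactionId);
    if (count == null)
      return; // nothing was pending for this transaction
    if (count <= 1)
      pendingByTransaction.remove(transactionId);
    else
      pendingByTransaction.put(transactionId, count - 1);
    notifyAll();
  }

  /** Returns the number of unfinished tasks of the transaction. */
  synchronized int pending(long transactionId) {
    return pendingByTransaction.getOrDefault(transactionId, 0);
  }

  /** Blocks until the transaction has no unfinished task in this queue. */
  synchronized void waitForAllTasksToComplete(long transactionId)
      throws InterruptedException {
    while (pendingByTransaction.containsKey(transactionId))
      wait();
  }
}
```

A waiter for one transaction is not held up by tasks of other transactions, which matches the stated purpose of waiting only for the writes of the given transaction.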


Member Data Documentation

ArrayList org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.backendBlockingThreads [protected]
 

Definition at line 87 of file RAIDb2.java.

ReadPrioritaryFIFOWriteLock org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.backendBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() [protected]
 

Definition at line 89 of file RAIDb2.java.

ArrayList org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.backendNonBlockingThreads [protected]
 

Definition at line 88 of file RAIDb2.java.

ReadPrioritaryFIFOWriteLock org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.backendNonBlockingThreadsRWLock = new ReadPrioritaryFIFOWriteLock() [protected]
 

Definition at line 90 of file RAIDb2.java.

CreateTablePolicy org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.createTablePolicy [protected]
 

Definition at line 93 of file RAIDb2.java.

int org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestResult = 0 [private]
 

Definition at line 161 of file RAIDb2.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequest().

ResultSet org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeysResult = null [private]
 

Definition at line 162 of file RAIDb2.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.execWriteRequestWithKeys().

Trace org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.logger [static, protected]
 

Initial value:

 Trace.getLogger("org.objectweb.cjdbc.controller.loadbalancer.raidb2")

Reimplemented from org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.

Reimplemented in org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2ec.

Definition at line 96 of file RAIDb2.java.

int org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.parsingGranularity [protected, inherited]
 

Definition at line 74 of file AbstractLoadBalancer.java.

int org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.raidbLevel [protected, inherited]
 

Definition at line 73 of file AbstractLoadBalancer.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer(), and org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.getRAIDbLevel().

long org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.timestampResolution [private]
 

Definition at line 94 of file RAIDb2.java.

VirtualDatabase org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.vdb [protected, inherited]
 

Definition at line 72 of file AbstractLoadBalancer.java.

Referenced by org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer.AbstractLoadBalancer().

WaitForCompletionPolicy org.objectweb.cjdbc.controller.loadbalancer.raidb2.RAIDb2.waitForCompletionPolicy [protected]
 

Definition at line 92 of file RAIDb2.java.


The documentation for this class was generated from the following file:
Generated on Wed May 5 18:02:39 2004 for CJDBCversion1.0rc6 by doxygen 1.3.6