00001
00025
package org.objectweb.cjdbc.controller.requestmanager;
00026
00027
import java.sql.SQLException;
00028
import java.util.ArrayList;
00029
import java.util.Hashtable;
00030
00031
import org.objectweb.cjdbc.common.exceptions.BackupException;
00032
import org.objectweb.cjdbc.common.exceptions.ExceptionTypes;
00033
import org.objectweb.cjdbc.common.exceptions.NoMoreBackendException;
00034
import org.objectweb.cjdbc.common.exceptions.OctopusException;
00035
import org.objectweb.cjdbc.common.exceptions.RollbackException;
00036
import org.objectweb.cjdbc.common.i18n.Translate;
00037
import org.objectweb.cjdbc.common.log.Trace;
00038
import org.objectweb.cjdbc.common.shared.BackendState;
00039
import org.objectweb.cjdbc.common.shared.BackupListener;
00040
import org.objectweb.cjdbc.common.sql.AbstractWriteRequest;
00041
import org.objectweb.cjdbc.common.sql.AlterRequest;
00042
import org.objectweb.cjdbc.common.sql.CreateRequest;
00043
import org.objectweb.cjdbc.common.sql.ParsingGranularities;
00044
import org.objectweb.cjdbc.common.sql.RequestType;
00045
import org.objectweb.cjdbc.common.sql.SelectRequest;
00046
import org.objectweb.cjdbc.common.sql.StoredProcedure;
00047
import org.objectweb.cjdbc.common.sql.UpdateRequest;
00048
import org.objectweb.cjdbc.common.sql.schema.DatabaseSchema;
00049
import org.objectweb.cjdbc.common.sql.schema.DatabaseTable;
00050
import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00051
import org.objectweb.cjdbc.common.xml.XmlComponent;
00052
import org.objectweb.cjdbc.controller.backend.BackendStateListener;
00053
import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00054
import org.objectweb.cjdbc.controller.backup.BackupManager;
00055
import org.objectweb.cjdbc.controller.cache.metadata.MetadataCache;
00056
import org.objectweb.cjdbc.controller.cache.parsing.ParsingCache;
00057
import org.objectweb.cjdbc.controller.cache.result.AbstractResultCache;
00058
import org.objectweb.cjdbc.controller.cache.result.entries.CacheEntry;
00059
import org.objectweb.cjdbc.controller.loadbalancer.AbstractLoadBalancer;
00060
import org.objectweb.cjdbc.controller.loadbalancer.AllBackendsFailedException;
00061
import org.objectweb.cjdbc.controller.recoverylog.AbstractRecoveryLog;
00062
import org.objectweb.cjdbc.controller.recoverylog.BackendRecoveryInfo;
00063
import org.objectweb.cjdbc.controller.recoverylog.JDBCRecoverThread;
00064
import org.objectweb.cjdbc.controller.scheduler.AbstractScheduler;
00065
import org.objectweb.cjdbc.controller.virtualdatabase.ControllerResultSet;
00066
import org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase;
00067
00080 public class RequestManager implements XmlComponent
00081 {
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00095 protected long beginTimeout;
00097 protected long commitTimeout;
00099 protected long rollbackTimeout;
00100
00102 protected VirtualDatabase
vdb;
00103
00105 protected AbstractScheduler
scheduler;
00106
00108 protected AbstractResultCache
resultCache;
00109
00111 protected AbstractLoadBalancer
loadBalancer;
00112
00114 protected AbstractRecoveryLog
recoveryLog;
00115
00117 protected BackupManager
backupManager;
00118
00119
00120 protected DatabaseSchema
dbs;
00121 private boolean schemaIsStatic =
false;
00122 private boolean isCaseSensitiveParsing =
false;
00123 protected ParsingCache
parsingCache = null;
00124 private MetadataCache
metadataCache = null;
00125
00126
00127
00128 protected int schedulerParsingranularity =
ParsingGranularities.NO_PARSING;
00129 private int cacheParsingranularity =
ParsingGranularities.NO_PARSING;
00130 private int loadBalancerParsingranularity =
ParsingGranularities.NO_PARSING;
00131 protected int requiredGranularity =
ParsingGranularities.NO_PARSING;
00133 protected Hashtable
tidLoginTable;
00134
00135 protected static Trace
logger = null;
00136 private BackendStateListener
backendStateListener;
00137
00138
00139
00140
00141
00155 public RequestManager(VirtualDatabase vdb, AbstractScheduler scheduler,
00156 AbstractResultCache cache, AbstractLoadBalancer loadBalancer,
00157 AbstractRecoveryLog recoveryLog,
long beginTimeout,
long commitTimeout,
00158
long rollbackTimeout)
throws SQLException
00159 {
00160
assignAndCheckSchedulerLoadBalancerValidity(
scheduler,
loadBalancer);
00161
00162
this.resultCache = cache;
00163
this.vdb =
vdb;
00164
if (
resultCache != null)
00165 {
00166
cacheParsingranularity = cache.getParsingGranularity();
00167
if (
cacheParsingranularity >
requiredGranularity)
00168
requiredGranularity =
cacheParsingranularity;
00169 }
00170
setRecoveryLog(
recoveryLog);
00171
initRequestManagerVariables(
vdb,
beginTimeout,
commitTimeout,
00172
rollbackTimeout);
00173
setBackendsLastKnownCheckpointFromRecoveryLog();
00174
logger.info(
Translate.get(
"requestmanager.parsing.granularity",
00175
ParsingGranularities.getInformation(
requiredGranularity)));
00176 }
00177
00182 private void setBackendsLastKnownCheckpointFromRecoveryLog()
00183 {
00184
00185
if (
recoveryLog == null)
00186
return;
00187 String databaseName =
vdb.getName();
00188 ArrayList backends =
vdb.getBackends();
00189
int size = backends.size();
00190
DatabaseBackend backend;
00191
BackendRecoveryInfo info;
00192
for (
int i = 0; i < size; i++)
00193 {
00194 backend = (
DatabaseBackend) backends.get(i);
00195
try
00196 {
00197 info =
recoveryLog.getBackendInfo(databaseName, backend.
getName());
00198 backend.
setLastKnownCheckpoint(info.
getLastCheckpoint());
00199 }
00200
catch (SQLException e)
00201 {
00202
logger.error(
Translate.get(
"requestmanager.checkpoint.not.found",
00203 backend.
getName()), e);
00204 }
00205 }
00206
00207 }
00208
00217 private void assignAndCheckSchedulerLoadBalancerValidity(
00218 AbstractScheduler scheduler, AbstractLoadBalancer loadBalancer)
00219
throws SQLException
00220 {
00221
if (
scheduler == null)
00222
throw new SQLException(
Translate.get(
"requestmanager.null.scheduler"));
00223
00224
if (
loadBalancer == null)
00225
throw new SQLException(
Translate.get(
"requestmanager.null.loadbalancer"));
00226
00227
if (
scheduler.getRAIDbLevel() !=
loadBalancer.getRAIDbLevel())
00228
throw new SQLException(
Translate.get(
00229
"requestmanager.incompatible.raidb.levels",
00230
new String[]{
"" +
scheduler.getRAIDbLevel(),
00231
"" +
loadBalancer.getRAIDbLevel()}));
00232
00233
00234
this.scheduler = scheduler;
00235 schedulerParsingranularity = scheduler.getParsingGranularity();
00236 requiredGranularity = schedulerParsingranularity;
00237
this.loadBalancer = loadBalancer;
00238 loadBalancerParsingranularity = loadBalancer.getParsingGranularity();
00239
if (loadBalancerParsingranularity > requiredGranularity)
00240 requiredGranularity = loadBalancerParsingranularity;
00241 }
00242
00251 private void initRequestManagerVariables(VirtualDatabase vdb,
00252
long beginTimeout,
long commitTimeout,
long rollbackTimeout)
00253 {
00254
this.tidLoginTable =
new Hashtable();
00255
this.beginTimeout = beginTimeout;
00256
this.commitTimeout = commitTimeout;
00257
this.rollbackTimeout = rollbackTimeout;
00258
this.vdb = vdb;
00259 logger = Trace.getLogger(
"org.objectweb.cjdbc.controller.RequestManager."
00260 + vdb.getDatabaseName());
00261 }
00262
00263
00264
00265
00266
00275 public ControllerResultSet execReadRequest(
SelectRequest request)
00276
throws SQLException
00277 {
00278
00279
if (!request.isAutoCommit())
00280 {
00281
long tid = request.getTransactionId();
00282
if (!tidLoginTable.containsKey(
new Long(tid)))
00283
throw new SQLException(
Translate.get(
"transaction.not.started", tid));
00284 }
00285
00286
00287
00288
00289
if ((requiredGranularity !=
ParsingGranularities.NO_PARSING)
00290 && (!request.isParsed()))
00291 {
00292
if (parsingCache == null)
00293 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00294
else
00295 parsingCache.getParsingFromCache(request);
00296 }
00297
00298
00299
00300
00301
00302
00303
00304
00305
if ((schedulerParsingranularity !=
ParsingGranularities.NO_PARSING)
00306 && !request.isParsed())
00307 {
00308
if (parsingCache == null)
00309 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00310
else
00311 parsingCache.getParsingFromCacheAndParseIfMissing(request);
00312 }
00313
00314
00315 scheduler.scheduleReadRequest(request);
00316
00317
if (logger.isDebugEnabled())
00318 logger.debug(
Translate.get(
"requestmanager.read.request",
new String[]{
00319 request.getId() +
"",
00320 request.getSQLShortForm(vdb.getSQLShortFormLength())}));
00321
00322
00323
00324
00325
00326
ControllerResultSet result = null;
00327
try
00328 {
00329
if (resultCache != null)
00330 {
00331
CacheEntry qce = resultCache.getFromCache(request,
true);
00332
if (qce != null)
00333 {
00334 result = qce.
getResult();
00335
if (result != null)
00336 {
00337
if (vdb.getSQLMonitor() != null)
00338 vdb.getSQLMonitor().logCacheHit(request);
00339
00340 scheduler.readCompleted(request);
00341
return result;
00342 }
00343 }
00344 }
00345
00346
00347
00348
00349
00350
00351
00352
00353
if ((loadBalancerParsingranularity != ParsingGranularities.NO_PARSING)
00354 && !request.isParsed())
00355 {
00356
if (parsingCache == null)
00357 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00358
else
00359 parsingCache.getParsingFromCacheAndParseIfMissing(request);
00360 }
00361
00362
00363 result = loadBalancer.execReadRequest(request, metadataCache);
00364
00365
00366
00367
00368
00369
00370
if ((resultCache != null)
00371 && (request.getCacheAbility() != RequestType.UNCACHEABLE))
00372 {
00373
if (!request.isParsed()
00374 && (cacheParsingranularity != ParsingGranularities.NO_PARSING))
00375 {
00376
00377 {
00378
if (parsingCache == null)
00379 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00380
else
00381 parsingCache.getParsingFromCacheAndParseIfMissing(request);
00382 }
00383 }
00384 resultCache.addToCache(request, result);
00385 }
00386 }
00387
catch (Exception failed)
00388 {
00389
if (resultCache != null)
00390 resultCache.removeFromPendingQueries(request);
00391 scheduler.readCompleted(request);
00392
if (failed instanceof NoMoreBackendException)
00393
throw (NoMoreBackendException) failed;
00394 String msg = Translate.get(
"requestmanager.request.failed",
new String[]{
00395 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00396 failed.getMessage()});
00397
if (failed instanceof RuntimeException)
00398 logger.warn(msg, failed);
00399
else
00400 logger.warn(msg);
00401
throw new SQLException(msg);
00402 }
00403
00404
00405 scheduler.readCompleted(request);
00406
00407
return result;
00408 }
00409
00419 public int execWriteRequest(
AbstractWriteRequest request)
throws SQLException
00420 {
00421 scheduleExecWriteRequest(request);
00422
int execWriteRequestResult = 0;
00423
try
00424 {
00425 execWriteRequestResult = loadBalanceExecWriteRequest(request);
00426 }
00427
catch (
AllBackendsFailedException e)
00428 {
00429 logger.fatal(
Translate
00430 .get(
"requestmanager.write.request.failed.unexpected"), e);
00431 }
00432
catch (
NoMoreBackendException e)
00433 {
00434 logger.fatal(
Translate
00435 .get(
"requestmanager.write.request.failed.no.more.backends"), e);
00436 }
00437 updateAndNotifyExecWriteRequest(request,
true);
00438
return execWriteRequestResult;
00439 }
00440
00450 public ControllerResultSet execWriteRequestWithKeys(
00451
AbstractWriteRequest request)
throws SQLException
00452 {
00453 scheduleExecWriteRequest(request);
00454
ControllerResultSet execWriteRequestWithKeysResult = null;
00455
try
00456 {
00457 execWriteRequestWithKeysResult = loadBalanceExecWriteRequestWithKeys(request);
00458 }
00459
catch (
AllBackendsFailedException e)
00460 {
00461 logger.fatal(
Translate
00462 .get(
"requestmanager.write.request.keys.failed.unexpected"), e);
00463 }
00464 updateAndNotifyExecWriteRequest(request,
true);
00465
return execWriteRequestWithKeysResult;
00466 }
00467
00474 public void scheduleExecWriteRequest(
AbstractWriteRequest request)
00475
throws SQLException
00476 {
00477
00478
if (!request.isAutoCommit())
00479 {
00480
long tid = request.getTransactionId();
00481
if (!tidLoginTable.containsKey(
new Long(tid)))
00482
throw new SQLException(
Translate.get(
"transaction.not.started", tid));
00483 }
00484
00485
00486
00487
00488
if ((requiredGranularity !=
ParsingGranularities.NO_PARSING)
00489 && (!request.isParsed()))
00490 {
00491
if (parsingCache == null)
00492 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00493
else
00494 parsingCache.getParsingFromCache(request);
00495 }
00496
00497
00498
00499
00500
00501
00502
00503
00504
if ((schedulerParsingranularity !=
ParsingGranularities.NO_PARSING)
00505 && !request.isParsed())
00506 {
00507
if (parsingCache == null)
00508 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00509
else
00510 parsingCache.getParsingFromCacheAndParseIfMissing(request);
00511 }
00512
00513
00514
try
00515 {
00516 scheduler.scheduleWriteRequest(request);
00517 }
00518
catch (
RollbackException e)
00519 {
00520 rollback(request.getTransactionId());
00521
throw new SQLException(e.getMessage());
00522 }
00523
00524
if (logger.isDebugEnabled())
00525 logger.debug(
Translate.get(
"requestmanager.write.request",
new String[]{
00526 String.valueOf(request.getId()),
00527 request.getSQLShortForm(vdb.getSQLShortFormLength())}));
00528
00529
00530
00531
00532
00533
try
00534 {
00535
if ((requiredGranularity !=
ParsingGranularities.NO_PARSING)
00536 && !request.isParsed())
00537 {
00538
if (parsingCache == null)
00539 request.parse(dbs, requiredGranularity, isCaseSensitiveParsing);
00540
else
00541 parsingCache.getParsingFromCacheAndParseIfMissing(request);
00542 }
00543 }
00544
catch (SQLException e)
00545 {
00546
00547 scheduler.writeCompleted(request);
00548
throw e;
00549 }
00550 }
00551
00561 public ControllerResultSet loadBalanceExecWriteRequestWithKeys(
00562
AbstractWriteRequest request)
throws AllBackendsFailedException,
00563 SQLException
00564 {
00565
try
00566 {
00567
return loadBalancer.execWriteRequestWithKeys(request, metadataCache);
00568 }
00569
catch (Exception failed)
00570 {
00571 scheduler.writeCompleted(request);
00572 String msg =
Translate.get(
"requestmanager.request.failed",
new String[]{
00573 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00574 failed.getMessage()});
00575
if (failed instanceof RuntimeException)
00576 logger.warn(msg, failed);
00577
else
00578 logger.warn(msg);
00579
if (failed instanceof AllBackendsFailedException)
00580
throw (AllBackendsFailedException) failed;
00581
else
00582
throw new SQLException(msg);
00583 }
00584 }
00585
00595 public int loadBalanceExecWriteRequest(
AbstractWriteRequest request)
00596
throws AllBackendsFailedException, SQLException
00597 {
00598
try
00599 {
00600
if (request.isUpdate() && (resultCache != null))
00601 {
00602
00603
if (!resultCache.isUpdateNecessary((
UpdateRequest) request))
00604
return 0;
00605 }
00606
return loadBalancer.execWriteRequest(request);
00607 }
00608
catch (Exception failed)
00609 {
00610 scheduler.writeCompleted(request);
00611 String msg =
Translate.get(
"requestmanager.request.failed",
new String[]{
00612 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00613 failed.getMessage()});
00614
if (failed instanceof RuntimeException)
00615 logger.warn(msg, failed);
00616
else
00617 logger.warn(msg);
00618
if (failed instanceof AllBackendsFailedException)
00619
throw (AllBackendsFailedException) failed;
00620
else
00621
throw new SQLException(msg);
00622 }
00623 }
00624
00637 public void updateAndNotifyExecWriteRequest(
AbstractWriteRequest request,
00638
boolean notifyScheduler)
throws SQLException
00639 {
00640
00641
try
00642 {
00643
if (resultCache != null)
00644 {
00645 resultCache.writeNotify(request);
00646 }
00647
00648
00649
if (recoveryLog != null)
00650 recoveryLog.logRequest(request);
00651
00652
00653
if (requiredGranularity !=
ParsingGranularities.NO_PARSING)
00654 {
00655
if (request.isCreate())
00656 {
00657 dbs.addTable(((
CreateRequest) request).getDatabaseTable());
00658
if (logger.isDebugEnabled())
00659 logger.debug(
Translate.get(
"requestmanager.schema.add.table",
00660 request.
getTableName()));
00661 }
00662
else if (request.isDrop())
00663 {
00664 dbs.removeTable(dbs.getTable(request.getTableName()));
00665
if (logger.isDebugEnabled())
00666 logger.debug(
Translate.get(
"requestmanager.schema.remove.table",
00667 request.getTableName()));
00668 }
00669
else if (request.isAlter()
00670 && (requiredGranularity >
ParsingGranularities.TABLE))
00671 {
00672
00673
AlterRequest req = (
AlterRequest) request;
00674
if (req.
isDrop())
00675 dbs.getTable(req.
getTableName()).remove(req.
getColumn().getName());
00676
else if (req.
isAdd())
00677 {
00678 dbs.getTable(req.
getTableName()).addColumn(req.
getColumn());
00679 }
00680 }
00681 }
00682
00683
00684
if (notifyScheduler)
00685 scheduler.writeCompleted(request);
00686
00687 }
00688
catch (Exception failed)
00689 {
00690 scheduler.writeCompleted(request);
00691 String msg =
Translate.get(
"requestmanager.request.failed",
new String[]{
00692 request.getSQLShortForm(vdb.getSQLShortFormLength()),
00693 failed.getMessage()});
00694
if (failed instanceof RuntimeException)
00695 logger.warn(msg, failed);
00696
else
00697 logger.warn(msg);
00698
throw new SQLException(msg);
00699 }
00700 }
00701
00709 public ControllerResultSet execReadStoredProcedure(
StoredProcedure proc)
00710
throws SQLException
00711 {
00712
00713
if (!proc.isAutoCommit())
00714 {
00715
long tid = proc.getTransactionId();
00716
if (!tidLoginTable.containsKey(
new Long(tid)))
00717
throw new SQLException(
Translate.get(
"transaction.not.started", tid));
00718 }
00719
00720
00721
00722
00723
00724
00725
00726
00727
SelectRequest spanOnAllTables = null;
00728
if (requiredGranularity !=
ParsingGranularities.NO_PARSING)
00729 {
00730 String sql =
"SELECT * FROM";
00731 ArrayList tables = dbs.getTables();
00732
int size = tables.size();
00733
00734
00735
for (
int i = 0; i < size; i++)
00736 sql +=
" ," + ((
DatabaseTable) tables.get(i)).getName();
00737
00738 spanOnAllTables =
new SelectRequest(sql,
false, 0, proc
00739 .getLineSeparator());
00740 }
00741
else
00742 spanOnAllTables =
new SelectRequest(
"select * from x",
false, 0, proc
00743 .getLineSeparator());
00744
00745
00746 scheduler.scheduleReadRequest(spanOnAllTables);
00747
00748
if (logger.isDebugEnabled())
00749 logger.debug(
Translate.get(
"requestmanager.read.store.procedure",
00750
new String[]{String.valueOf(proc.getId()),
00751 proc.getSQLShortForm(vdb.getSQLShortFormLength())}));
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
if ((resultCache != null) && (!proc.isReadOnly()))
00762 resultCache.flushCache();
00763
00764
ControllerResultSet result = null;
00765
try
00766 {
00767
00768
00769
00770
00771
00772
if (proc.isReadOnly())
00773 result = loadBalancer.execReadOnlyReadStoredProcedure(proc,
00774 metadataCache);
00775
else
00776 result = loadBalancer.execReadStoredProcedure(proc, metadataCache);
00777
00778
00779
00780
00781
00782
if (recoveryLog != null)
00783 recoveryLog.logRequest(proc,
true);
00784
00785 }
00786
catch (Exception failed)
00787 {
00788 scheduler.readCompleted(spanOnAllTables);
00789 String msg = Translate.get(
"requestmanager.store.procedure.failed",
00790
new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00791 failed.getMessage()});
00792 logger.warn(msg);
00793
throw new SQLException(msg);
00794 }
00795
00796
00797 scheduler.readCompleted(spanOnAllTables);
00798
00799
return result;
00800 }
00801
00809 public int execWriteStoredProcedure(
StoredProcedure proc)
throws SQLException
00810 {
00811
00812
if (!proc.isAutoCommit())
00813 {
00814
long tid = proc.getTransactionId();
00815
if (!tidLoginTable.containsKey(
new Long(tid)))
00816
throw new SQLException(
Translate.get(
"transaction.not.started", String
00817 .valueOf(tid)));
00818 }
00819
00820
00821
00822
00823
00824
00825
00826
00827
SelectRequest spanOnAllTables = null;
00828
if (requiredGranularity !=
ParsingGranularities.NO_PARSING)
00829 {
00830 String sql =
"SELECT * FROM";
00831 ArrayList tables = dbs.getTables();
00832
int size = tables.size();
00833
00834
00835
for (
int i = 0; i < size; i++)
00836 sql +=
" ," + ((
DatabaseTable) tables.get(i)).getName();
00837
00838 spanOnAllTables =
new SelectRequest(sql,
false, 0, proc
00839 .getLineSeparator());
00840 }
00841
else
00842 spanOnAllTables =
new SelectRequest(
"select * from x",
false, 0, proc
00843 .getLineSeparator());
00844
00845
00846
00847
00848 scheduler.scheduleReadRequest(spanOnAllTables);
00849
00850
if (logger.isDebugEnabled())
00851 logger.debug(
Translate.get(
"requestmanager.write.store.procedure",
00852
new String[]{String.valueOf(proc.getId()),
00853 proc.getSQLShortForm(vdb.getSQLShortFormLength())}));
00854
00855
00856
00857
00858
00859
00860
if (resultCache != null)
00861 resultCache.flushCache();
00862
00863
int result;
00864
try
00865 {
00866
00867
00868
00869
00870
00871 result = loadBalancer.execWriteStoredProcedure(proc);
00872
00873
00874
00875
00876
00877
if (recoveryLog != null)
00878 recoveryLog.logRequest(proc,
false);
00879
00880 }
00881
catch (Exception failed)
00882 {
00883 scheduler.readCompleted(spanOnAllTables);
00884 String msg = Translate.get(
"requestmanager.store.procedure.failed",
00885
new String[]{proc.getSQLShortForm(vdb.getSQLShortFormLength()),
00886 failed.getMessage()});
00887 logger.warn(msg);
00888
throw new SQLException(msg);
00889 }
00890
00891
00892 scheduler.readCompleted(spanOnAllTables);
00893
00894
return result;
00895 }
00896
00897
00898
00899
00900
00910 public long begin(String login)
throws SQLException
00911 {
00912
try
00913 {
00914
TransactionMarkerMetaData tm =
new TransactionMarkerMetaData(0,
00915 beginTimeout, login);
00916
00917
00918
long tid = scheduler.begin(tm);
00919 tm.
setTransactionId(tid);
00920
00921
if (logger.isDebugEnabled())
00922 logger.debug(
Translate.get(
"transaction.begin", String.valueOf(tid)));
00923
00924
try
00925 {
00926
00927 loadBalancer.begin(tm);
00928
00929
00930
if (recoveryLog != null)
00931 {
00932 recoveryLog.begin(tm);
00933 }
00934 }
00935
catch (SQLException e)
00936 {
00937
throw e;
00938 }
00939 finally
00940 {
00941
00942 scheduler.beginCompleted(tid);
00943 }
00944
00945 tidLoginTable.put(
new Long(tid), tm);
00946
return tid;
00947 }
00948
catch (RuntimeException e)
00949 {
00950 logger.fatal(
Translate.get(
00951
"fatal.runtime.exception.requestmanager.begin", e));
00952
throw new SQLException(e.getMessage());
00953 }
00954 }
00955
00963 public TransactionMarkerMetaData getTransactionMarker(Long tid)
00964
throws SQLException
00965 {
00966
TransactionMarkerMetaData tm = (
TransactionMarkerMetaData) tidLoginTable
00967 .get(tid);
00968
00969
if (tm == null)
00970
throw new SQLException(
Translate.get(
"transaction.marker.not.found",
""
00971 + tid));
00972
00973 tm.
setTimeout(commitTimeout);
00974
return tm;
00975 }
00976
00982 public void completeTransaction(Long tid)
00983 {
00984 tidLoginTable.remove(tid);
00985 }
00986
00993 public void commit(
long transactionId)
throws SQLException
00994 {
00995
try
00996 {
00997 Long tid =
new Long(transactionId);
00998
TransactionMarkerMetaData tm = getTransactionMarker(tid);
00999
01000
01001 scheduler.commit(tm);
01002
01003
if (logger.isDebugEnabled())
01004 logger.debug(
Translate.get(
"transaction.commit", String.valueOf(tid)));
01005
01006
try
01007 {
01008
01009 loadBalancer.commit(tm);
01010
01011
01012
if (resultCache != null)
01013 resultCache.commit(tm.
getTransactionId());
01014
01015
01016
if (recoveryLog != null)
01017 recoveryLog.commit(tm);
01018 }
01019
catch (SQLException e)
01020 {
01021
throw e;
01022 }
01023
catch (AllBackendsFailedException e)
01024 {
01025 String msg =
"All backends failed to commit transaction "
01026 + transactionId +
" (" + e +
")";
01027 logger.error(msg);
01028
throw new SQLException(msg);
01029 }
01030 finally
01031 {
01032
01033 scheduler.commitCompleted(transactionId);
01034
01035 completeTransaction(tid);
01036 }
01037 }
01038
catch (RuntimeException e)
01039 {
01040 logger.fatal(
Translate.get(
01041
"fatal.runtime.exception.requestmanager.commit", e));
01042
throw new SQLException(e.getMessage());
01043 }
01044 }
01045
01052 public void rollback(
long transactionId)
throws SQLException
01053 {
01054
try
01055 {
01056 Long tid =
new Long(transactionId);
01057
TransactionMarkerMetaData tm = getTransactionMarker(tid);
01058
01059
01060 scheduler.rollback(tm);
01061
01062
if (logger.isDebugEnabled())
01063 logger.debug(
Translate.get(
"transaction.rollback", String
01064 .valueOf(transactionId)));
01065
01066
try
01067 {
01068
01069 loadBalancer.rollback(tm);
01070
01071
01072
if (
this.resultCache != null)
01073 resultCache.rollback(transactionId);
01074
01075
01076
if (recoveryLog != null)
01077 {
01078 recoveryLog.rollback(tm);
01079 }
01080 }
01081
catch (SQLException e)
01082 {
01083
throw e;
01084 }
01085
catch (AllBackendsFailedException e)
01086 {
01087 String msg =
Translate.get(
"requestmanager.rollback.failed.all",
01088
new String[]{String.valueOf(transactionId), e.getMessage()});
01089 logger.error(msg);
01090
throw new SQLException(msg);
01091 }
01092 finally
01093 {
01094
01095 scheduler.rollbackCompleted(transactionId);
01096
01097 completeTransaction(tid);
01098 }
01099 }
01100
catch (RuntimeException e)
01101 {
01102 logger.fatal(
Translate
01103 .get(
"fatal.runtime.exception.requestmanager.rollback"), e);
01104
throw new SQLException(e.getMessage());
01105 }
01106 }
01107
01108
01109
01110
01111
01123 public void enableBackend(
DatabaseBackend db)
throws SQLException
01124 {
01125 loadBalancer.enableBackend(db,
true);
01126 logger.info(
Translate.get(
"backend.state.enabled", db.getName()));
01127 }
01128
01140 public void enableBackendFromCheckpoint(
DatabaseBackend db,
01141 String checkpointName)
throws SQLException
01142 {
01143
01144
if (recoveryLog == null)
01145 {
01146 String msg =
Translate.get(
01147
"recovery.restore.checkpoint.failed.cause.null", checkpointName);
01148 logger.error(msg);
01149
throw new SQLException(msg);
01150 }
01151
01152 db.initializeConnections();
01153 recoveryLog.beginRecovery();
01154
01155
01156
long logIdx;
01157
try
01158 {
01159 logIdx = recoveryLog.getCheckpointRequestId(checkpointName);
01160 }
01161
catch (SQLException e)
01162 {
01163 recoveryLog.endRecovery();
01164 String msg =
Translate.get(
"recovery.cannot.get.checkpoint", e);
01165 logger.error(msg);
01166
throw new SQLException(msg);
01167 }
01168
01169
01170
JDBCRecoverThread recoverThread =
new JDBCRecoverThread(recoveryLog, db,
01171 logIdx, loadBalancer);
01172 recoverThread.
run();
01173
01174
if (recoverThread.
getException() != null)
01175
throw recoverThread.
getException();
01176
01177
01178 scheduler.suspendWrites();
01179
01180
01181 recoverThread.
setLogIdx(recoverThread.
getLogIdx() + 1);
01182 recoverThread.
run();
01183
01184
if (recoverThread.
getException() != null)
01185 {
01186 scheduler.resumeWrites();
01187
throw recoverThread.
getException();
01188 }
01189
01190
01191 loadBalancer.enableBackend(db,
true);
01192 scheduler.resumeWrites();
01193 logger.info(
Translate.get(
"backend.state.enabled", db.getName()));
01194 }
01195
01206 public void disableBackend(
DatabaseBackend db)
throws SQLException
01207 {
01208
if (db.isReadEnabled() || db.isWriteEnabled())
01209 {
01210 loadBalancer.disableBackend(db);
01211 logger.info(
Translate.get(
"backend.state.disabled", db.getName()));
01212 }
01213
else
01214 {
01215
throw new SQLException(
Translate.get(
"backend.already.disabled", db
01216 .getName()));
01217 }
01218 }
01219
01231 public void disableBackendForCheckpoint(
DatabaseBackend db,
01232 String checkpointName)
throws SQLException
01233 {
01234
01235
if (recoveryLog == null)
01236 {
01237 String msg =
Translate.get(
"recovery.store.checkpoint.failed.cause.null",
01238 checkpointName);
01239 logger.error(msg);
01240
throw new SQLException(msg);
01241 }
01242
01243
01244 logger.info(
Translate.get(
"requestmanager.wait.pending.writes"));
01245 scheduler.suspendWrites();
01246
01247
01248 recoveryLog.storeCheckpoint(checkpointName);
01249 logger.info(
Translate.get(
"recovery.checkpoint.stored", checkpointName));
01250
01251
01252 db.setState(
BackendState.DISABLING);
01253 logger.info(
Translate.get(
"backend.state.disabling", db.getName()));
01254
01255
01256 logger.info(
Translate.get(
"requestmanager.resume.pending.writes"));
01257 scheduler.resumeWrites();
01258
01259
01260 db.waitForAllTransactionsToComplete();
01261
01262
01263 db.setLastKnownCheckpoint(checkpointName);
01264 loadBalancer.disableBackend(db);
01265 logger.info(
Translate.get(
"backend.state.disabled", db.getName()));
01266 }
01267
01278 public void disableBackendsForCheckpoint(ArrayList backendsArrayList,
01279 String checkpointName)
throws SQLException
01280 {
01281
01282
if (recoveryLog == null)
01283 {
01284 String msg =
Translate.get(
"recovery.store.checkpoint.failed.cause.null",
01285 checkpointName);
01286 logger.error(msg);
01287
throw new SQLException(msg);
01288 }
01289
01290
01291 logger.info(
Translate.get(
"requestmanager.wait.pending.writes"));
01292 scheduler.suspendWrites();
01293
01294
01295 recoveryLog.storeCheckpoint(checkpointName);
01296 logger.info(
Translate.get(
"recovery.checkpoint.stored", checkpointName));
01297
01298
01299
DatabaseBackend db;
01300 ArrayList backendList = (ArrayList) backendsArrayList.clone();
01301
for (
int i = 0; i < backendList.size(); i++)
01302 {
01303 db = (
DatabaseBackend) backendList.get(i);
01304
if (!db.
isWriteEnabled())
01305 backendList.remove(i);
01306 }
01307
01308
01309
int size = backendList.size();
01310
for (
int i = 0; i < size; i++)
01311 {
01312 db = (
DatabaseBackend) backendList.get(i);
01313 db.
setState(
BackendState.DISABLING);
01314 logger.info(
Translate.get(
"backend.state.disabling", db.
getName()));
01315 }
01316
01317
01318 logger.info(
Translate.get(
"requestmanager.resume.pending.writes"));
01319 scheduler.resumeWrites();
01320
01321
01322
for (
int i = 0; i < size; i++)
01323 {
01324 db = (
DatabaseBackend) backendList.get(i);
01325 db.
waitForAllTransactionsToComplete();
01326 }
01327
01328
01329
for (
int i = 0; i < size; i++)
01330 {
01331 db = (
DatabaseBackend) backendList.get(i);
01332 db.
setLastKnownCheckpoint(checkpointName);
01333 loadBalancer.disableBackend(db);
01334 logger.info(
Translate.get(
"backend.state.disabled", db.
getName()));
01335 }
01336 }
01337
01351 public void callBackupManager(
boolean backup,
DatabaseBackend db,
01352 String checkpoint, ArrayList tables,
boolean enableAfter,
01353
BackupListener listener)
throws Exception
01354 {
01355
if (backup)
01356 backupBackendWithCheckpoint(db, checkpoint, tables, enableAfter,
false,
01357 listener);
01358
else
01359 restoreBackendFromBackupCheckpoint(db, checkpoint,
false, listener);
01360 }
01361
01374 public void backupBackendWithCheckpoint(
DatabaseBackend db,
01375 String checkpointName, ArrayList tables,
boolean enableAfter,
01376
boolean wait4Result,
BackupListener listener)
throws SQLException
01377 {
01378
01379
01380
if (db.isReadEnabled())
01381 {
01382 disableBackendForCheckpoint(db, checkpointName);
01383 logger.info(
Translate.get(
"backend.state.disabled", db.getName()));
01384 }
01385
01386
try
01387 {
01388 logger.info(
Translate
01389 .get(
"controller.backup.octopus.start", db.getName()));
01390 backupManager.backup(db, checkpointName, tables, listener);
01391 db.setLastKnownCheckpoint(checkpointName);
01392
if (wait4Result)
01393 backupManager.getResult(db, 0);
01394 }
01395
catch (
OctopusException e)
01396 {
01397 logger.error(
Translate.get(
"controller.backup.octopus.failed"), e);
01398
throw new SQLException(e.getMessage());
01399 }
01400
catch (
BackupException be)
01401 {
01402 logger.error(
Translate.get(
"controller.backup.failed"), be);
01403
throw new SQLException(be.getMessage());
01404 }
01405 logger.info(
Translate.get(
"controller.backup.complete", db.getName()));
01406
01407
if (enableAfter)
01408 {
01409 enableBackendFromCheckpoint(db, checkpointName);
01410 }
01411
01412 }
01413
01428 public void restoreBackendFromBackupCheckpoint(
DatabaseBackend db,
01429 String checkpointName,
boolean wait4Result,
BackupListener listener)
01430
throws OctopusException,
BackupException
01431 {
01432
try
01433 {
01434
01435
01436
if (db.isReadEnabled())
01437 loadBalancer.disableBackend(db);
01438
01439 backupManager.restore(db, checkpointName, null, listener);
01440
if (wait4Result)
01441 backupManager.getResult(db, 0);
01442 }
01443
catch (SQLException e1)
01444 {
01445
01446
throw new BackupException(
ExceptionTypes.BACKEND_CANNOT_BE_DISABLED);
01447 }
01448
catch (OctopusException e)
01449 {
01450 logger.error(
Translate.get(
"controller.octopus.recovery.failed"), e);
01451
throw e;
01452 }
01453
catch (
BackupException be)
01454 {
01455 logger.error(
Translate.get(
"controller.backup.recovery.failed"), be);
01456
throw be;
01457 }
01458 finally
01459 {
01460 logger.info(
Translate
01461 .get(
"controller.backup.recovery.done", db.getName()));
01462 }
01463 }
01464
01471 public void storeBackendsInfo(String databaseName, ArrayList backends)
01472 {
01473
if (recoveryLog == null)
01474
return;
01475
int size = backends.size();
01476
DatabaseBackend backend;
01477
for (
int i = 0; i < size; i++)
01478 {
01479 backend = (
DatabaseBackend) backends.get(i);
01480
try
01481 {
01482 recoveryLog.storeBackendInfo(databaseName, backend.
getName(), backend
01483 .
getLastKnownCheckpoint(), backend.
getStateValue());
01484 }
01485
catch (SQLException e)
01486 {
01487 logger.error(
Translate.get(
"recovery.store.checkpoint.failed",
01488
new String[]{backend.
getName(), e.getMessage()}), e);
01489 }
01490 }
01491 }
01492
01499 public void removeCheckpoint(String checkpointName)
throws SQLException
01500 {
01501 recoveryLog.removeCheckpoint(checkpointName);
01502 }
01503
01504
01505
01506
01507
01513 public VirtualDatabase getVirtualDatabase()
01514 {
01515
return vdb;
01516 }
01517
01525 public void setDatabaseSchema(DatabaseSchema schema,
boolean isStatic)
01526 {
01527
if (schemaIsStatic)
01528 {
01529
if (isStatic)
01530 {
01531 logger.warn(
Translate
01532 .get(
"requestmanager.schema.replace.static.with.new"));
01533
this.dbs = schema;
01534 }
01535
else
01536 logger.info(
Translate.get(
"requestmanager.schema.ignore.new.dynamic"));
01537 }
01538
else
01539 {
01540 schemaIsStatic = isStatic;
01541
this.dbs = schema;
01542 logger.info(
Translate
01543 .get(
"requestmanager.schema.set.new.virtualdatabase"));
01544 }
01545
01546
if (schedulerParsingranularity !=
ParsingGranularities.NO_PARSING)
01547 scheduler.setDatabaseSchema(dbs);
01548
01549
if (cacheParsingranularity !=
ParsingGranularities.NO_PARSING)
01550 resultCache.setDatabaseSchema(dbs);
01551
01552
01553 }
01554
01560 public void mergeDatabaseSchema(DatabaseSchema backendSchema)
01561 {
01562
try
01563 {
01564
if (dbs == null)
01565 setDatabaseSchema(
new DatabaseSchema(backendSchema),
false);
01566
else
01567 {
01568 dbs.mergeSchema(backendSchema);
01569 logger.info(
Translate
01570 .get(
"requestmanager.schema.virtualdatabase.merged.new"));
01571
01572
if (schedulerParsingranularity !=
ParsingGranularities.NO_PARSING)
01573 scheduler.mergeDatabaseSchema(dbs);
01574
01575
if (cacheParsingranularity !=
ParsingGranularities.NO_PARSING)
01576 resultCache.mergeDatabaseSchema(dbs);
01577 }
01578 }
01579
catch (SQLException e)
01580 {
01581 logger.error(
Translate.get(
"requestmanager.schema.merge.failed", e
01582 .getMessage()), e);
01583 }
01584 }
01585
01591 public void setBackupManager(BackupManager currentBackupManager)
01592 {
01593
this.backupManager = currentBackupManager;
01594 }
01595
01601 public BackupManager getBackupManager()
01602 {
01603
return backupManager;
01604 }
01605
01611 public DatabaseSchema getDatabaseSchema()
01612 {
01613
return dbs;
01614 }
01615
01621 public AbstractLoadBalancer getLoadBalancer()
01622 {
01623
return loadBalancer;
01624 }
01625
01631 public void setLoadBalancer(AbstractLoadBalancer loadBalancer)
01632 {
01633
this.loadBalancer = loadBalancer;
01634 loadBalancerParsingranularity = loadBalancer.getParsingGranularity();
01635
if (loadBalancerParsingranularity > requiredGranularity)
01636 requiredGranularity = loadBalancerParsingranularity;
01637 }
01638
01645 public AbstractResultCache getResultCache()
01646 {
01647
return resultCache;
01648 }
01649
01655 public MetadataCache getMetadataCache()
01656 {
01657
return metadataCache;
01658 }
01659
01665 public void setMetadataCache(MetadataCache metadataCache)
01666 {
01667
this.metadataCache = metadataCache;
01668 }
01669
01675 public void setParsingCache(ParsingCache parsingCache)
01676 {
01677 parsingCache.setRequestManager(
this);
01678 parsingCache.setGranularity(requiredGranularity);
01679 parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
01680
this.parsingCache = parsingCache;
01681 }
01682
01688 public AbstractRecoveryLog getRecoveryLog()
01689 {
01690
return recoveryLog;
01691 }
01692
01698 public void setRecoveryLog(AbstractRecoveryLog recoveryLog)
01699 {
01700
if(recoveryLog==null)
01701
return;
01702
this.recoveryLog = recoveryLog;
01703 ArrayList backends = vdb.getBackends();
01704
int size = backends.size();
01705 backendStateListener =
new BackendStateListener(vdb.getName(), recoveryLog);
01706
for (
int i = 0; i < size; i++)
01707 ((
DatabaseBackend) backends.get(i))
01708 .setStateListener(backendStateListener);
01709 }
01710
01716 public void setResultCache(AbstractResultCache cache)
01717 {
01718 resultCache = cache;
01719 cacheParsingranularity = cache.getParsingGranularity();
01720
if (cacheParsingranularity > requiredGranularity)
01721 requiredGranularity = cacheParsingranularity;
01722 }
01723
01730 public AbstractScheduler getScheduler()
01731 {
01732
return scheduler;
01733 }
01734
01740 public void setScheduler(AbstractScheduler scheduler)
01741 {
01742
this.scheduler = scheduler;
01743 schedulerParsingranularity = scheduler.getParsingGranularity();
01744
if (schedulerParsingranularity > requiredGranularity)
01745 requiredGranularity = schedulerParsingranularity;
01746 }
01747
01755 public void setCaseSensitiveParsing(
boolean isCaseSensitiveParsing)
01756 {
01757
this.isCaseSensitiveParsing = isCaseSensitiveParsing;
01758
if (parsingCache != null)
01759 parsingCache.setCaseSensitiveParsing(isCaseSensitiveParsing);
01760 }
01761
01765 public int getRequiredParsingGranularity()
01766 {
01767
return requiredGranularity;
01768 }
01769
01770
01771
01772
01773
01779 public String getXml()
01780 {
01781 StringBuffer info =
new StringBuffer();
01782 info.append(
"<" +
DatabasesXmlTags.ELT_RequestManager +
" "
01783 +
DatabasesXmlTags.ATT_caseSensitiveParsing +
"=\""
01784 + isCaseSensitiveParsing +
"\" " +
DatabasesXmlTags.ATT_beginTimeout
01785 +
"=\"" + beginTimeout / 1000 +
"\" "
01786 +
DatabasesXmlTags.ATT_commitTimeout +
"=\"" + commitTimeout / 1000
01787 +
"\" " +
DatabasesXmlTags.ATT_rollbackTimeout +
"=\""
01788 + rollbackTimeout / 1000 +
"\">");
01789
if (scheduler != null)
01790 info.append(scheduler.getXml());
01791
01792
if (metadataCache != null || parsingCache != null || resultCache != null)
01793 {
01794 info.append(
"<" +
DatabasesXmlTags.ELT_RequestCache +
">");
01795
if (metadataCache != null)
01796 info.append(metadataCache.getXml());
01797
if (parsingCache != null)
01798 info.append(parsingCache.getXml());
01799
if (resultCache != null)
01800 info.append(resultCache.getXml());
01801 info.append(
"</" +
DatabasesXmlTags.ELT_RequestCache +
">");
01802 }
01803
01804
if (loadBalancer != null)
01805 info.append(loadBalancer.getXml());
01806
if (recoveryLog != null)
01807 info.append(
this.recoveryLog.getXml());
01808 info.append(
"</" +
DatabasesXmlTags.ELT_RequestManager +
">");
01809
return info.toString();
01810 }
01811
01817 public BackendStateListener getBackendStateListener()
01818 {
01819
return backendStateListener;
01820 }
01821 }