src/org/objectweb/cjdbc/controller/recoverylog/JDBCRecoveryLog.java

説明を見る。
00001 00025 package org.objectweb.cjdbc.controller.recoverylog; 00026 00027 import java.sql.Connection; 00028 import java.sql.DatabaseMetaData; 00029 import java.sql.PreparedStatement; 00030 import java.sql.ResultSet; 00031 import java.sql.SQLException; 00032 import java.sql.Statement; 00033 import java.util.ArrayList; 00034 00035 import org.objectweb.cjdbc.common.i18n.Translate; 00036 import org.objectweb.cjdbc.common.shared.BackendState; 00037 import org.objectweb.cjdbc.common.sql.AbstractRequest; 00038 import org.objectweb.cjdbc.common.sql.AbstractWriteRequest; 00039 import org.objectweb.cjdbc.common.sql.AlterRequest; 00040 import org.objectweb.cjdbc.common.sql.CreateRequest; 00041 import org.objectweb.cjdbc.common.sql.DeleteRequest; 00042 import org.objectweb.cjdbc.common.sql.DropRequest; 00043 import org.objectweb.cjdbc.common.sql.InsertRequest; 00044 import org.objectweb.cjdbc.common.sql.StoredProcedure; 00045 import org.objectweb.cjdbc.common.sql.UpdateRequest; 00046 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags; 00047 import org.objectweb.cjdbc.controller.connection.DriverManager; 00048 import org.objectweb.cjdbc.controller.loadbalancer.tasks.BeginTask; 00049 import org.objectweb.cjdbc.controller.loadbalancer.tasks.CommitTask; 00050 import org.objectweb.cjdbc.controller.loadbalancer.tasks.ReadStoredProcedureTask; 00051 import org.objectweb.cjdbc.controller.loadbalancer.tasks.RollbackTask; 00052 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteRequestTask; 00053 import org.objectweb.cjdbc.controller.loadbalancer.tasks.WriteStoredProcedureTask; 00054 import org.objectweb.cjdbc.controller.requestmanager.TransactionMarkerMetaData; 00055 00064 public class JDBCRecoveryLog extends AbstractRecoveryLog 00065 { 00067 private String driverClassName; 00068 00070 private String driverName; 00071 00073 private String url; 00074 00076 private String login; 00077 00079 private String password; 00080 00082 private String logTableName; 00083 private String logTableCreateStatement; 00084 // keep record of values 00085 private String idType; 00086 private String vloginType; 00087 private String sqlType; 00088 private String transactionIdType; 00089 private String logextraStatement; 00090 00092 private String checkpointTableName; 00093 private String checkpointTableCreateStatement; 00094 00100 private String backendTableCreateStatement; 00101 private String backendTableName; 00102 00103 // Keep record of the values 00104 private String nameType; 00105 private String requestIdType; 00106 private String checkextraStatement; 00107 00109 private Connection connection; 00110 00112 private PreparedStatement pstmt; 00113 00115 private long logTableId = 0; 00116 00118 private int timeout; 00119 00120 private JDBCLoggerThread loggerThread; 00121 00132 public JDBCRecoveryLog(String driverName, String driverClassName, String url, 00133 String login, String password, int requestTimeout) 00134 { 00135 this.driverName = driverName; 00136 this.driverClassName = driverClassName; 00137 this.url = url; 00138 this.login = login; 00139 this.password = password; 00140 this.timeout = requestTimeout; 00141 00142 // Connect to the database 00143 try 00144 { 00145 connectToDatabase(); 00146 } 00147 catch (SQLException e) 00148 { 00149 throw new RuntimeException("Unable to connect to the database: " + e); 00150 } 00151 00152 // Logger thread will be created in checkRecoveryLogTables() 00153 // after database has been initialized 00154 } 00155 00160 public void checkRecoveryLogTables() 00161 { 00162 try 00163 { 00164 intializeDatabase(); 00165 pstmt = connection.prepareStatement("INSERT INTO " + logTableName 00166 + " VALUES(?,?,?,?)"); 00167 } 00168 catch (SQLException e) 00169 { 00170 throw new RuntimeException("Unable to initialize the database: " + e); 00171 } 00172 00173 // Start the logger thread 00174 loggerThread = new JDBCLoggerThread(pstmt, logger); 00175 loggerThread.start(); 00176 } 00177 00181 public long getLastTransactionId() throws SQLException 00182 { 00183 Statement stmt = connection.createStatement(); 00184 ResultSet rs = stmt.executeQuery("select max(transaction_id) from " 00185 + logTableName); 00186 if (rs.next()) 00187 return rs.getInt(1); 00188 else 00189 return 0; 00190 } 00191 00202 public void setLogTableCreateStatement(String tableName, String idType, 00203 String vloginType, String sqlType, String transactionIdType, 00204 String extraStatement) 00205 { 00206 this.logTableName = tableName; 00207 this.idType = idType; 00208 this.vloginType = vloginType; 00209 this.sqlType = sqlType; 00210 this.transactionIdType = transactionIdType; 00211 this.logextraStatement = extraStatement; 00212 String logTableCreateStatement = "CREATE TABLE " + tableName + " (id " 00213 + idType + ",vlogin " + vloginType + ",sql " + sqlType 00214 + ",transaction_id " + transactionIdType + extraStatement + ")"; 00215 if (logger.isDebugEnabled()) 00216 logger.debug(Translate.get("recovery.jdbc.logtable.statement", 00217 logTableCreateStatement)); 00218 00219 this.logTableCreateStatement = logTableCreateStatement; 00220 } 00221 00230 public void setCheckpointTableCreateStatement(String tableName, 00231 String nameType, String requestIdType, String extraStatement) 00232 { 00233 this.checkpointTableName = tableName; 00234 this.nameType = nameType; 00235 this.requestIdType = requestIdType; 00236 this.checkextraStatement = extraStatement; 00237 // CREATE TABLE tableName ( 00238 // name checkpointNameColumnType, 00239 // request_id requestIdColumnType, 00240 // extraStatement) 00241 00242 String checkpointTableCreateStatement = "CREATE TABLE " + tableName 00243 + " (name " + nameType + ",request_id " + requestIdType 00244 + extraStatement + ")"; 00245 if (logger.isDebugEnabled()) 00246 logger.debug(Translate.get("recovery.jdbc.checkpointtable.statement", 00247 checkpointTableCreateStatement)); 00248 00249 this.checkpointTableCreateStatement = checkpointTableCreateStatement; 00250 } 00251 00262 public void setBackendTableCreateStatement(String tableName, 00263 String checkpointNameType, String backendNameType, 00264 String backendStateType, String databaseNameType, String extraStatement) 00265 { 00266 this.backendTableName = tableName; 00267 this.backendTableCreateStatement = "CREATE TABLE " + backendTableName 00268 + " (databaseName " + databaseNameType + ", backendName " 00269 + backendNameType + ",backendState " + backendStateType 00270 + ", checkpointName " + checkpointNameType + " " + extraStatement + ")"; 00271 00272 if (logger.isDebugEnabled()) 00273 logger.debug(Translate.get("recovery.jdbc.backendtable.statement", 00274 backendTableCreateStatement)); 00275 } 00276 00280 private synchronized long incrementLogTableId() 00281 { 00282 logTableId++; 00283 return logTableId; 00284 } 00285 00290 private void intializeDatabase() throws SQLException 00291 { 00292 boolean createLogTable = true; 00293 boolean createCheckpointTable = true; 00294 boolean createBackendTable = true; 00295 // Check if tables exist 00296 try 00297 { 00298 // Get DatabaseMetaData 00299 DatabaseMetaData metaData = connection.getMetaData(); 00300 00301 // Get a description of tables matching the catalog, schema, table name 00302 // and type. 00303 // Sending in null for catalog and schema drop them from the selection 00304 // criteria. Replace the last argument in the getTables method with 00305 // types below to obtain only database tables. (Sending in null 00306 // retrieves all types). 00307 String[] types = {"TABLE", "VIEW"}; 00308 ResultSet rs = metaData.getTables(null, null, "%", types); 00309 00310 // Get tables metadata 00311 String tableName; 00312 while (rs.next()) 00313 { 00314 // 1 is table catalog, 2 is table schema, 3 is table name, 4 is type 00315 tableName = rs.getString(3); 00316 if (logger.isDebugEnabled()) 00317 logger.debug(Translate.get("recovery.jdbc.table.found", tableName)); 00318 if (tableName.equalsIgnoreCase(logTableName)) 00319 { 00320 if (tableName.compareTo(logTableName) != 0) 00321 logger.warn(Translate.get("recovery.jdbc.logtable.case.mismatch", 00322 new String[]{logTableName, tableName})); 00323 createLogTable = false; 00324 // initialize logTableId 00325 PreparedStatement p = null; 00326 try 00327 { 00328 ResultSet result = null; 00329 p = connection.prepareStatement("SELECT MAX(id) AS max_id FROM " 00330 + logTableName); 00331 result = p.executeQuery(); 00332 if (result.next()) 00333 logTableId = result.getInt("max_id"); 00334 else 00335 logTableId = 0; 00336 p.close(); 00337 } 00338 catch (SQLException e) 00339 { 00340 try 00341 { 00342 if (p != null) 00343 p.close(); 00344 } 00345 catch (Exception ignore) 00346 { 00347 } 00348 throw new RuntimeException(Translate.get( 00349 "recovery.jdbc.logtable.getvalue.failed", e)); 00350 } 00351 00352 } 00353 if (tableName.equalsIgnoreCase(checkpointTableName)) 00354 { 00355 if (tableName.compareTo(checkpointTableName) != 0) 00356 logger.warn(Translate.get( 00357 "recovery.jdbc.checkpointtable.case.mismatch", new String[]{ 00358 checkpointTableName, tableName})); 00359 createCheckpointTable = false; 00360 } 00361 if (tableName.equalsIgnoreCase(backendTableName)) 00362 { 00363 if (tableName.compareTo(backendTableName) != 0) 00364 logger.warn(Translate.get( 00365 "recovery.jdbc.backendtable.case.mismatch", new String[]{ 00366 backendTableName, tableName})); 00367 createBackendTable = false; 00368 } 00369 } 00370 } 00371 catch (SQLException e) 00372 { 00373 logger.error(Translate.get("recovery.jdbc.table.no.description"), e); 00374 throw e; 00375 } 00376 00377 // Create the missing tables 00378 Statement stmt = null; 00379 if (createLogTable) 00380 { 00381 if (logger.isInfoEnabled()) 00382 logger.info(Translate 00383 .get("recovery.jdbc.logtable.create", logTableName)); 00384 try 00385 { 00386 stmt = connection.createStatement(); 00387 stmt.executeUpdate(logTableCreateStatement); 00388 stmt.close(); 00389 } 00390 catch (SQLException e) 00391 { 00392 throw new SQLException(Translate.get( 00393 "recovery.jdbc.logtable.create.failed", new String[]{logTableName, 00394 e.getMessage()})); 00395 } 00396 } 00397 if (createCheckpointTable) 00398 { 00399 if (logger.isInfoEnabled()) 00400 logger.info(Translate.get("recovery.jdbc.checkpointtable.create", 00401 checkpointTableName)); 00402 try 00403 { 00404 stmt = connection.createStatement(); 00405 stmt.executeUpdate(checkpointTableCreateStatement); 00406 stmt.close(); 00407 } 00408 catch (SQLException e) 00409 { 00410 throw new SQLException(Translate.get( 00411 "recovery.jdbc.checkpointtable.create.failed", new String[]{ 00412 logTableName, e.getMessage()})); 00413 } 00414 } 00415 if (createBackendTable) 00416 { 00417 if (logger.isInfoEnabled()) 00418 logger.info(Translate.get("recovery.jdbc.backendtable.create", 00419 backendTableName)); 00420 try 00421 { 00422 stmt = connection.createStatement(); 00423 stmt.executeUpdate(backendTableCreateStatement); 00424 stmt.close(); 00425 } 00426 catch (SQLException e) 00427 { 00428 throw new SQLException(Translate.get( 00429 "recovery.jdbc.backendtable.create.failed", new String[]{ 00430 logTableName, e.getMessage()})); 00431 } 00432 } 00433 00434 } 00435 00441 private void connectToDatabase() throws SQLException 00442 { 00443 // Get a connection 00444 try 00445 { 00446 if (logger.isDebugEnabled()) 00447 logger.debug(Translate.get("recovery.jdbc.connect", new String[]{url, 00448 login})); 00449 connection = DriverManager.getConnection(url, login, password, 00450 driverName, driverClassName); 00451 } 00452 catch (RuntimeException e) 00453 { 00454 String msg = Translate.get("recovery.jdbc.connect.failed", e); 00455 logger.debug(msg, e); 00456 throw new SQLException(msg); 00457 } 00458 catch (SQLException e) 00459 { 00460 String msg = Translate.get("recovery.jdbc.connect.failed", e); 00461 logger.debug(msg, e); 00462 throw new SQLException(msg); 00463 } 00464 } 00465 00469 public void logRequest(AbstractWriteRequest request) throws SQLException 00470 { 00471 loggerThread.log(incrementLogTableId(), request.getLogin(), request 00472 .getSQL(), request.getTransactionId(), request.getEscapeProcessing()); 00473 } 00474 00478 public void logRequest(StoredProcedure proc, boolean isRead) 00479 throws SQLException 00480 { 00481 if (isRead) 00482 loggerThread.log(incrementLogTableId(), proc.getLogin(), proc.getSQL(), 00483 proc.getTransactionId(), proc.getEscapeProcessing()); 00484 else 00485 { // Reverse the first bracket so that we can identify a write call 00486 StringBuffer writeCall = new StringBuffer(proc.getSQL()); 00487 writeCall.setCharAt(0, '}'); 00488 loggerThread.log(incrementLogTableId(), proc.getLogin(), writeCall 00489 .toString(), proc.getTransactionId(), proc.getEscapeProcessing()); 00490 } 00491 } 00492 00496 public void begin(TransactionMarkerMetaData tm) throws SQLException 00497 { 00498 // Store the begin in the database 00499 loggerThread.log(incrementLogTableId(), tm.getLogin(), "begin", tm 00500 .getTransactionId(), false); 00501 } 00502 00506 public void commit(TransactionMarkerMetaData tm) throws SQLException 00507 { 00508 // Some backends started a recovery process, log the commit 00509 loggerThread.log(incrementLogTableId(), tm.getLogin(), "commit", tm 00510 .getTransactionId(), false); 00511 } 00512 00516 public void rollback(TransactionMarkerMetaData tm) throws SQLException 00517 { 00518 00519 long transactionId = tm.getTransactionId(); 00520 if (isRecovering()) 00521 { 00522 // Some backends started a recovery process, log the rollback 00523 loggerThread.log(incrementLogTableId(), tm.getLogin(), "rollback", 00524 transactionId, false); 00525 } 00526 else 00527 { 00528 // The transaction failed 00529 // Remove the requests with this transactionId from the database 00530 loggerThread.rollbackTransaction(transactionId); 00531 PreparedStatement stmt = null; 00532 try 00533 { 00534 stmt = connection.prepareStatement("DELETE FROM " + logTableName 00535 + " WHERE transaction_id=?"); 00536 stmt.setLong(1, transactionId); 00537 stmt.executeUpdate(); 00538 stmt.close(); 00539 } 00540 catch (SQLException e) 00541 { 00542 try 00543 { 00544 if (stmt != null) 00545 stmt.close(); 00546 } 00547 catch (Exception ignore) 00548 { 00549 } 00550 throw new SQLException(Translate.get( 00551 "recovery.jdbc.transaction.remove.failed", new String[]{ 00552 String.valueOf(transactionId), e.getMessage()})); 00553 } 00554 } 00555 } 00556 00560 public void storeCheckpoint(String checkpointName) throws SQLException 00561 { 00562 storeCheckpoint(checkpointName, logTableId); 00563 } 00564 00572 private boolean validCheckpointName(String checkpointName) 00573 throws SQLException 00574 { 00575 PreparedStatement stmt = null; 00576 ResultSet rs = null; 00577 try 00578 { 00579 stmt = connection.prepareStatement("SELECT * FROM " + checkpointTableName 00580 + " WHERE name LIKE ?"); 00581 stmt.setString(1, checkpointName); 00582 rs = stmt.executeQuery(); 00583 00584 // If the query returned any rows, the checkpoint name is already 00585 // in use and therefore invalid. 00586 return !rs.next(); 00587 } 00588 catch (SQLException e) 00589 { 00590 throw new SQLException(Translate.get( 00591 "recovery.jdbc.checkpoint.check.failed", e)); 00592 } 00593 finally 00594 { 00595 try 00596 { 00597 if (stmt != null) 00598 stmt.close(); 00599 } 00600 catch (SQLException ignore) 00601 { 00602 } 00603 } 00604 } 00605 00610 public void storeCheckpoint(String checkpointName, long requestId) 00611 throws SQLException 00612 { 00613 PreparedStatement stmt = null; 00614 00615 // Check if a checkpoint with the name checkpointName already exists 00616 if (!validCheckpointName(checkpointName)) 00617 { 00618 throw new SQLException(Translate.get( 00619 "recovery.jdbc.checkpoint.duplicate", checkpointName)); 00620 } 00621 00622 // Wait for transaction to finish 00623 waitForTransactionsEnd(true); 00624 00625 try 00626 { 00627 stmt = connection.prepareStatement("INSERT INTO " + checkpointTableName 00628 + " VALUES(?,?)"); 00629 stmt.setString(1, checkpointName); 00630 stmt.setLong(2, requestId); 00631 stmt.executeUpdate(); 00632 stmt.close(); 00633 } 00634 catch (SQLException e) 00635 { 00636 try 00637 { 00638 if (stmt != null) 00639 stmt.close(); 00640 } 00641 catch (Exception ignore) 00642 { 00643 } 00644 throw new SQLException(Translate.get( 00645 "recovery.jdbc.checkpoint.store.failed", new String[]{checkpointName, 00646 e.getMessage()})); 00647 } 00648 } 00649 00650 private void waitForTransactionsEnd(boolean forceRollback) 00651 { 00652 if (forceRollback) 00653 { 00654 loggerThread.rollbackTransactions(); 00655 } 00656 else 00657 { 00658 synchronized (loggerThread) 00659 { 00660 while (!loggerThread.getLogQueueIsEmpty()) 00661 { 00662 try 00663 { 00664 wait(100); 00665 } 00666 catch (Exception e) 00667 { 00668 logger.warn("Exception " + e 00669 + " while waiting for end of transactions"); 00670 } 00671 } 00672 } 00673 } 00674 } 00675 00679 public long getCheckpointRequestId(String checkpointName) throws SQLException 00680 { 00681 long requestId = -1; 00682 PreparedStatement stmt = null; 00683 try 00684 { 00685 ResultSet rs = null; 00686 stmt = connection.prepareStatement("SELECT request_id FROM " 00687 + checkpointTableName + " WHERE name LIKE ?"); 00688 stmt.setString(1, checkpointName); 00689 rs = stmt.executeQuery(); 00690 00691 if (rs.next()) 00692 { 00693 requestId = rs.getInt("request_id"); 00694 } 00695 stmt.close(); 00696 } 00697 catch (SQLException e) 00698 { 00699 try 00700 { 00701 if (stmt != null) 00702 stmt.close(); 00703 } 00704 catch (Exception ignore) 00705 { 00706 } 00707 throw new SQLException(Translate.get( 00708 "recovery.jdbc.checkpoint.not.found", new String[]{checkpointName, 00709 e.getMessage()})); 00710 } 00711 return requestId; 00712 } 00713 00717 public RecoveryTask recoverNextRequest(long previousRequestId) 00718 throws SQLException 00719 { 00720 RecoveryTask task = null; 00721 00722 // Get the request with the id after previousRequestId. 00723 PreparedStatement stmt = null; 00724 try 00725 { 00726 ResultSet rs = null; 00727 boolean emptyResult; 00728 00729 do 00730 { 00731 // Take a window of 2 requests if there is a small hole 00732 // induced by a removed begin/commit/rollback command. 00733 stmt = connection.prepareStatement("SELECT * FROM " + logTableName 00734 + " WHERE id>? AND id<=? ORDER BY id"); 00735 stmt.setLong(1, previousRequestId); 00736 stmt.setLong(2, previousRequestId + 2); 00737 rs = stmt.executeQuery(); 00738 previousRequestId += 2; // Shift the window 00739 emptyResult = !rs.next(); 00740 } 00741 while (emptyResult && (previousRequestId <= logTableId)); 00742 00743 // No more request after this one 00744 if (emptyResult) 00745 return null; 00746 00747 String sql = rs.getString("sql"); 00748 String user = rs.getString("vlogin"); 00749 int transactionId = rs.getInt("transaction_id"); 00750 int id = rs.getInt("id"); 00751 stmt.close(); 00752 00753 // Construct the request object according to its type 00754 boolean escapeProcessing = true; 00755 sql = sql.trim(); 00756 // Check that the command starts with (only 2 letters are needed) 00757 // in[sert]/up[date]/de[lete]/cr[eate]/dr[op]/be[gin]/co[mmit]/ro[llback]/{c[all]/}c[all] 00758 // (write stored procedure call) 00759 String lower = sql.substring(0, 2).toLowerCase(); 00760 if (lower.equals("in")) 00761 { // insert 00762 AbstractWriteRequest wr = new InsertRequest(sql, escapeProcessing, 00763 timeout, "\n"); 00764 wr.setLogin(user); 00765 if (logger.isDebugEnabled()) 00766 logger.debug("insert request: " + sql); 00767 setDriverProcessedAndSkeleton(wr); 00768 if (transactionId != 0) 00769 { 00770 wr.setIsAutoCommit(false); 00771 wr.setTransactionId(transactionId); 00772 } 00773 else 00774 wr.setIsAutoCommit(true); 00775 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00776 wr)); 00777 } 00778 else if (lower.equals("up")) 00779 { // update 00780 AbstractWriteRequest wr = new UpdateRequest(sql, escapeProcessing, 00781 timeout, "\n"); 00782 wr.setLogin(user); 00783 setDriverProcessedAndSkeleton(wr); 00784 if (logger.isDebugEnabled()) 00785 logger.debug("update request: " + sql); 00786 if (transactionId != 0) 00787 { 00788 wr.setIsAutoCommit(false); 00789 wr.setTransactionId(transactionId); 00790 } 00791 else 00792 wr.setIsAutoCommit(true); 00793 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00794 wr)); 00795 } 00796 else if (lower.equals("de")) 00797 { // delete 00798 AbstractWriteRequest wr = new DeleteRequest(sql, escapeProcessing, 00799 timeout, "\n"); 00800 wr.setLogin(user); 00801 setDriverProcessedAndSkeleton(wr); 00802 if (logger.isDebugEnabled()) 00803 logger.debug("delete request: " + sql); 00804 if (transactionId != 0) 00805 { 00806 wr.setIsAutoCommit(false); 00807 wr.setTransactionId(transactionId); 00808 } 00809 else 00810 wr.setIsAutoCommit(true); 00811 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00812 wr)); 00813 } 00814 else if (lower.equals("cr")) 00815 { // create 00816 AbstractWriteRequest wr = new CreateRequest(sql, escapeProcessing, 00817 timeout, "\n"); 00818 wr.setLogin(user); 00819 setDriverProcessedAndSkeleton(wr); 00820 if (logger.isDebugEnabled()) 00821 logger.debug("create request: " + sql); 00822 if (transactionId != 0) 00823 { 00824 wr.setIsAutoCommit(false); 00825 wr.setTransactionId(transactionId); 00826 } 00827 else 00828 wr.setIsAutoCommit(true); 00829 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00830 wr)); 00831 } 00832 else if (lower.equals("al")) 00833 { // alter 00834 AbstractWriteRequest wr = new AlterRequest(sql, escapeProcessing, 00835 timeout, "\n"); 00836 wr.setLogin(user); 00837 setDriverProcessedAndSkeleton(wr); 00838 if (logger.isDebugEnabled()) 00839 logger.debug("alter request: " + sql); 00840 if (transactionId != 0) 00841 { 00842 wr.setIsAutoCommit(false); 00843 wr.setTransactionId(transactionId); 00844 } 00845 else 00846 wr.setIsAutoCommit(true); 00847 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00848 wr)); 00849 } 00850 else if (lower.equals("dr")) 00851 { // drop 00852 AbstractWriteRequest wr = new DropRequest(sql, escapeProcessing, 00853 timeout, "\n"); 00854 wr.setLogin(user); 00855 setDriverProcessedAndSkeleton(wr); 00856 if (logger.isDebugEnabled()) 00857 logger.debug("drop request: " + sql); 00858 if (transactionId != 0) 00859 { 00860 wr.setIsAutoCommit(false); 00861 wr.setTransactionId(transactionId); 00862 } 00863 else 00864 wr.setIsAutoCommit(true); 00865 task = new RecoveryTask(transactionId, id, new WriteRequestTask(1, 1, 00866 wr)); 00867 } 00868 else if (lower.equals("be")) 00869 { // begin 00870 task = new RecoveryTask(transactionId, id, new BeginTask(1, 1, 00871 (long) timeout * 1000, user, transactionId)); 00872 if (logger.isDebugEnabled()) 00873 logger.debug("begin transaction: " + transactionId); 00874 } 00875 else if (lower.equals("co")) 00876 { // commit 00877 task = new RecoveryTask(transactionId, id, new CommitTask(1, 1, 00878 (long) timeout * 1000, user, transactionId)); 00879 if (logger.isDebugEnabled()) 00880 logger.debug("commit transaction: " + transactionId); 00881 } 00882 else if (lower.equals("ro")) 00883 { // rollback 00884 task = new RecoveryTask(transactionId, id, new RollbackTask(1, 1, 00885 (long) timeout * 1000, user, transactionId)); 00886 if (logger.isDebugEnabled()) 00887 logger.debug("rollback transaction: " + transactionId); 00888 } 00889 else if (lower.equals("{c")) 00890 { // read stored procedure call "{call ...}" 00891 StoredProcedure proc = new StoredProcedure(sql, escapeProcessing, 00892 timeout, "\n"); 00893 proc.setLogin(user); 00894 setDriverProcessedAndSkeleton(proc); 00895 if (logger.isDebugEnabled()) 00896 logger.debug("read stored procedure call: " + sql); 00897 if (transactionId != 0) 00898 { 00899 proc.setIsAutoCommit(false); 00900 proc.setTransactionId(transactionId); 00901 } 00902 else 00903 proc.setIsAutoCommit(true); 00904 task = new RecoveryTask(transactionId, id, new ReadStoredProcedureTask( 00905 1, 1, proc, null)); 00906 } 00907 else if (lower.equals("}c")) 00908 { // write stored procedure call, 00909 // we must replace "}call ...}" with "{call ...}" 00910 StringBuffer writeCall = new StringBuffer(sql); 00911 writeCall.setCharAt(0, '{'); 00912 StoredProcedure proc = new StoredProcedure(writeCall.toString(), 00913 escapeProcessing, timeout, "\n"); 00914 proc.setLogin(user); 00915 setDriverProcessedAndSkeleton(proc); 00916 if (logger.isDebugEnabled()) 00917 logger.debug("write stored procedure call: " + sql); 00918 if (transactionId != 0) 00919 { 00920 proc.setIsAutoCommit(false); 00921 proc.setTransactionId(transactionId); 00922 } 00923 else 00924 proc.setIsAutoCommit(true); 00925 task = new RecoveryTask(transactionId, id, 00926 new WriteStoredProcedureTask(1, 1, proc)); 00927 } 00928 else 00929 throw new SQLException(Translate.get("recovery.jdbc.sql.unkwown", sql)); 00930 } 00931 catch (SQLException e) 00932 { 00933 try 00934 { 00935 if (stmt != null) 00936 stmt.close(); 00937 } 00938 catch (Exception ignore) 00939 { 00940 } 00941 throw new SQLException(Translate.get("recovery.jdbc.recover.failed", e)); 00942 } 00943 return task; 00944 } 00945 00952 private void setDriverProcessedAndSkeleton(AbstractRequest request) 00953 { 00954 String sql = request.getSQL(); 00955 boolean isDriverProcessed = sql 00956 .indexOf(org.objectweb.cjdbc.driver.Connection.endParamTag) == -1; 00957 request.setDriverProcessed(isDriverProcessed); 00958 if (isDriverProcessed) 00959 return; // No need to set the skeleton 00960 00961 request.setSqlSkeleton(recreateSkeleton(sql)); 00962 } 00963 00970 private String recreateSkeleton(String sql) 00971 { 00972 // Here we have to rebuild the query skeleton to be able to call setXXX on 00973 // the PreparedStatement 00974 StringBuffer skeleton = new StringBuffer(); 00975 int start = 0; 00976 int end; 00977 while ((end = sql.indexOf( 00978 org.objectweb.cjdbc.driver.Connection.startParamTag, start)) != -1) 00979 { 00980 skeleton.append(sql.substring(start, end)).append('?'); 00981 start = sql.indexOf(org.objectweb.cjdbc.driver.Connection.endParamTag, 00982 end); 00983 if (start == -1) 00984 throw new RuntimeException("Malformed query in recovery log: " + sql); 00985 else 00986 start += org.objectweb.cjdbc.driver.Connection.endParamTag.length(); 00987 } 00988 if (start < sql.length()) 00989 skeleton.append(sql.substring(start)); 00990 return skeleton.toString(); 00991 } 00992 00998 public void cleanRecoveryLog() throws SQLException 00999 { 01000 PreparedStatement stmt = null; 01001 01002 // Remove the rollback statements and associated requests from the database 01003 ResultSet rs = null; 01004 try 01005 { 01006 // Get the list of transaction ids on which a rollback occurred 01007 stmt = connection.prepareStatement("SELECT transaction_id FROM " 01008 + logTableName + " WHERE sql LIKE ?"); 01009 stmt.setString(1, "rollback"); 01010 rs = stmt.executeQuery(); 01011 } 01012 catch (SQLException e) 01013 { 01014 try 01015 { 01016 if (stmt != null) 01017 stmt.close(); 01018 } 01019 catch (Exception ignore) 01020 { 01021 } 01022 throw new SQLException("Unable get rollback statements : " + e); 01023 } 01024 PreparedStatement p = null; 01025 long transactionId = -1; 01026 try 01027 { 01028 // remove the rollbacked transaction from the database 01029 while (rs.next()) 01030 { 01031 transactionId = rs.getLong("transaction_Id"); 01032 p = connection.prepareStatement("DELETE FROM " + logTableName 01033 + " WHERE transaction_id=?"); 01034 p.setLong(1, transactionId); 01035 p.executeUpdate(); 01036 p.close(); 01037 } 01038 stmt.close(); 01039 } 01040 catch (SQLException e) 01041 { 01042 try 01043 { 01044 if (stmt != null) 01045 stmt.close(); 01046 if (p != null) 01047 p.close(); 01048 } 01049 catch (Exception ignore) 01050 { 01051 } 01052 throw new SQLException(Translate.get( 01053 "recovery.jdbc.transaction.remove.failed", new String[]{ 01054 String.valueOf(transactionId), e.getMessage()})); 01055 } 01056 } 01057 01061 public void removeCheckpoint(String checkpointName) throws SQLException 01062 { 01063 PreparedStatement stmt = null; 01064 01065 // Wait for transaction to finish 01066 waitForTransactionsEnd(true); 01067 01068 try 01069 { 01070 // 1. Get the reference point to delete 01071 stmt = connection.prepareStatement("SELECT * from " + checkpointTableName 01072 + " WHERE name LIKE ?"); 01073 stmt.setString(1, checkpointName); 01074 ResultSet rs = stmt.executeQuery(); 01075 if (!rs.next()) 01076 throw new SQLException("Checkpoint " + checkpointName 01077 + " does not exist"); 01078 01079 int requestId = rs.getInt("request_id"); 01080 01081 // Delete all entries below 01082 stmt = connection.prepareStatement("DELETE FROM " + logTableName 01083 + " WHERE ID <= ?"); 01084 stmt.setInt(1, requestId); 01085 stmt.executeUpdate(); 01086 01087 // Delete checkpoint name 01088 stmt = connection.prepareStatement("DELETE FROM " + checkpointTableName 01089 + " WHERE name like ?"); 01090 stmt.setString(1, checkpointName); 01091 stmt.executeUpdate(); 01092 stmt.close(); 01093 } 01094 catch (SQLException e) 01095 { 01096 try 01097 { 01098 if (stmt != null) 01099 stmt.close(); 01100 } 01101 catch (Exception ignore) 01102 { 01103 } 01104 throw new SQLException(Translate.get( 01105 "recovery.jdbc.checkpoint.remove.failed", new String[]{ 01106 checkpointName, e.getMessage()})); 01107 } 01108 01109 } 01110 01114 public ArrayList getCheckpointNames() throws SQLException 01115 { 01116 PreparedStatement stmt = null; 01117 01118 // Wait for transaction to finish 01119 waitForTransactionsEnd(true); 01120 01121 try 01122 { 01123 stmt = connection.prepareStatement("SELECT name from " 01124 + checkpointTableName); 01125 ResultSet rs = stmt.executeQuery(); 01126 ArrayList list = new ArrayList(); 01127 while (rs.next()) 01128 { 01129 list.add(rs.getString("name")); 01130 } 01131 return list; 01132 } 01133 catch (Exception e) 01134 { 01135 throw new SQLException(Translate.get( 01136 "recovery.jdbc.checkpoint.list.failed", e)); 01137 } 01138 01139 } 01140 01145 public void storeBackendInfo(String databaseName, String backendName, 01146 String checkpoint, int state) throws SQLException 01147 { 01148 PreparedStatement stmt = null; 01149 PreparedStatement stmt2 = null; 01150 checkpoint = (checkpoint == null) ? "" : checkpoint; 01151 01152 try 01153 { 01154 // 1. Get the reference point to delete 01155 stmt = connection.prepareStatement("SELECT * from " + backendTableName 01156 + " WHERE backendName LIKE ? and databaseName LIKE ?"); 01157 stmt.setString(1, backendName); 01158 stmt.setString(2, databaseName); 01159 ResultSet rs = stmt.executeQuery(); 01160 if (!rs.next()) 01161 { 01162 stmt2 = connection.prepareStatement("INSERT INTO " + backendTableName 01163 + " values('" + databaseName + "','" + backendName + "' ," + state 01164 + ",'" + checkpoint + "')"); 01165 if (stmt2.executeUpdate() != 1) 01166 throw new SQLException( 01167 "Error while inserting new backend reference. Incorrect number of rows"); 01168 } 01169 else 01170 { 01171 stmt2 = connection.prepareStatement("UPDATE " + backendTableName 01172 + " set backendState=" + state + ", checkpointName='" + checkpoint 01173 + "' where backendName='" + backendName + "' and databaseName='" 01174 + databaseName + "'"); 01175 if (stmt2.executeUpdate() != 1) 01176 throw new SQLException( 01177 "Error while updating backend reference. Incorrect number of rows"); 01178 } 01179 01180 stmt.close(); 01181 stmt2.close(); 01182 } 01183 catch (SQLException e) 01184 { 01185 e.printStackTrace(); 01186 try 01187 { 01188 if (stmt != null) 01189 stmt.close(); 01190 if (stmt2 != null) 01191 stmt2.close(); 01192 } 01193 catch (Exception ignore) 01194 { 01195 } 01196 throw new SQLException("Unable to update checkpoint '" + checkpoint 01197 + "' for backend:" + backendName); 01198 } 01199 } 01200 01205 public BackendRecoveryInfo getBackendInfo(String databaseName, 01206 String backendName) 01207 { 01208 PreparedStatement stmt = null; 01209 String checkpoint = null; 01210 int backendState = BackendState.UNKNOWN; 01211 try 01212 { 01213 // 1. Get the reference point to delete 01214 stmt = connection.prepareStatement("SELECT * from " + backendTableName 01215 + " WHERE backendName LIKE ? and databaseName LIKE ?"); 01216 stmt.setString(1, backendName); 01217 stmt.setString(2, databaseName); 01218 ResultSet rs = stmt.executeQuery(); 01219 01220 if (rs.next()) 01221 { 01222 checkpoint = rs.getString("checkpointName"); 01223 backendState = rs.getInt("backendState"); 01224 } 01225 stmt.close(); 01226 } 01227 catch (SQLException e) 01228 { 01229 try 01230 { 01231 if (stmt != null) 01232 stmt.close(); 01233 } 01234 catch (Exception ignore) 01235 { 01236 } 01237 } 01238 return new BackendRecoveryInfo(backendName, checkpoint, backendState, 01239 databaseName); 01240 } 01241 01245 public String getXmlImpl() 01246 { 01247 StringBuffer info = new StringBuffer(); 01248 info.append("<" + DatabasesXmlTags.ELT_JDBCRecoveryLog + " " 01249 + DatabasesXmlTags.ATT_driver + "=\"" + driverClassName + "\" " 01250 + DatabasesXmlTags.ATT_url + "=\"" + url + "\" "); 01251 if (driverName != null) 01252 { 01253 info.append(DatabasesXmlTags.ATT_driverPath + "=\"" + driverName + "\" "); 01254 } 01255 info.append(DatabasesXmlTags.ATT_login + "=\"" + login + "\" " 01256 + DatabasesXmlTags.ATT_password + "=\"" + password + "\" " 01257 + DatabasesXmlTags.ATT_requestTimeout + "=\"" + (timeout / 1000) 01258 + "\">"); 01259 // Recovery Log table 01260 info.append("<" + DatabasesXmlTags.ELT_RecoveryLogTable + " " 01261 + DatabasesXmlTags.ATT_tableName + "=\"" + logTableName + "\"" + " " 01262 + DatabasesXmlTags.ATT_idColumnType + "=\"" + idType + "\"" + " " 01263 + DatabasesXmlTags.ATT_vloginColumnType + "=\"" + vloginType + "\"" 01264 + " " + DatabasesXmlTags.ATT_sqlColumnType + "=\"" + sqlType + "\"" 01265 + " " + DatabasesXmlTags.ATT_transactionIdColumnType + "=\"" 01266 + transactionIdType + "\"" + " " 01267 + DatabasesXmlTags.ATT_extraStatementDefinition + "=\"" 01268 + logextraStatement + "\"/>"); 01269 // Checkpoint table 01270 info.append("<" + DatabasesXmlTags.ELT_CheckpointTable + " " 01271 + DatabasesXmlTags.ATT_tableName + "=\"" + checkpointTableName + "\"" 01272 + " " + DatabasesXmlTags.ATT_checkpointNameColumnType + "=\"" 01273 + nameType + "\"" + " " + DatabasesXmlTags.ATT_requestIdColumnType 01274 + "=\"" + requestIdType + "\"" + " " 01275 + DatabasesXmlTags.ATT_extraStatementDefinition + "=\"" 01276 + checkextraStatement + "\"" + "/>"); 01277 // BackendLog table 01278 info.append("<" + DatabasesXmlTags.ELT_BackendTable + " " 01279 + DatabasesXmlTags.ATT_tableName + "=\"" + backendTableName + "\"" 01280 + "/>"); 01281 01282 info.append("</" + DatabasesXmlTags.ELT_JDBCRecoveryLog + ">"); 01283 01284 return info.toString(); 01285 } 01286 01292 public String getBackendTableName() 01293 { 01294 return backendTableName; 01295 } 01296 01302 public String getCheckpointTableName() 01303 { 01304 return checkpointTableName; 01305 } 01306 01312 public String getLogTableName() 01313 { 01314 return logTableName; 01315 } 01316 01320 public String[][] getData() 01321 { 01322 try 01323 { 01324 ResultSet rs = connection.createStatement().executeQuery( 01325 "Select * from " + logTableName); 01326 ArrayList list = new ArrayList(); 01327 while (rs.next()) 01328 { 01329 // 3: Query 2: User 1: ID 4: TID 01330 list.add(new String[]{rs.getString(3), rs.getString(2), 01331 rs.getString(1), rs.getString(4)}); 01332 } 01333 String[][] result = new String[list.size()][4]; 01334 for (int i = 0; i < list.size(); i++) 01335 result[i] = (String[]) list.get(i); 01336 return result; 01337 } 01338 catch (SQLException e) 01339 { 01340 return null; 01341 } 01342 } 01343 }

CJDBCversion1.0.4に対してTue Oct 12 15:16:03 2004に生成されました。 doxygen 1.3.8