Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | Related Pages

DistributedVirtualDatabase.java

00001 /**
00002  * C-JDBC: Clustered JDBC.
00003  * Copyright (C) 2002-2005 French National Institute For Research In Computer
00004  * Science And Control (INRIA).
00005  * Contact: c-jdbc@objectweb.org
00006  * 
00007  * This library is free software; you can redistribute it and/or modify it
00008  * under the terms of the GNU Lesser General Public License as published by the
00009  * Free Software Foundation; either version 2.1 of the License, or any later
00010  * version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but WITHOUT
00013  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00014  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
00015  * for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public License
00018  * along with this library; if not, write to the Free Software Foundation,
00019  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
00020  *
00021  * Initial developer(s): Emmanuel Cecchet. 
00022  * Contributor(s): _________________________.
00023  */
00024 
00025 package org.objectweb.cjdbc.controller.virtualdatabase;
00026 
00027 import java.io.Serializable;
00028 import java.net.URL;
00029 import java.sql.SQLException;
00030 import java.util.ArrayList;
00031 import java.util.HashMap;
00032 import java.util.Hashtable;
00033 import java.util.Iterator;
00034 import java.util.Map.Entry;
00035 
00036 import javax.management.NotCompliantMBeanException;
00037 
00038 import org.dom4j.DocumentException;
00039 import org.objectweb.cjdbc.common.exceptions.ControllerException;
00040 import org.objectweb.cjdbc.common.exceptions.VirtualDatabaseException;
00041 import org.objectweb.cjdbc.common.i18n.Translate;
00042 import org.objectweb.cjdbc.common.jmx.JmxException;
00043 import org.objectweb.cjdbc.common.jmx.notifications.CjdbcNotificationList;
00044 import org.objectweb.cjdbc.common.shared.BackendInfo;
00045 import org.objectweb.cjdbc.common.sql.filters.AbstractBlobFilter;
00046 import org.objectweb.cjdbc.common.xml.DatabasesXmlTags;
00047 import org.objectweb.cjdbc.controller.authentication.AuthenticationManager;
00048 import org.objectweb.cjdbc.controller.backend.BackendRecoveryPolicy;
00049 import org.objectweb.cjdbc.controller.backend.DatabaseBackend;
00050 import org.objectweb.cjdbc.controller.core.Controller;
00051 import org.objectweb.cjdbc.controller.jmx.MBeanServerManager;
00052 import org.objectweb.cjdbc.controller.jmx.RmiConnector;
00053 import org.objectweb.cjdbc.controller.requestmanager.RequestManager;
00054 import org.objectweb.cjdbc.controller.requestmanager.distributed.DistributedRequestManager;
00055 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendStatus;
00056 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.BackendTransfer;
00057 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.CJDBCGroupMessage;
00058 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Commit;
00059 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.ControllerName;
00060 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.DistributedRequest;
00061 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.EnableBackend;
00062 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.Rollback;
00063 import org.objectweb.cjdbc.controller.virtualdatabase.protocol.VirtualDatabaseConfiguration;
00064 import org.objectweb.tribe.adapters.MulticastRequestAdapter;
00065 import org.objectweb.tribe.adapters.MulticastRequestListener;
00066 import org.objectweb.tribe.adapters.MulticastResponse;
00067 import org.objectweb.tribe.channel.JGroupsReliableChannelWithGms;
00068 import org.objectweb.tribe.channel.ReliableGroupChannelWithGms;
00069 import org.objectweb.tribe.common.Address;
00070 import org.objectweb.tribe.common.Group;
00071 import org.objectweb.tribe.common.GroupIdentifier;
00072 import org.objectweb.tribe.common.Member;
00073 import org.objectweb.tribe.exceptions.ChannelException;
00074 import org.objectweb.tribe.exceptions.NotConnectedException;
00075 import org.objectweb.tribe.exceptions.TimeoutException;
00076 import org.objectweb.tribe.gms.JGroupsMembershipService;
00077 import org.objectweb.tribe.messages.MessageListener;
00078 
00079 /**
00080  * A <code>DistributedVirtualDatabase</code> is a virtual database hosted by
00081  * several controllers. Communication between the controllers is achieved with
00082  * reliable multicast provided by Javagroups.
00083  * 
00084  * @author <a href="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
00085  * @version 1.0
00086  */
00087 public class DistributedVirtualDatabase extends VirtualDatabase
00088     implements
00089       MessageListener,
00090       MulticastRequestListener
00091 {
00092   //
00093   // How the code is organized ?
00094   //
00095   // 1. Member variables
00096   // 2. Constructor(s)
00097   // 3. Request handling
00098   // 4. Transaction handling
00099   // 5. Database backend management
00100   // 6. Distribution management (multicast)
00101   // 7. Getter/Setter (possibly in alphabetical order)
00102   //
00103 
00104   // Distribution
00105 
00106   /** Group name */
00107   private String                      groupName      = null;
00108   /** Controller name mapping: jgroup Addr --> controller Name */
00109   private Hashtable                   controllersMap;
00110   /** backendName -><code>BackendRecoveryPolicy</code> */
00111   private HashMap                     backendRecoveryPolicy;
00112   /** JGroups Address -><code>ArrayList</code> of <code>DatabaseBackend</code> */
00113   private Hashtable                   backendsPerController;
00114 
00115   /** JGroups channel */
00116   private ReliableGroupChannelWithGms channel        = null;
00117   /** MessageDispatcher to communicate with the group */
00118   private MulticastRequestAdapter     multicastRequestAdapter;
00119   private Group                       currentGroup   = null;
00120   private ArrayList                   allMemberButUs = null;
00121 
00122   //
00123   // Constructors
00124   //
00125 
00126   /**
00127    * Creates a new <code>DistributedVirtualDatabase</code> instance.
00128    * 
00129    * @param controller the controller we belong to
00130    * @param name the virtual database name
00131    * @param groupName the virtual database group name
00132    * @param maxConnections maximum number of concurrent connections.
00133    * @param pool should we use a pool of threads for handling connections?
00134    * @param minThreads minimum number of threads in the pool
00135    * @param maxThreads maximum number of threads in the pool
00136    * @param maxThreadIdleTime maximum time a thread can remain idle before being
00137    *          removed from the pool.
00138    * @param sqlShortFormLength maximum number of characters of an SQL statement
00139    *          to diplay in traces or exceptions
00140    * @param blobFilter encoding method for blobs
00141    * @exception NotCompliantMBeanException in case the bean does not comply with
00142    *              jmx
00143    * @exception JmxException the bean could not be registeed
00144    */
00145   public DistributedVirtualDatabase(Controller controller, String name,
00146       String groupName, int maxConnections, boolean pool, int minThreads,
00147       int maxThreads, long maxThreadIdleTime, int sqlShortFormLength,
00148       AbstractBlobFilter blobFilter) throws NotCompliantMBeanException,
00149       JmxException
00150   {
00151     super(controller, name, maxConnections, pool, minThreads, maxThreads,
00152         maxThreadIdleTime, sqlShortFormLength, blobFilter);
00153 
00154     this.groupName = groupName;
00155     backendRecoveryPolicy = new HashMap();
00156     backendsPerController = new Hashtable();
00157     controllersMap = new Hashtable();
00158   }
00159 
00160   /**
00161    * Disconnect the channel and close it.
00162    * 
00163    * @see java.lang.Object#finalize()
00164    */
00165   protected void finalize() throws Throwable
00166   {
00167     quitChannel();
00168     super.finalize();
00169   }
00170 
00171   /**
00172    * @see org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase#addBackend(org.objectweb.cjdbc.controller.backend.DatabaseBackend)
00173    */
00174   public void addBackend(DatabaseBackend db) throws VirtualDatabaseException
00175   {
00176     // Add the backend to the virtual database.
00177     super.addBackend(db);
00178 
00179     // Send a group message if already joined group
00180     if (allMemberButUs != null)
00181     {
00182       try
00183       {
00184         broadcastBackendInformation(allMemberButUs);
00185       }
00186       catch (Exception e)
00187       {
00188         String msg = "Error while broadcasting backend information when adding backend";
00189         logger.error(msg, e);
00190         throw new VirtualDatabaseException(msg, e);
00191       }
00192     }
00193   }
00194 
00195   /**
00196    * Quit the jgroups channel
00197    * 
00198    * @throws NotConnectedException if the channel is not connected
00199    * @throws ChannelException if an error occured while closing the channel
00200    */
00201   public void quitChannel() throws ChannelException, NotConnectedException
00202   {
00203     if (channel != null)
00204     {
00205       channel.close();
00206     }
00207   }
00208 
00209   /**
00210    * Returns the controllerName value.
00211    * 
00212    * @return Returns the controllerName.
00213    */
00214   public String getControllerName()
00215   {
00216     return controller.getControllerName();
00217   }
00218 
00219   /**
00220    * Returns the group name this virtual database belongs to.
00221    * 
00222    * @return a <code>String</code> value. Returns <code>null</code> if this
00223    *         virtual database is standalone
00224    */
00225   public String getGroupName()
00226   {
00227     return groupName;
00228   }
00229 
00230   /**
00231    * Sets the group name used by the controllers hosting this virtual database.
00232    * 
00233    * @param groupName the group name to set
00234    */
00235   public void setGroupName(String groupName)
00236   {
00237     this.groupName = groupName;
00238   }
00239 
00240   /**
00241    * Add a BackendRecoveryPolicy
00242    * 
00243    * @param policy the policy to add
00244    */
00245   public void addBackendRecoveryPolicy(BackendRecoveryPolicy policy)
00246   {
00247     backendRecoveryPolicy.put(policy.getBackendName(), policy);
00248   }
00249 
00250   /**
00251    * Sets a new distributed request manager for this database.
00252    * 
00253    * @param requestManager the new request manager.
00254    */
00255   public void setRequestManager(RequestManager requestManager)
00256   {
00257     if (!(requestManager instanceof DistributedRequestManager))
00258       throw new RuntimeException(
00259           "A distributed virtual database can only work with a distributed request manager.");
00260     this.requestManager = requestManager;
00261   }
00262 
00263   //
00264   //  Distribution handling
00265   //
00266 
00267   /**
00268    * Makes this virtual database join a virtual database group. Those groups are
00269    * mapped to JavaGroups groups.
00270    * 
00271    * @exception Exception if an error occurs
00272    */
00273   public void joinGroup() throws Exception
00274   {
00275     try
00276     {
00277       // Read the protocol stack configuration from jgroups.xml
00278       URL jgroupConfigFile = DistributedVirtualDatabase.class
00279           .getResource("/jgroups.xml");
00280       if (jgroupConfigFile == null)
00281         logger.warn(Translate
00282             .get("virtualdatabase.distributed.jgroups.xml.not.found"));
00283       else
00284         logger.info(Translate.get("virtualdatabase.distributed.jgroups.using",
00285             jgroupConfigFile.toString()));
00286       JGroupsMembershipService gms = new JGroupsMembershipService(
00287           jgroupConfigFile);
00288       channel = new JGroupsReliableChannelWithGms(gms);
00289       if (logger.isDebugEnabled())
00290         logger.debug("Group communication channel is configured as follows: "
00291             + ((JGroupsReliableChannelWithGms) channel).getProperties());
00292 
00293       // Join the group
00294       channel.join(new Group(new GroupIdentifier(groupName)));
00295       multicastRequestAdapter = new MulticastRequestAdapter(channel // group
00296           // channel
00297           , this /* MessageListener */
00298           , this /* MulticastRequestListener */
00299       );
00300 
00301       // Let the MulticastRequestAdapter thread pump the membership out of the
00302       // JGroups channel.
00303       Thread.yield();
00304 
00305       logger.info("Group " + groupName + " connected to "
00306           + channel.getLocalMembership());
00307 
00308       // Add ourselves to the list of controllers
00309       controllersMap.put(channel.getLocalMembership(), controller.getJmxName());
00310 
00311       // Check if we are alone or not
00312       currentGroup = channel.getCurrentGroup();
00313       if (currentGroup.getMembers().size() == 1)
00314       {
00315         logger.info(Translate.get(
00316             "virtualdatabase.distributed.configuration.first.in.group",
00317             groupName));
00318         allMemberButUs = new ArrayList();
00319         return;
00320       }
00321 
00322       // Compute the group list without us
00323       allMemberButUs = (ArrayList) currentGroup.getMembers().clone();
00324 
00325       // Our controller id is our position in the membership.
00326       // This assumes that all controllers have the view in the same order.
00327       ((DistributedRequestManager) requestManager)
00328           .setControllerId(allMemberButUs.indexOf(channel.getLocalMembership()));
00329       allMemberButUs.remove(channel.getLocalMembership());
00330 
00331       // Check with the other controller that our config is compatible
00332       if (checkConfigurationCompatibility(allMemberButUs) == false)
00333       {
00334         String msg = Translate
00335             .get("virtualdatabase.distributed.configuration.not.compatible");
00336         logger.error(msg);
00337         throw new ControllerException(msg);
00338       }
00339 
00340       logger.info(Translate
00341           .get("virtualdatabase.distributed.configuration.compatible"));
00342 
00343       // Distribute backends among controllers knowing that at this point
00344       // there is no conflict on the backend distribution policies.
00345       broadcastBackendInformation(allMemberButUs);
00346     }
00347     catch (Exception e)
00348     {
00349       String msg = Translate.get("virtualdatabase.distributed.joingroup.error",
00350           groupName);
00351       if (e instanceof RuntimeException)
00352         logger.error(msg, e);
00353       throw new Exception(msg + " (" + e + ")");
00354     }
00355   }
00356 
00357   /**
00358    * Get the JGroups' channel used for group communications
00359    * 
00360    * @return a <code>JChannel</code>
00361    */
00362   public ReliableGroupChannelWithGms getChannel()
00363   {
00364     return channel;
00365   }
00366 
00367   /**
00368    * Return the group communication multicast request adapter.
00369    * 
00370    * @return the group communication multicast request adapter
00371    */
00372   public MulticastRequestAdapter getMulticastRequestAdapter()
00373   {
00374     return multicastRequestAdapter;
00375   }
00376 
00377   /**
00378    * Returns the currentGroup value.
00379    * 
00380    * @return Returns the currentGroup.
00381    */
00382   public Group getCurrentGroup()
00383   {
00384     return currentGroup;
00385   }
00386 
00387   /**
00388    * Returns the list of all members in the group except us.
00389    * 
00390    * @return Returns the allMemberButUs.
00391    */
00392   public ArrayList getAllMemberButUs()
00393   {
00394     return allMemberButUs;
00395   }
00396 
00397   /**
00398    * Send the configuration of this controller to remote controller. All remote
00399    * controllers must agree on the compatibility of the local controller
00400    * configuration with their own configuration. Compatibility checking include
00401    * Authentication Manager, Scheduler and Load Balancer settings.
00402    * 
00403    * @param dest List of <code>Address</code> to send the message to
00404    * @return true if the local config is compatible with the group, false
00405    *         otherwise.
00406    */
00407   private boolean checkConfigurationCompatibility(ArrayList dest)
00408   {
00409     if (logger.isDebugEnabled())
00410       logger.debug(Translate
00411           .get("virtualdatabase.distributed.configuration.checking"));
00412 
00413     // Send our configuration
00414     MulticastResponse rspList;
00415     try
00416     {
00417       rspList = multicastRequestAdapter.multicastMessage(dest,
00418           new VirtualDatabaseConfiguration(this),
00419           MulticastRequestAdapter.WAIT_ALL,
00420           CJDBCGroupMessage.defaultCastTimeOut);
00421     }
00422     catch (TimeoutException e)
00423     {
00424       logger.error(
00425           "Timeout occured while checking configuration compatibility", e);
00426       return false;
00427     }
00428     catch (ChannelException e)
00429     {
00430       logger
00431           .error(
00432               "Communication error occured while checking configuration compatibility",
00433               e);
00434       return false;
00435     }
00436     catch (NotConnectedException e)
00437     {
00438       logger.error(
00439           "Channel unavailable while checking configuration compatibility", e);
00440       return false;
00441     }
00442 
00443     // Check that everybody agreed
00444     HashMap results = rspList.getResults();
00445     int size = results.size();
00446     if (size == 0)
00447       logger.warn(Translate
00448           .get("virtualdatabase.distributed.configuration.checking.noanswer"));
00449     for (Iterator iter = results.values().iterator(); iter.hasNext();)
00450     {
00451       Object response = iter.next();
00452       if (response instanceof Boolean)
00453       {
00454         if (((Boolean) response).booleanValue() == false)
00455           return false;
00456       }
00457       else
00458       {
00459         logger
00460             .error("Unexpected response while checking configuration compatibility: "
00461                 + response);
00462         return false;
00463       }
00464     }
00465 
00466     // Ok, everybody replied true (config is compatible)
00467     return true;
00468   }
00469 
00470   /**
00471    * Broadcast backend information among controllers.
00472    * 
00473    * @param dest List of <code>Address</code> to send the message to
00474    * @throws NotConnectedException if the channel is not connected
00475    * @throws ChannelException if the channel reported an error
00476    * @throws TimeoutException if a timeout occured
00477    */
00478   private void broadcastBackendInformation(ArrayList dest)
00479       throws TimeoutException, ChannelException, NotConnectedException
00480   {
00481     logger
00482         .debug(Translate
00483             .get("virtualdatabase.distributed.configuration.querying.remote.status"));
00484 
00485     // Send our backend status
00486     MulticastResponse rspList;
00487     rspList = multicastRequestAdapter.multicastMessage(dest, new BackendStatus(
00488         getBackendsInfo(backends)), MulticastRequestAdapter.WAIT_ALL,
00489         CJDBCGroupMessage.defaultCastTimeOut);
00490 
00491     int size = dest.size();
00492     for (int i = 0; i < size; i++)
00493     {
00494       // Add the backend configuration of every remote controller
00495       Member m = (Member) dest.get(i);
00496       if (rspList.getResult(m) != null)
00497       {
00498         if (logger.isDebugEnabled())
00499           logger
00500               .debug(Translate
00501                   .get(
00502                       "virtualdatabase.distributed.configuration.updating.backend.list",
00503                       m.toString()));
00504       }
00505       else
00506         logger.warn(Translate.get(
00507             "virtualdatabase.distributed.unable.get.remote.status", m
00508                 .toString()));
00509     }
00510   }
00511 
00512   /**
00513    * Check if the given backend definition is compatible with the backend
00514    * definitions of this distributed virtual database. Not that if the given
00515    * backend does not exist in the current configuration, it is considered as
00516    * compatible. Incompatibility results from 2 backends with the same JDBC URL.
00517    * 
00518    * @param backend the backend to check
00519    * @return true if the backend is compatible with the local definition
00520    * @throws VirtualDatabaseException if locking the local backend list fails
00521    */
00522   public boolean isCompatibleBackend(BackendInfo backend)
00523       throws VirtualDatabaseException
00524   {
00525     try
00526     {
00527       acquireReadLockBackendLists();
00528     }
00529     catch (InterruptedException e)
00530     {
00531       String msg = "Unable to acquire read lock on backend list in isCompatibleBackend ("
00532           + e + ")";
00533       logger.error(msg);
00534       throw new VirtualDatabaseException(msg);
00535     }
00536 
00537     try
00538     {
00539       // Find the backend
00540       String backendURL = backend.getUrl();
00541       int size = backends.size();
00542       DatabaseBackend b = null;
00543       for (int i = 0; i < size; i++)
00544       {
00545         b = (DatabaseBackend) backends.get(i);
00546         if (b.getURL().equals(backendURL))
00547           return false;
00548       }
00549     }
00550     catch (RuntimeException re)
00551     {
00552       throw new VirtualDatabaseException(re);
00553     }
00554     finally
00555     {
00556       releaseReadLockBackendLists();
00557     }
00558     // This backend does not exist here
00559     return true;
00560   }
00561 
00562   //
00563   // Message dispatcher request handling
00564   //
00565 
00566   /**
00567    * @see org.objectweb.tribe.messages.MessageListener#receive(java.io.Serializable)
00568    */
00569   public void receive(Serializable msg)
00570   {
00571     logger.error("Distributed virtual database received unhandled message: "
00572         + msg);
00573   }
00574 
00575   /**
00576    * This method handle the scheduling part of the queries to be sure that the
00577    * query is scheduled in total order before letting other queries to execute.
00578    * 
00579    * @see org.objectweb.tribe.adapters.MulticastRequestListener#handleMessageSingleThreaded(java.io.Serializable,
00580    *      org.objectweb.tribe.common.Member)
00581    */
00582   public Object handleMessageSingleThreaded(Serializable msg, Member sender)
00583   {
00584     try
00585     {
00586       if (msg != null)
00587       {
00588         logger.debug("handleMessageSingleThreaded (" + msg.getClass() + "): "
00589             + msg);
00590         if (msg instanceof BackendTransfer)
00591         {
00592           logger.info(getControllerName() + ":Received transfer command");
00593           BackendTransfer transfer = (BackendTransfer) msg;
00594           BackendInfo info = transfer.getInfo();
00595           DatabaseBackend backend = new DatabaseBackend(info);
00596           try
00597           {
00598             this.addBackend(backend);
00599           }
00600           catch (Exception e1)
00601           {
00602             logger.error("Transfer failed", e1);
00603             return e1;
00604           }
00605           logger.info(getControllerName() + ":Enable backend");
00606           enableBackend(backend.getName());
00607           return Boolean.TRUE;
00608         }
00609         // Other message types will be handled in multithreaaded handler
00610       }
00611       else
00612       {
00613         String errorMsg = "Invalid null message";
00614         logger.error(errorMsg);
00615         return new ControllerException(errorMsg);
00616       }
00617       return null;
00618     }
00619     catch (Exception e)
00620     {
00621       if (e instanceof RuntimeException)
00622         logger.warn("Error while handling group message:" + msg.getClass(), e);
00623       return e;
00624     }
00625   }
00626 
00627   /**
00628    * @see org.objectweb.tribe.adapters.MulticastRequestListener#handleMessageMultiThreaded(Serializable,
00629    *      Member, Object)
00630    */
00631   public Serializable handleMessageMultiThreaded(Serializable msg,
00632       Member sender, Object handleMessageSingleThreadedResult)
00633   {
00634     try
00635     {
00636       if (msg != null)
00637       {
00638         logger.debug("handleMessageMultiThreaded (" + msg.getClass() + "): "
00639             + msg);
00640         if (msg instanceof DistributedRequest)
00641         { //Distributed request execution
00642           if (logger.isDebugEnabled())
00643             logger.debug(getControllerName() + ": DistributedRequest "
00644                 + ((DistributedRequest) msg).getRequest().getId() + " from "
00645                 + sender);
00646           ((DistributedRequest) msg)
00647               .scheduleRequest((DistributedRequestManager) this.requestManager);
00648           return ((Serializable) ((DistributedRequest) msg)
00649               .executeScheduledRequest((DistributedRequestManager) this.requestManager));
00650         }
00651         if (msg instanceof ControllerName)
00652         {
00653           String controllerName = ((ControllerName) msg).getJmxName();
00654           if (logger.isDebugEnabled())
00655             logger.debug(getControllerName() + ": New ControllerName "
00656                 + controllerName);
00657           controllersMap.put(sender, controllerName);
00658           return Boolean.TRUE;
00659         }
00660         if (msg instanceof Commit)
00661         { //Distributed commit execution
00662           if (logger.isDebugEnabled())
00663             logger.debug(getControllerName() + ": Commit from " + sender);
00664           return ((Serializable) ((Commit) msg)
00665               .commit((DistributedRequestManager) this.requestManager));
00666         }
00667         if (msg instanceof Rollback)
00668         { //Distributed commit execution
00669           if (logger.isDebugEnabled())
00670             logger.debug(getControllerName() + ": Rollback from " + sender);
00671           return ((Serializable) ((Rollback) msg)
00672               .rollback((DistributedRequestManager) this.requestManager));
00673         }
00674         else if (msg instanceof BackendTransfer)
00675           return (Serializable) handleMessageSingleThreadedResult;
00676         else if (msg instanceof VirtualDatabaseConfiguration)
00677         { // Check if given configuration is compatible with the local one
00678           VirtualDatabaseConfiguration vdc = (VirtualDatabaseConfiguration) msg;
00679           // Send notification
00680           if (MBeanServerManager.isJmxEnabled())
00681           {
00682             Hashtable data = new Hashtable();
00683             data.put("controllerName", vdc.getControllerName());
00684             data.put("rmiconnector", new String[]{vdc.getRmiHostname(),
00685                 vdc.getRmiPort()});
00686             RmiConnector.broadcastNotification(this,
00687                 CjdbcNotificationList.DISTRIBUTED_CONTROLLER_ADDED,
00688                 CjdbcNotificationList.NOTIFICATION_LEVEL_INFO, Translate.get(
00689                     "notification.distributed.controller.added", this
00690                         .getVirtualDatabaseName()), data);
00691           }
00692           controllersMap.put(sender, vdc.getControllerJmxName());
00693           if (logger.isDebugEnabled())
00694             logger.debug("VirtualDatabaseConfiguration from "
00695                 + vdc.getControllerName());
00696 
00697           // Send controller name to new comer
00698           ArrayList target = new ArrayList();
00699           target.add(sender);
00700           multicastRequestAdapter.multicastMessage(target, new ControllerName(
00701               controller.getControllerName(), controller.getJmxName()),
00702               MulticastRequestAdapter.WAIT_ALL,
00703               CJDBCGroupMessage.defaultCastTimeOut);
00704 
00705           // Broadcast backends
00706           multicastRequestAdapter.multicastMessage(target, new BackendStatus(
00707               getBackendsInfo(backends)), MulticastRequestAdapter.WAIT_ALL,
00708               CJDBCGroupMessage.defaultCastTimeOut);
00709 
00710           if (isLocalSender(sender))
00711             return Boolean.TRUE;
00712           else
00713             return new Boolean((vdc.isCompatible(this)));
00714         }
00715         else if (msg instanceof BackendStatus)
00716         { // Update backend list from sender
00717           ArrayList remoteBackendInfoList = ((BackendStatus) msg).getBackends();
00718           // Convert BackendInfo arraylist to real DatabaseBackend objects
00719           ArrayList remoteBackendList = new ArrayList(remoteBackendInfoList
00720               .size());
00721           for (Iterator iter = remoteBackendInfoList.iterator(); iter.hasNext();)
00722           {
00723             BackendInfo info = (BackendInfo) iter.next();
00724             remoteBackendList.add(info.getDatabaseBackend());
00725           }
00726           backendsPerController.put(sender, remoteBackendList);
00727           if (logger.isInfoEnabled())
00728             logger
00729                 .info(Translate
00730                     .get(
00731                         "virtualdatabase.distributed.configuration.updating.backend.list",
00732                         sender));
00733         }
00734         else if (msg instanceof EnableBackend)
00735         {
00736           ArrayList remoteBackendList = (ArrayList) backendsPerController
00737               .get(sender);
00738           if (remoteBackendList == null)
00739           { // This case was reported by Alessandro Gamboz on April 1, 2005.
00740             // It looks like the EnableBackend message arrives before membership
00741             // has been properly updated.
00742             logger.warn("No information has been found for remote controller "
00743                 + sender);
00744             remoteBackendList = new ArrayList();
00745             backendsPerController.put(sender, remoteBackendList);
00746           }
00747           DatabaseBackend enabledBackend = ((EnableBackend) msg)
00748               .getDatabaseBackend();
00749           int size = remoteBackendList.size();
00750           boolean backendFound = false;
00751           for (int i = 0; i < size; i++)
00752           {
00753             DatabaseBackend b = (DatabaseBackend) remoteBackendList.get(i);
00754             if (b.equals(enabledBackend))
00755             {
00756               logger.info("Backend " + b.getName() + " enabled on controller "
00757                   + sender);
00758               remoteBackendList.set(i, enabledBackend);
00759               backendFound = true;
00760               break;
00761             }
00762           }
00763           if (!backendFound)
00764           {
00765             logger
00766                 .warn("Updating backend list with unknown backend "
00767                     + enabledBackend.getName() + " enabled on controller "
00768                     + sender);
00769             remoteBackendList.add(enabledBackend);
00770           }
00771         }
00772         else
00773           logger.warn("Unhandled message type received: " + msg.getClass()
00774               + "(" + msg + ")");
00775       }
00776       else
00777       {
00778         String errorMsg = "Invalid null message";
00779         logger.error(errorMsg);
00780         return new ControllerException(errorMsg);
00781       }
00782       return null;
00783     }
00784     catch (Exception e)
00785     {
00786       if (e instanceof RuntimeException)
00787         logger.warn("Error while handling group message:" + msg.getClass(), e);
00788       return e;
00789     }
00790   }
00791 
00792   /**
00793    * Returns true if the given member is ourselves.
00794    * 
00795    * @param sender the sender
00796    * @return true if we are the sender, false otherwise
00797    */
00798   private boolean isLocalSender(Member sender)
00799   {
00800     return channel.getLocalMembership().equals(sender);
00801   }
00802 
00803   /**
00804    * Get the status of all remote controllers
00805    * 
00806    * @throws NotConnectedException if the channel is not connected
00807    * @throws ChannelException if the channel reported an error
00808    * @throws TimeoutException if a timeout occured
00809    */
00810   public void getBackendStatus() throws TimeoutException, ChannelException,
00811       NotConnectedException
00812   {
00813     if (logger.isDebugEnabled())
00814       logger.debug("Requesting remote controllers status");
00815     MulticastResponse rspList = multicastRequestAdapter.multicastMessage(null,
00816         new BackendStatus(getBackendsInfo(backends)),
00817         MulticastRequestAdapter.WAIT_ALL, CJDBCGroupMessage.defaultCastTimeOut);
00818 
00819     HashMap results = rspList.getResults();
00820     for (Iterator iter = results.values().iterator(); iter.hasNext();)
00821     {
00822       ArrayList b = (ArrayList) iter.next();
00823       int bSize = b.size();
00824       if (bSize == 0)
00825         logger.debug("No Database backends");
00826       else
00827         for (int j = 0; j < bSize; j++)
00828           logger.debug(((DatabaseBackend) b.get(j)).getXml());
00829     }
00830   }
00831 
00832   //
00833   // Membership support
00834   //
00835 
00836   /**
00837    * @see org.jgroups.MembershipListener#viewAccepted(org.jgroups.View)
00838    */
00839   public void viewAccepted(Group newGroup)
00840   {
00841     ArrayList newMembers = newGroup.getMembers();
00842     int newSize = newMembers.size();
00843     for (int i = 0; i < newSize; i++)
00844     {
00845       Member m = (Member) newMembers.get(i);
00846       if (!currentGroup.hasMember(m))
00847       {
00848         logger.info(controller.getControllerName() + ":New controller " + m.toString()
00849             + " detected.");
00850       }
00851     }
00852     ArrayList currentMembers = currentGroup.getMembers();
00853     logger.info(controller.getControllerName() + ":Current Members:" + currentMembers);
00854     int currentSize = currentMembers.size();
00855     for (int i = 0; i < currentSize; i++)
00856     {
00857       Member m = (Member) currentMembers.get(i);
00858       if (!newGroup.hasMember(m))
00859       {
00860         logger.warn("Controller " + m + " has failed.");
00861       }
00862     }
00863     currentGroup = newGroup;
00864     currentMembers = currentGroup.getMembers();
00865     logger.info(controller.getControllerName() + ":new Members:" + currentMembers);
00866     allMemberButUs = ((ArrayList) currentGroup.getMembers().clone());
00867     allMemberButUs.remove(channel.getLocalMembership());
00868   }
00869 
00870   //
00871   // Getter/Setter and tools (equals, ...)
00872   //
00873 
00874   /**
00875    * Get the backend distribution policies
00876    * 
00877    * @return an ArrayList of BackendRecoveryPolicy objects
00878    */
00879   public HashMap getBackendRecoveryPolicy()
00880   {
00881     return backendRecoveryPolicy;
00882   }
00883 
00884   /**
00885    * Is this virtual database distributed ?
00886    * 
00887    * @return true
00888    */
00889   public boolean isDistributed()
00890   {
00891     return true;
00892   }
00893 
00894   /**
00895    * Two virtual databases are equal if they have the same name, login and
00896    * password.
00897    * 
00898    * @param other an object
00899    * @return a <code>boolean</code> value
00900    */
00901   public boolean equals(Object other)
00902   {
00903     if ((other == null)
00904         || (!(other instanceof org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase)))
00905       return false;
00906     else
00907     {
00908       DistributedVirtualDatabase db = (org.objectweb.cjdbc.controller.virtualdatabase.DistributedVirtualDatabase) other;
00909       return name.equals(db.getDatabaseName())
00910           && groupName.equals(db.getGroupName());
00911     }
00912   }
00913 
00914   /**
00915    * Get the XML dump of the Distribution element if any.
00916    * 
00917    * @return XML dump of the Distribution element
00918    */
00919   protected String getDistributionXml()
00920   {
00921     StringBuffer info = new StringBuffer();
00922     info.append("<" + DatabasesXmlTags.ELT_Distribution + " "
00923         + DatabasesXmlTags.ATT_groupName + "=\"" + groupName + "\">");
00924 
00925     for (Iterator iter = backendRecoveryPolicy.values().iterator(); iter
00926         .hasNext();)
00927     {
00928       BackendRecoveryPolicy p = (BackendRecoveryPolicy) iter.next();
00929       info.append(p.getXml());
00930     }
00931 
00932     info.append("</" + DatabasesXmlTags.ELT_Distribution + ">");
00933     return info.toString();
00934   }
00935 
00936   /**
00937    * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#viewControllerList()
00938    */
00939   public String[] viewControllerList()
00940   {
00941     if (logger.isInfoEnabled())
00942     {
00943       logger.info(channel.getLocalMembership() + " see members:"
00944           + currentGroup.getMembers() + " and has mapping:" + controllersMap);
00945     }
00946     String[] members = new String[controllersMap.keySet().size()];
00947     Iterator iter = controllersMap.keySet().iterator();
00948     int i = 0;
00949     while (iter.hasNext())
00950     {
00951       members[i] = (String) controllersMap.get(iter.next());
00952       i++;
00953     }
00954     return members;
00955   }
00956 
00957   /**
00958    * (non-Javadoc)
00959    * 
00960    * @see org.objectweb.cjdbc.controller.virtualdatabase.VirtualDatabase#setAuthenticationManager(org.objectweb.cjdbc.controller.authentication.AuthenticationManager)
00961    */
00962   public void setAuthenticationManager(
00963       AuthenticationManager authenticationManager)
00964   {
00965     super.setAuthenticationManager(authenticationManager);
00966   }
00967 
00968   /**
00969    * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#viewGroupBackends()
00970    */
00971   public Hashtable viewGroupBackends() throws VirtualDatabaseException
00972   {
00973     Hashtable map = new Hashtable(controllersMap.size());
00974     Iterator iter = backendsPerController.keySet().iterator();
00975     Address addr;
00976     while (iter.hasNext())
00977     {
00978       addr = (Address) iter.next();
00979       map.put(controllersMap.get(addr), backendsPerController.get(addr));
00980     }
00981     return map;
00982   }
00983 
00984   /**
00985    * We have to convert the backends list from an array of
00986    * <code>DatabaseBackend</code> object to an ArrayList of
00987    * <code>BackendInfo</code> objects. The DatabaseBackend objects cannot be
00988    * serialized because they are used as MBean and notification emitters, so we
00989    * want to extract the info out of them.
00990    * 
00991    * @param backendsObject the list of DatabaseBackend object
00992    * @see BackendInfo
00993    * @return a list of BackendInfo objects
00994    */
00995   public ArrayList getBackendsInfo(ArrayList backendsObject)
00996   {
00997     int size = backendsObject.size();
00998     ArrayList infos = new ArrayList(size);
00999     DatabaseBackend backend;
01000     for (int i = 0; i < size; i++)
01001     {
01002       backend = (DatabaseBackend) backendsObject.get(i);
01003       infos.add(createBackendInfo(backend, false));
01004     }
01005     return infos;
01006   }
01007 
01008   /**
01009    * Create backend information object from a DatabaseBackend object This will
01010    * get only static information
01011    * 
01012    * @param backend the <code>DatabaseBackend</code> object to get info from
01013    * @param useXml should we use xml for extensive backend description
01014    * @return <code>BackendInfo</code>
01015    */
01016   public BackendInfo createBackendInfo(DatabaseBackend backend, boolean useXml)
01017   {
01018     if (useXml)
01019       try
01020       {
01021         return new BackendInfo(backend.getXml());
01022       }
01023       catch (DocumentException e)
01024       {
01025         // go to next method. cause we haven't returned yet
01026       }
01027     return new BackendInfo(backend);
01028   }
01029 
01030   /**
01031    * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#removeBackend(java.lang.String)
01032    */
01033   public void removeBackend(String backend) throws VirtualDatabaseException
01034   {
01035     super.removeBackend(backend);
01036 
01037     try
01038     {
01039       // Send a group message to update backend list
01040       broadcastBackendInformation(allMemberButUs);
01041     }
01042     catch (Exception e)
01043     {
01044       String msg = "An error occured while multicasting new backedn information";
01045       logger.error(msg, e);
01046       throw new VirtualDatabaseException(msg, e);
01047     }
01048   }
01049 
01050   /**
01051    * @see org.objectweb.cjdbc.common.jmx.mbeans.VirtualDatabaseMBean#transferBackend(java.lang.String,
01052    *      java.lang.String)
01053    */
01054   public void transferBackend(String backend, String controllerDestination)
01055       throws VirtualDatabaseException
01056   {
01057     // Get the target controller
01058     ArrayList dest = new ArrayList(1);
01059     Iterator iter = controllersMap.entrySet().iterator();
01060     Entry entry;
01061     Member targetMember = null;
01062     while (iter.hasNext())
01063     {
01064       entry = (Entry) iter.next();
01065       if (entry.getValue().equals(controllerDestination))
01066         targetMember = (Member) entry.getKey();
01067     }
01068     if (targetMember == null)
01069       throw new VirtualDatabaseException("Cannot find controller:"
01070           + controllerDestination + " in group");
01071     dest.add(targetMember);
01072 
01073     // Get reference on backend
01074     DatabaseBackend db = getAndCheckBackend(backend, NO_CHECK_BACKEND);
01075 
01076     //  Disable backend
01077     try
01078     {
01079       DistributedRequestManager manager = ((DistributedRequestManager) requestManager);
01080       if (hasRecoveryLog())
01081         manager.disableBackendForCheckpoint(db, "transfer of " + backend
01082             + " to :" + controllerDestination);
01083       else
01084         manager.disableBackend(db);
01085     }
01086     catch (SQLException e)
01087     {
01088       throw new VirtualDatabaseException(e.getMessage());
01089     }
01090 
01091     try
01092     {
01093       //  Send backend transfer message
01094       if (logger.isDebugEnabled())
01095         logger.debug("Sending transfer message to:" + targetMember);
01096 
01097       multicastRequestAdapter.multicastMessage(dest, new BackendTransfer(
01098           controllerDestination, createBackendInfo(db, true)),
01099           MulticastRequestAdapter.WAIT_ALL,
01100           CJDBCGroupMessage.defaultCastTimeOut);
01101 
01102       // Remove backend from this controller
01103       removeBackend(db);
01104 
01105       // Broadcast updated backend list
01106       broadcastBackendInformation(allMemberButUs);
01107     }
01108     catch (Exception e)
01109     {
01110       String msg = "An error occured while transfering the backend";
01111       logger.error(msg, e);
01112       throw new VirtualDatabaseException(msg, e);
01113     }
01114   }
01115 
01116 }

Generated on Mon Apr 11 22:01:31 2005 for C-JDBC by  doxygen 1.3.9.1