KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > channel > JGroupsReliableChannelWithGms


1 /**
2  * C-JDBC: Clustered JDBC.
3  * Copyright (C) 2002-2005 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: c-jdbc@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.tribe.channel;
26
27 import java.io.ByteArrayInputStream JavaDoc;
28 import java.io.ObjectInputStream JavaDoc;
29 import java.io.Serializable JavaDoc;
30 import java.util.ArrayList JavaDoc;
31 import java.util.HashMap JavaDoc;
32
33 import org.jgroups.Address;
34 import org.jgroups.BlockEvent;
35 import org.jgroups.ChannelClosedException;
36 import org.jgroups.ChannelNotConnectedException;
37 import org.jgroups.JChannel;
38 import org.jgroups.Message;
39 import org.jgroups.SuspectEvent;
40 import org.jgroups.TimeoutException;
41 import org.jgroups.View;
42 import org.objectweb.tribe.common.Group;
43 import org.objectweb.tribe.common.GroupIdentifier;
44 import org.objectweb.tribe.common.Member;
45 import org.objectweb.tribe.exceptions.AlreadyMemberException;
46 import org.objectweb.tribe.exceptions.ChannelException;
47 import org.objectweb.tribe.exceptions.NotConnectedException;
48 import org.objectweb.tribe.gms.JGroupsMembershipService;
49 import org.objectweb.tribe.messages.FragmentedMessage;
50 import org.objectweb.tribe.messages.GroupMessage;
51
52 /**
53  * This class defines a JGroupsReliableChannelWithGms which is a
54  * ReliableGroupChannelWithGms wrapper on top of a JGroups JChannel that uses
55  * the GMS service provided by JGroups.
56  *
57  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
58  * @version 1.0
59  */

60 public class JGroupsReliableChannelWithGms extends ReliableGroupChannelWithGms
61 {
62   // The underlying JGroups channel
63
private JChannel jgroupsChannel;
64   private JGroupsMembershipService jgroupsGMS;
65
66   // Fragment size for oversized messages
67
private static final int FRAGMENT_SIZE = 32000;
68   private HashMap JavaDoc fragmentList = new HashMap JavaDoc();
69
70   /**
71    * Creates a new <code>JGroupsReliableChannel</code> object
72    *
73    * @param jGroupsChannel JGroups channel to use
74    * @param gms the GMS service to use
75    */

76   public JGroupsReliableChannelWithGms(JGroupsMembershipService gms)
77   {
78     super(gms);
79     this.jgroupsGMS = gms;
80     this.jgroupsChannel = gms.getJGroupsChannel();
81   }
82
83   /**
84    * @see org.objectweb.tribe.channel.ReliableGroupChannel#join(org.objectweb.tribe.common.Group)
85    */

86   public void join(Group g) throws AlreadyMemberException, ChannelException,
87       NotConnectedException
88   {
89     if ((currentGroup != null) && currentGroup.equals(g))
90       throw new AlreadyMemberException();
91
92     // Quit the previous group first if needed
93
if (currentGroup != null)
94       try
95       {
96         quit();
97       }
98       catch (Exception JavaDoc ignore)
99       {
100       }
101
102     currentGroup = g;
103     me = jgroupsGMS.join(this, g.getGroupIdentifier());
104   }
105
106   /**
107    * @see org.objectweb.tribe.channel.ReliableGroupChannel#quit()
108    */

109   public void quit() throws ChannelException, NotConnectedException
110   {
111     if (currentGroup == null)
112       throw new NotConnectedException();
113     jgroupsChannel.close();
114     currentGroup = null;
115   }
116
117   /**
118    * @see org.objectweb.tribe.channel.ReliableGroupChannel#receive()
119    */

120   public Serializable JavaDoc receive() throws ChannelException, NotConnectedException
121   {
122     if (currentGroup == null)
123       throw new NotConnectedException();
124     while (true)
125     { // Loop to process/filter JGroups internal events
126
try
127       {
128         Object JavaDoc obj = jgroupsChannel.receive(0);
129
130         // JGroups "feature" that might deliver null messages for no reason (see
131
// JGroups' PushPullAdapter code).
132
if (obj == null)
133           continue;
134
135         if (obj instanceof View)
136         {
137           jgroupsGMS.viewAccepted((View) obj);
138           continue;
139         }
140         else if (obj instanceof SuspectEvent)
141         {
142           jgroupsGMS.suspect((Address) ((SuspectEvent) obj).getMember());
143           continue;
144         }
145         else if (obj instanceof BlockEvent)
146         {
147           jgroupsGMS.block();
148           continue;
149         }
150         else if (obj instanceof Message)
151         {
152           Object JavaDoc content = ((Message) obj).getObject();
153           GroupMessage groupMessage = null;
154           if (content instanceof FragmentedMessage)
155           {
156             FragmentedMessage fragment = (FragmentedMessage) content;
157             byte[] message;
158             if (fragment.getFragmentId() == 0)
159             { // First fragment, allocate message
160
message = new byte[fragment.getMessageSize()];
161               fragmentList.put(fragment.getMessageId(), message);
162             }
163             else
164             { // Retrieve message
165
message = (byte[]) fragmentList.get(fragment.getMessageId());
166             }
167
168             // Copy fragment
169
int offset = fragment.getFragmentId() * FRAGMENT_SIZE;
170             int fragmentLength = fragment.getFragmentData().length;
171             System.arraycopy(fragment.getFragmentData(), 0, message, offset,
172                 fragmentLength);
173
174             // Was it the last fragment?
175
if (offset + fragmentLength == fragment.getMessageSize())
176             { // Last fragment convert to a GroupMessage Object
177
try
178               {
179                 ObjectInputStream JavaDoc ois = new ObjectInputStream JavaDoc(
180                     new ByteArrayInputStream JavaDoc(message));
181                 groupMessage = (GroupMessage) ois.readObject();
182               }
183               catch (Exception JavaDoc e)
184               {
185                 e.printStackTrace();
186                 throw new ChannelException(
187                     "Failed to reassemble fragmented message", e);
188               }
189               finally
190               { // Remove message from list
191
fragmentList.remove(fragment.getMessageId());
192               }
193             }
194             else
195             { // Not last fragment
196
continue;
197             }
198           }
199           else if (content instanceof GroupMessage)
200             groupMessage = (GroupMessage) content;
201
202           // Do not else if here, if the last fragment was read, obj has been
203
// modified just above to contain the complete group message.
204
if (groupMessage == null)
205             return null;
206           else
207           { // Check that we should receive this message
208
ArrayList JavaDoc members = (ArrayList JavaDoc) groupMessage.getChunks().get(1);
209             if (members.contains(getLocalMembership()))
210               return groupMessage.getMessage();
211           }
212         }
213         else
214           System.out.println("Unhandled JGroups message type ("
215               + obj.getClass() + "): " + obj);
216       }
217       catch (ChannelNotConnectedException e)
218       {
219         throw new NotConnectedException(e);
220       }
221       catch (ChannelClosedException e)
222       {
223         throw new NotConnectedException(e);
224       }
225       catch (TimeoutException e)
226       {
227         throw new ChannelException(
228             "Timeout while retrieving message from channel", e);
229       }
230     }
231   }
232
233   /**
234    * @see org.objectweb.tribe.channel.ReliableGroupChannel#send(java.io.Serializable,
235    * org.objectweb.tribe.common.GroupIdentifier, java.util.ArrayList)
236    */

237   public ArrayList JavaDoc send(Serializable JavaDoc msg, GroupIdentifier gid, ArrayList JavaDoc members)
238       throws ChannelException, NotConnectedException
239   {
240     // JGroups channel can only send to the whole group else unicast will bypass
241
// the ordering layer. Therefore we embed the destination members in the
242
// message for filtering on the receiver side.
243
GroupMessage tribeMessage = new GroupMessage(msg, gid);
244     tribeMessage.addChunk(members);
245     int msgSize = tribeMessage.getByteArray().length;
246     try
247     {
248       if (msgSize > FRAGMENT_SIZE)
249       { // Fragmentation needed since JGroups does not do it properly
250
byte[] completeMessageInBytes = tribeMessage.getByteArray();
251         byte[] fragmentData = new byte[FRAGMENT_SIZE];
252         FragmentedMessage fragment = new FragmentedMessage(msgSize);
253         int nbOfFragments = msgSize / FRAGMENT_SIZE;
254         int currentFragment = 0;
255         while (currentFragment < nbOfFragments)
256         {
257           System.arraycopy(completeMessageInBytes, currentFragment
258               * FRAGMENT_SIZE, fragmentData, 0, FRAGMENT_SIZE);
259           fragment.setFragment(currentFragment, fragmentData);
260           jgroupsChannel.send(new Message(null, null, fragment));
261           currentFragment++;
262         }
263         // Send last fragment
264
int lastFragmentSize = msgSize % FRAGMENT_SIZE;
265         if (lastFragmentSize > 0)
266         {
267           // Adjust to the right size
268
fragmentData = new byte[lastFragmentSize];
269           System.arraycopy(completeMessageInBytes, currentFragment
270               * FRAGMENT_SIZE, fragmentData, 0, lastFragmentSize);
271           fragment.setFragment(currentFragment, fragmentData);
272           jgroupsChannel.send(new Message(null, null, fragment));
273         }
274       }
275       else
276       { // No fragmentation
277
Message jgroupsMsg = new Message(null, null, tribeMessage);
278         jgroupsChannel.send(jgroupsMsg);
279       }
280       // Note that JGroups channel does not offer the facility of finding which
281
// node have failed, it will be reported by a view change
282
return null;
283     }
284     catch (ChannelNotConnectedException e)
285     {
286       throw new NotConnectedException(e);
287     }
288     catch (ChannelClosedException e)
289     {
290       throw new NotConnectedException(e);
291     }
292   }
293
294   /**
295    * @see java.lang.Object#toString()
296    */

297   public String JavaDoc toString()
298   {
299     return "JGroups channel wrapper: " + jgroupsChannel;
300   }
301
302   /**
303    * Return the JGroups channel properties
304    *
305    * @return JGroups channel properties
306    */

307   public String JavaDoc getProperties()
308   {
309     return jgroupsChannel.getProperties();
310   }
311
312   /**
313    * @see org.objectweb.tribe.channel.ReliableGroupChannel#getLocalMembership()
314    */

315   public Member getLocalMembership()
316   {
317     return JGroupsMembershipService
318         .memberFromJGroupsAddress((org.jgroups.stack.IpAddress) jgroupsChannel
319             .getLocalAddress());
320   }
321
322   /**
323    * @see org.objectweb.tribe.channel.ReliableGroupChannel#getCurrentGroup()
324    */

325   public Group getCurrentGroup()
326   {
327     if (currentGroup == null)
328       return null;
329     else
330       return gms.getGroup(currentGroup.getGroupIdentifier());
331   }
332
333 }
Popular Tags