KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > channel > tcp > TcpChannel


/**
 * Tribe: Group communication library.
 * Copyright (C) 2004 French National Institute For Research In Computer
 * Science And Control (INRIA).
 * Contact: tribe@objectweb.org
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or any later
 * version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
 *
 * Initial developer(s): Emmanuel Cecchet.
 * Contributor(s): ______________________.
 */


package org.objectweb.tribe.channel.tcp;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.objectweb.tribe.channel.AbstractReliableFifoChannel;
import org.objectweb.tribe.common.Address;
import org.objectweb.tribe.common.IpAddress;
import org.objectweb.tribe.exceptions.ChannelException;
import org.objectweb.tribe.exceptions.NotConnectedException;
import org.objectweb.tribe.messages.ChannelMessage;

42 /**
43  * This class defines a TcpChannel
44  *
45  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
46  * @version 1.0
47  */

48 public class TcpChannel extends AbstractReliableFifoChannel
49 {
50   // underlying TCP socket
51
private Socket JavaDoc socket;
52   private IpAddress destinationAddress = null;
53   private IpAddress sourceAddress = null;
54   private DataInputStream JavaDoc inStream;
55   private DataOutputStream JavaDoc outStream;
56   private boolean isClosed;
57
58   /**
59    * Creates a new <code>TcpChannel</code> using the local host IP and a port
60    * number choosen by the system.
61    *
62    * @throws IOException if an error occurs
63    */

64   public TcpChannel() throws IOException JavaDoc
65   {
66     socket = new Socket JavaDoc();
67     sourceAddress = new IpAddress(socket.getLocalAddress(), socket
68         .getLocalPort());
69     initializeStreams();
70   }
71
72   /**
73    * Creates a new <code>TcpChannel</code> binding the source endpoint to the
74    * given address.
75    *
76    * @param sourceAddress address to bind the socket to
77    * @throws IOException if an error occurs
78    */

79   public TcpChannel(IpAddress sourceAddress) throws IOException JavaDoc
80   {
81     socket = new Socket JavaDoc(sourceAddress.getAddress(), sourceAddress.getPort(),
82         true);
83     this.sourceAddress = sourceAddress;
84     initializeStreams();
85   }
86
87   /**
88    * Creates a new <code>TcpChannel</code> from an existing socket.
89    *
90    * @param socket a TCP socket
91    * @throws IOException if an error occurs
92    */

93   public TcpChannel(Socket JavaDoc socket) throws IOException JavaDoc
94   {
95     this.socket = socket;
96     sourceAddress = new IpAddress(socket.getLocalAddress(), socket
97         .getLocalPort());
98     destinationAddress = new IpAddress(socket.getInetAddress(), socket
99         .getPort());
100     initializeStreams();
101   }
102
103   /**
104    * Disable the Nagle algorithm for better latency and create object input and
105    * output streams.
106    *
107    * @throws IOException if an error occurs
108    */

109   private void initializeStreams() throws IOException JavaDoc
110   {
111     socket.setTcpNoDelay(false);
112     isClosed = false;
113   }
114
115   /**
116    * @see org.objectweb.tribe.channel.AbstractReliableFifoChannel#send(org.objectweb.tribe.messages.ChannelMessage)
117    */

118   public synchronized void send(ChannelMessage msg) throws ChannelException,
119       NotConnectedException
120   {
121     // Sanity checks
122
if (isClosed || (socket == null))
123       throw new NotConnectedException();
124     if (outStream == null)
125     { // Create the stream if it does not exist
126
try
127       {
128         outStream = new DataOutputStream JavaDoc(new BufferedOutputStream JavaDoc(socket
129             .getOutputStream()));
130       }
131       catch (IOException JavaDoc e1)
132       {
133         throw new ChannelException("Unable to create output stream", e1);
134       }
135     }
136
137     // Send the message
138
try
139     {
140       byte[] msgInBytes = msg.getByteArray();
141       // Note that we do not send the message as an Object but directly in its
142
// serialized form to prevent multiple serializations of the object.
143
outStream.writeInt(msgInBytes.length);
144       outStream.write(msgInBytes);
145       outStream.flush();
146     }
147     catch (IOException JavaDoc e)
148     {
149       throw new ChannelException("Error while sending message on socket", e);
150     }
151   }
152
153   /**
154    * @see org.objectweb.tribe.channel.AbstractReliableFifoChannel#close()
155    */

156   public void close() throws ChannelException
157   {
158     if (isClosed)
159       return;
160     try
161     {
162       socket.close();
163     }
164     catch (IOException JavaDoc e)
165     {
166       throw new ChannelException("Error while closing the socket", e);
167     }
168     finally
169     {
170       isClosed = true;
171     }
172   }
173
174   /**
175    * @see org.objectweb.tribe.channel.AbstractReliableFifoChannel#connect(org.objectweb.tribe.common.Address)
176    */

177   public void connect(Address JavaDoc destination) throws ChannelException
178   {
179     if (!(destination instanceof IpAddress))
180       throw new ChannelException("TCP Channels require IP addresses.");
181     destinationAddress = (IpAddress) destination;
182     try
183     {
184       socket.connect(new InetSocketAddress JavaDoc(destinationAddress.getAddress(),
185           destinationAddress.getPort()));
186     }
187     catch (IOException JavaDoc e)
188     {
189       inStream = null;
190       outStream = null;
191       throw new ChannelException("Error while connecting the socket", e);
192     }
193   }
194
195   /**
196    * @see org.objectweb.tribe.channel.AbstractReliableFifoChannel#getDestinationAddress()
197    */

198   public Address JavaDoc getDestinationAddress()
199   {
200     return destinationAddress;
201   }
202
203   /**
204    * @see org.objectweb.tribe.channel.AbstractReliableFifoChannel#getSourceAddress()
205    */

206   public Address JavaDoc getSourceAddress()
207   {
208     return sourceAddress;
209   }
210
211   /**
212    * Returns the inStream value.
213    *
214    * @return Returns an input stream on the underlying socket or null if no
215    * input stream can be created.
216    */

217   protected DataInputStream JavaDoc getInStream()
218   {
219     if (socket == null)
220       return null;
221     if (inStream == null)
222       try
223       {
224         inStream = new DataInputStream JavaDoc(new BufferedInputStream JavaDoc(socket
225             .getInputStream()));
226       }
227       catch (IOException JavaDoc e)
228       {
229         return null;
230       }
231
232     return inStream;
233   }
234 }
Popular Tags