KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > tribe > channel > tcp > TcpChannelPool


1 /**
2  * Tribe: Group communication library.
3  * Copyright (C) 2004 French National Institute For Research In Computer
4  * Science And Control (INRIA).
5  * Contact: tribe@objectweb.org
6  *
7  * This library is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License as published by the
9  * Free Software Foundation; either version 2.1 of the License, or any later
10  * version.
11  *
12  * This library is distributed in the hope that it will be useful, but WITHOUT
13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
15  * for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this library; if not, write to the Free Software Foundation,
19  * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
20  *
21  * Initial developer(s): Emmanuel Cecchet.
22  * Contributor(s): ______________________.
23  */

24
25 package org.objectweb.tribe.channel.tcp;
26
27 import java.io.IOException JavaDoc;
28 import java.util.HashMap JavaDoc;
29
30 import org.objectweb.tribe.channel.AbstractChannelPool;
31 import org.objectweb.tribe.channel.AbstractReliableFifoChannel;
32 import org.objectweb.tribe.channel.AbstractServerChannel;
33 import org.objectweb.tribe.common.Address;
34 import org.objectweb.tribe.common.IpAddress;
35 import org.objectweb.tribe.common.log.Trace;
36 import org.objectweb.tribe.exceptions.ChannelException;
37
38 /**
39  * This class defines a TcpChannelPool
40  *
41  * @author <a HREF="mailto:Emmanuel.Cecchet@inria.fr">Emmanuel Cecchet </a>
42  * @version 1.0
43  */

44 public class TcpChannelPool extends AbstractChannelPool
45 {
46
47   private static TcpChannelPool pool = new TcpChannelPool();
48   private HashMap JavaDoc readerThreads;
49   private HashMap JavaDoc accepterThreads;
50   private static Trace logger = Trace
51                                            .getLogger("org.objectweb.tribe.channel");
52
53   /**
54    * Creates a new <code>TcpChannelPool</code> object
55    */

56   public TcpChannelPool()
57   {
58     super();
59     readerThreads = new HashMap JavaDoc();
60     accepterThreads = new HashMap JavaDoc();
61   }
62
63   /**
64    * @see org.objectweb.tribe.serverSocket.pool.AbstractChannelPool#getChannel(org.objectweb.tribe.common.Address)
65    */

66   public AbstractReliableFifoChannel getChannel(Address destination)
67       throws ChannelException
68   {
69     synchronized (channels)
70     {
71       AbstractReliableFifoChannel channel = (AbstractReliableFifoChannel) channels
72           .get(destination);
73       if (channel == null)
74       {
75         try
76         {
77           if (logger.isDebugEnabled())
78             logger.debug("Getting new channel for " + destination);
79           channel = new TcpChannel();
80           channel.connect(destination);
81           channels.put(destination, channel);
82           TcpReaderThread thread = new TcpReaderThread((TcpChannel) channel,
83               keyBuffers);
84           thread.start();
85           synchronized (readerThreads)
86           {
87             readerThreads.put(channel, thread);
88           }
89         }
90         catch (IOException JavaDoc e)
91         {
92           throw new ChannelException("Failed to create a new serverSocket to "
93               + destination, e);
94         }
95       }
96       return channel;
97     }
98   }
99
100   /**
101    * @see org.objectweb.tribe.channel.AbstractChannelPool#getServerChannel(org.objectweb.tribe.common.Address)
102    */

103   public AbstractServerChannel getServerChannel(Address serverAddress)
104       throws ChannelException
105   {
106     synchronized (serverChannels)
107     {
108       AbstractServerChannel channel = (AbstractServerChannel) serverChannels
109           .get(serverAddress);
110       if (channel == null)
111       {
112         try
113         {
114           if (logger.isDebugEnabled())
115             logger.debug("Getting new server channel for " + serverAddress);
116           int port = ((IpAddress) serverAddress).getPort();
117           if (port == 0)
118             channel = new TcpServerChannel(port);
119           else
120           {
121             channel = new TcpServerChannel();
122             channel.bind(serverAddress);
123           }
124           serverChannels.put(serverAddress, channel);
125           TcpServerAccepterThread thread = new TcpServerAccepterThread(channel,
126               channels, readerThreads, keyBuffers);
127           thread.start();
128           synchronized (accepterThreads)
129           {
130             accepterThreads.put(channel, thread);
131           }
132         }
133         catch (IOException JavaDoc e)
134         {
135           throw new ChannelException(
136               "Failed to create a new server serverSocket on " + serverAddress,
137               e);
138         }
139       }
140       return channel;
141     }
142   }
143
144   /**
145    * Returns a static instance of the serverSocket pool.
146    *
147    * @return a <code>TcpChannelPool</code>
148    */

149   public static AbstractChannelPool getChannelPool()
150   {
151     return pool;
152   }
153
154   /**
155    * Terminates the TCP reader thread as well.
156    *
157    * @see org.objectweb.tribe.channel.AbstractChannelPool#removeChannelFromPool(org.objectweb.tribe.channel.AbstractReliableFifoChannel)
158    */

159   public boolean removeChannelFromPool(AbstractReliableFifoChannel channel)
160   {
161     TcpReaderThread thread;
162     synchronized (readerThreads)
163     {
164       thread = (TcpReaderThread) readerThreads.remove(channel);
165     }
166     if (thread != null)
167       thread.kill();
168     return super.removeChannelFromPool(channel);
169   }
170
171   /**
172    * Terminates the TCP accepter thread as well.
173    *
174    * @see org.objectweb.tribe.channel.AbstractChannelPool#removeServerChannelFromPool(org.objectweb.tribe.channel.AbstractServerChannel)
175    */

176   public boolean removeServerChannelFromPool(AbstractServerChannel channel)
177   {
178     TcpServerAccepterThread thread;
179     synchronized (accepterThreads)
180     {
181       thread = (TcpServerAccepterThread) accepterThreads.remove(channel);
182     }
183     if (thread != null)
184       thread.kill();
185     return super.removeServerChannelFromPool(channel);
186   }
187 }
Popular Tags