KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jboss > resource > adapter > jms > inflow > JmsServerSessionPool


1 /*
2 * JBoss, Home of Professional Open Source
3 * Copyright 2005, JBoss Inc., and individual contributors as indicated
4 * by the @authors tag. See the copyright.txt in the distribution for a
5 * full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */

22 package org.jboss.resource.adapter.jms.inflow;
23
24 import java.util.ArrayList JavaDoc;
25
26 import javax.jms.Connection JavaDoc;
27 import javax.jms.ConnectionConsumer JavaDoc;
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.Queue JavaDoc;
30 import javax.jms.ServerSession JavaDoc;
31 import javax.jms.ServerSessionPool JavaDoc;
32 import javax.jms.Topic JavaDoc;
33
34 import org.jboss.logging.Logger;
35
36 /**
37  * A generic jms session pool.
38  *
39  * @author <a HREF="adrian@jboss.com">Adrian Brock</a>
40  * @version $Revision: 58488 $
41  */

42 public class JmsServerSessionPool implements ServerSessionPool JavaDoc
43 {
44    /** The logger */
45    private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
46       
47    /** The activation */
48    JmsActivation activation;
49
50    /** The consumer */
51    ConnectionConsumer JavaDoc consumer;
52
53    /** The server sessions */
54    ArrayList JavaDoc serverSessions = new ArrayList JavaDoc();
55    
56    /** Whether the pool is stopped */
57    boolean stopped = false;
58    
59    /** The number of sessions */
60    int sessionCount = 0;
61    
62    
63    /**
64     * Create a new session pool
65     *
66     * @param activation the jms activation
67     */

68    public JmsServerSessionPool(JmsActivation activation)
69    {
70       this.activation = activation;
71    }
72
73    /**
74     * @return the activation
75     */

76    public JmsActivation getActivation()
77    {
78       return activation;
79    }
80    
81    /**
82     * Start the server session pool
83     *
84     * @throws Exeption for any error
85     */

86    public void start() throws Exception JavaDoc
87    {
88       setupSessions();
89       setupConsumer();
90    }
91
92    /**
93     * Stop the server session pool
94     */

95    public void stop()
96    {
97       teardownConsumer();
98       teardownSessions();
99    }
100    
101    public ServerSession JavaDoc getServerSession() throws JMSException JavaDoc
102    {
103       boolean trace = log.isTraceEnabled();
104       if (trace)
105          log.trace("getServerSession");
106
107       ServerSession JavaDoc result = null;
108       
109       try
110       {
111          synchronized (serverSessions)
112          {
113             while (true)
114             {
115                int sessionsSize = serverSessions.size();
116                
117                if (stopped)
118                   throw new Exception JavaDoc("Cannot get a server session after the pool is stopped");
119                
120                else if (sessionsSize > 0)
121                {
122                   result = (ServerSession JavaDoc) serverSessions.remove(sessionsSize-1);
123                   break;
124                }
125                
126                else
127                {
128                   try
129                   {
130                      serverSessions.wait();
131                   }
132                   catch (InterruptedException JavaDoc ignored)
133                   {
134                   }
135                }
136             }
137          }
138       }
139       catch (Throwable JavaDoc t)
140       {
141          log.error("Unable to get a server session", t);
142          throw new JMSException JavaDoc("Unable to get a server session " + t);
143       }
144       
145       if (trace)
146          log.trace("Returning server session " + result);
147       
148       return result;
149    }
150
151    /**
152     * Return the server session
153     *
154     * @param session the session
155     */

156    protected void returnServerSession(JmsServerSession session)
157    {
158       synchronized (serverSessions)
159       {
160          if (stopped)
161          {
162             session.teardown();
163             --sessionCount;
164          }
165          else
166             serverSessions.add(session);
167          serverSessions.notifyAll();
168       }
169    }
170    
171    /**
172     * Setup the sessions
173     *
174     * @throws Exeption for any error
175     */

176    protected void setupSessions() throws Exception JavaDoc
177    {
178       JmsActivationSpec spec = activation.getActivationSpec();
179
180       // Create the sessions
181
synchronized (serverSessions)
182       {
183          for (int i = 0; i < spec.getMaxSessionInt(); ++i)
184          {
185             JmsServerSession session = new JmsServerSession(this);
186             serverSessions.add(session);
187          }
188          sessionCount = serverSessions.size();
189       }
190       
191       // Start the sessions
192
ArrayList JavaDoc clonedSessions = (ArrayList JavaDoc) serverSessions.clone();
193       for (int i = 0; i < clonedSessions.size(); ++ i)
194       {
195          JmsServerSession session = (JmsServerSession) serverSessions.get(i);
196          session.setup();
197       }
198    }
199
200    /**
201     * Stop the sessions
202     */

203    protected void teardownSessions()
204    {
205       synchronized (serverSessions)
206       {
207          // Disallow any new sessions
208
stopped = true;
209          serverSessions.notifyAll();
210          
211          // Stop inactive sessions
212
for (int i = 0; i < serverSessions.size(); ++i)
213          {
214             JmsServerSession session = (JmsServerSession) serverSessions.get(i);
215             session.teardown();
216          }
217
218          sessionCount -= serverSessions.size();
219          serverSessions.clear();
220
221          // Wait for inuse sessions
222
while (sessionCount > 0)
223          {
224             try
225             {
226                serverSessions.wait();
227             }
228             catch (InterruptedException JavaDoc ignore)
229             {
230             }
231          }
232       }
233    }
234    
235    /**
236     * Setup the connection consumer
237     *
238     * @throws Exeption for any error
239     */

240    protected void setupConsumer() throws Exception JavaDoc
241    {
242       Connection JavaDoc connection = activation.getConnection();
243       JmsActivationSpec spec = activation.getActivationSpec();
244       String JavaDoc selector = spec.getMessageSelector();
245       int maxMessages = spec.getMaxMessagesInt();
246       if (spec.isTopic())
247       {
248          Topic JavaDoc topic = (Topic JavaDoc) activation.getDestination();
249          String JavaDoc subscriptionName = spec.getSubscriptionName();
250          if (spec.isDurable())
251             consumer = connection.createDurableConnectionConsumer(topic, subscriptionName, selector, this, maxMessages);
252          else
253             consumer = connection.createConnectionConsumer(topic, selector, this, maxMessages);
254       }
255       else
256       {
257          Queue JavaDoc queue = (Queue JavaDoc) activation.getDestination();
258          consumer = connection.createConnectionConsumer(queue, selector, this, maxMessages);
259       }
260       log.debug("Created consumer " + consumer);
261    }
262
263    /**
264     * Stop the connection consumer
265     */

266    protected void teardownConsumer()
267    {
268       try
269       {
270          if (consumer != null)
271          {
272             log.debug("Closing the " + consumer);
273             consumer.close();
274          }
275       }
276       catch (Throwable JavaDoc t)
277       {
278          log.debug("Error closing the consumer " + consumer, t);
279       }
280    }
281
282 }
Popular Tags