KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > exolab > jms > server > SentMessageCache


1 /**
2  * Redistribution and use of this software and associated documentation
3  * ("Software"), with or without modification, are permitted provided
4  * that the following conditions are met:
5  *
6  * 1. Redistributions of source code must retain copyright
7  * statements and notices. Redistributions must also contain a
8  * copy of this document.
9  *
10  * 2. Redistributions in binary form must reproduce the
11  * above copyright notice, this list of conditions and the
12  * following disclaimer in the documentation and/or other
13  * materials provided with the distribution.
14  *
15  * 3. The name "Exolab" must not be used to endorse or promote
16  * products derived from this Software without prior written
17  * permission of Exoffice Technologies. For written permission,
18  * please contact info@exolab.org.
19  *
20  * 4. Products derived from this Software may not be called "Exolab"
21  * nor may "Exolab" appear in their names without prior written
22  * permission of Exoffice Technologies. Exolab is a registered
23  * trademark of Exoffice Technologies.
24  *
25  * 5. Due credit should be given to the Exolab Project
26  * (http://www.exolab.org/).
27  *
28  * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29  * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30  * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31  * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32  * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39  * OF THE POSSIBILITY OF SUCH DAMAGE.
40  *
41  * Copyright 2002-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42  *
43  * $Id: SentMessageCache.java,v 1.2 2005/03/18 04:07:02 tanderson Exp $
44  */

45 package org.exolab.jms.server;
46
47 import java.sql.Connection JavaDoc;
48 import java.util.Collections JavaDoc;
49 import java.util.Iterator JavaDoc;
50 import java.util.LinkedList JavaDoc;
51 import java.util.List JavaDoc;
52
53 import javax.jms.Session JavaDoc;
54 import javax.jms.JMSException JavaDoc;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58
59 import org.exolab.jms.messagemgr.MessageHandle;
60 import org.exolab.jms.messagemgr.DestinationManager;
61 import org.exolab.jms.messagemgr.DestinationCache;
62 import org.exolab.jms.persistence.DatabaseService;
63 import org.exolab.jms.persistence.PersistenceException;
64 import org.exolab.jms.persistence.SQLHelper;
65
66
67 /**
68  * Helper class to cache all sent messages and unacked messages for a
69  * session. It also does some other processing like marking the message
70  * as sent to minimize the number of transactions.
71  * <p>
72  * Messages will only be added to the cache, if the session is transacted
73  * or the ack mode for the session is set to CLIENT_ACKNOWLEDGE
74  *
75  * @version $Revision: 1.2 $ $Date: 2005/03/18 04:07:02 $
76  * @author <a HREF="mailto:jima@exoffice.com">Jim Alateras</a>
77  * @author <a HREF="mailto:tma@netspace.net.au">Tim Anderson</a>
78  * @see JmsServerSession
79  */

80 class SentMessageCache {
81
82     /**
83      * The session which owns this instance
84      */

85     private JmsServerSession _session;
86
87     /**
88      * Holds a list of unacked messages in the order they were sent
89      */

90     private List JavaDoc _unackedMessages = Collections.synchronizedList(
91         new LinkedList JavaDoc());
92
93     /**
94      * The logger
95      */

96     private static final Log _log = LogFactory.getLog(SentMessageCache.class);
97
98
99     /**
100      * Construct a new <code>SentMessageCache</code>
101      *
102      * @param session the session which manages this
103      */

104     public SentMessageCache(JmsServerSession session) {
105         _session = session;
106     }
107
108     /**
109      * Mark the message as delivered and do one of the following.
110      * <p>
111      * If the session is transacted or the session ack mode is set to
112      * CLIENT_ACKNOWLEDGE then add this message to the list of unacked
113      * messages.
114      * <p>
115      * If the session ack mode is anything else then destroy the handle.
116      * <p>
117      * If the handle is a reference to a persistent message then conduct
118      * this work in the context of a transaction.
119      *
120      * @param handle the message handle that should be acked
121      */

122     public void process(MessageHandle handle) throws JMSException JavaDoc {
123         if (handle.isPersistent()) {
124             Connection JavaDoc connection = null;
125             try {
126                 connection = DatabaseService.getConnection();
127
128                 handle.setDelivered(true);
129                 if (_session.isTransacted() ||
130                     _session.getAckMode() == Session.CLIENT_ACKNOWLEDGE) {
131                     _unackedMessages.add(handle);
132                     handle.update(connection);
133                 } else {
134                     // in all other ack modes, simply destroy the handle
135
handle.destroy(connection);
136                 }
137                 connection.commit();
138             } catch (PersistenceException exception) {
139                 SQLHelper.rollback(connection);
140                 _log.error("Error in SentMessageCache.process", exception);
141             } catch (Exception JavaDoc exception) {
142                 _log.error("Error in SentMessageCache.process", exception);
143             } finally {
144                 SQLHelper.close(connection);
145             }
146         } else {
147             handle.setDelivered(true);
148             if (_session.isTransacted() ||
149                 _session.getAckMode() == Session.CLIENT_ACKNOWLEDGE) {
150                 _unackedMessages.add(handle);
151             } else {
152                 handle.destroy();
153             }
154         }
155     }
156
157     /**
158      * Acknowledge the specified messages in the cache and all previously
159      * sent messages.
160      *
161      * @param messageId the id of the message to ack
162      * @param consumerId the consumer id that sent the ack.
163      */

164     public void acknowledgeMessage(String JavaDoc messageId, long consumerId) {
165         // first check that the message exists in the list of unacked
166
// messages
167
boolean exists = false;
168         Iterator JavaDoc iterator = _unackedMessages.iterator();
169         while (iterator.hasNext()) {
170             MessageHandle handle = (MessageHandle) iterator.next();
171             if (handle.getConsumerId() == consumerId
172                     && handle.getMessageId().equals(messageId)) {
173                 exists = true;
174                 break;
175             }
176         }
177
178         if (exists) {
179             boolean intransaction = false;
180             Connection JavaDoc connection = null;
181
182             try {
183                 // start from the top of the cache and remove each
184
// message and then call destroy on it.
185
// We should do this in one transaction.
186
while (!_unackedMessages.isEmpty()) {
187                     MessageHandle handle =
188                         (MessageHandle) _unackedMessages.remove(0);
189                     if (handle.isPersistent()) {
190                         if (!intransaction) {
191                             connection = DatabaseService.getConnection();
192
193                             // begin the transaction
194
intransaction = true;
195                         }
196                         handle.destroy(connection);
197                     } else {
198                         handle.destroy();
199                     }
200
201                     // if the handle is equal to the source handle then
202
// break the loop
203
if (handle.getConsumerId() == consumerId
204                             && handle.getMessageId().equals(messageId)) {
205                         if (intransaction) {
206                             connection.commit();
207                             intransaction = false;
208                         }
209                         break;
210                     }
211                 }
212             } catch (PersistenceException exception) {
213                 SQLHelper.rollback(connection);
214                 _log.error("Error in SentMessageCache.acknowledgeMessage",
215                            exception);
216             } catch (Exception JavaDoc exception) {
217                 _log.error("Error in SentMessageCache.acknowledgeMessage",
218                            exception);
219             } finally {
220                 SQLHelper.close(connection);
221             }
222         }
223     }
224
225     /**
226      * Acknowledge all the messages in the cache
227      */

228     public void acknowledgeAllMessages() {
229         boolean intransaction = false;
230         Connection JavaDoc connection = null;
231
232         try {
233             // start from the top of the cache and remove each
234
// message and then call destroy on it.
235
// We should do this in one transaction.
236
while (!_unackedMessages.isEmpty()) {
237                 MessageHandle handle =
238                     (MessageHandle) _unackedMessages.remove(0);
239                 if (handle.isPersistent()) {
240                     if (!intransaction) {
241                         connection = DatabaseService.getConnection();
242                         intransaction = true;
243                     }
244                     handle.destroy(connection);
245                 } else {
246                     handle.destroy();
247                 }
248             }
249
250             if (intransaction) {
251                 connection.commit();
252                 intransaction = false;
253             }
254         } catch (PersistenceException exception) {
255             SQLHelper.rollback(connection);
256             _log.error("Error in SentMessageCache.acknowledgeMessage",
257                        exception);
258         } catch (Exception JavaDoc exception) {
259             _log.error("Error in SentMessageCache.acknowledgeMessage",
260                        exception);
261         } finally {
262             SQLHelper.close(connection);
263         }
264     }
265
266     /**
267      * Clear the cache by removing each entry and doing one of the
268      * following.
269      * <p>
270      * If the entry is for a queue destination then return it to
271      * the appropriate destination cache. If the cache does not
272      * exist, because it may have been garbage collected then we
273      * need to recreated.
274      * <p>
275      * If the entry is for a topic destination and check if the
276      * corresponding endpoint is still active. If it is then return it
277      * to the endpoint, otherwise do nothing.
278      */

279     public void clear() throws JMSException JavaDoc {
280         // copy the array of unacked messages
281
Object JavaDoc[] unacked = _unackedMessages.toArray();
282         _unackedMessages.clear();
283
284         int count = unacked.length;
285         for (int index = 0; index < count; index++) {
286             MessageHandle handle = (MessageHandle) unacked[index];
287             DestinationCache cache =
288                     DestinationManager.instance().getDestinationCache(handle.getDestination());
289             cache.returnMessageHandle(handle);
290         }
291     }
292
293     /**
294      * Check whether the specified handle is in the list of unacked
295      * messages
296      *
297      * @param handle the handle to check
298      * @return <code>true</code> if it is in the list of unacked messages
299      */

300     public boolean handleInCache(MessageHandle handle) {
301         return _unackedMessages.contains(handle);
302     }
303
304     /**
305      * Remove the specified handle from the cache
306      *
307      * @param handle the handle to remove
308      */

309     public void removeHandle(MessageHandle handle) {
310         _unackedMessages.remove(handle);
311     }
312
313 }
314
Popular Tags