KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > Session


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): Frederic Maistre (INRIA)
22  * Contributor(s): ScalAgent Distributed Technologies
23  */

24 package org.objectweb.joram.client.jms;
25
26 import java.util.*;
27
28 import javax.jms.JMSException JavaDoc;
29 import javax.jms.TransactionRolledBackException JavaDoc;
30 import javax.jms.IllegalStateException JavaDoc;
31 import javax.jms.MessageFormatException JavaDoc;
32
33 import org.objectweb.joram.client.jms.connection.RequestMultiplexer;
34 import org.objectweb.joram.client.jms.connection.Requestor;
35
36 import org.objectweb.joram.shared.client.*;
37
38 import org.objectweb.util.monolog.api.BasicLevel;
39 import org.objectweb.util.monolog.api.Logger;
40 import fr.dyade.aaa.util.Debug;
41
42 /**
43  * Implements the <code>javax.jms.Session</code> interface.
44  */

45 public class Session implements javax.jms.Session JavaDoc {
46
47   public static Logger logger = Debug.getLogger(Session.class.getName());
48   
49   
50   public static final String JavaDoc RECEIVE_ACK =
51       "org.objectweb.joram.client.jms.receiveAck";
52
53   public static boolean receiveAck = Boolean.getBoolean(RECEIVE_ACK);
54
55   /**
56    * Status of the session
57    */

58   private static class Status {
59     /**
60      * Status of the session when the connection is stopped.
61      * This is the initial status.
62      */

63     public static final int STOP = 0;
64
65     /**
66      * Status of the session when the connection is started.
67      */

68     public static final int START = 1;
69
70     /**
71      * Status of the connection when it is closed.
72      */

73     public static final int CLOSE = 2;
74
75     private static final String JavaDoc[] names = {
76       "STOP", "START", "CLOSE"};
77
78     public static String JavaDoc toString(int status) {
79       return names[status];
80     }
81   }
82
83   /**
84    * The way the session is used.
85    */

86   private static class SessionMode {
87     /**
88      * The session is still not used.
89      * This is the initial mode.
90      */

91     public static final int NONE = 0;
92
93     /**
94      * The session is used to synchronously receive messages.
95      */

96     public static final int RECEIVE = 1;
97
98     /**
99      * The session is used to asynchronously listen to messages.
100      */

101     public static final int LISTENER = 2;
102
103     /**
104      * The session is used by an application server.
105      */

106     public static final int APP_SERVER = 3;
107
108     private static final String JavaDoc[] names = {
109       "NONE", "RECEIVE", "LISTENER", "APP_SERVER"};
110
111     public static String JavaDoc toString(int status) {
112       return names[status];
113     }
114   }
115
116   /**
117    * The status of the current request.
118    * Only valid in the mode RECEIVE.
119    */

120   private static class RequestStatus {
121     /** No request. This is the initial status. */
122     public static final int NONE = 0;
123     /** A request is running (pending). */
124     public static final int RUN = 1;
125     /** The request is done. */
126     public static final int DONE = 2;
127
128     private static final String JavaDoc[] names = {
129       "NONE", "RUN", "DONE"};
130
131     public static String JavaDoc toString(int status) {
132       return names[status];
133     }
134   }
135
136   /** Task for closing the session if it becomes pending. */
137   private SessionCloseTask closingTask;
138
139   /** <code>true</code> if the session's transaction is scheduled. */
140   private boolean scheduled;
141
142   /** The message listener of the session, if any. */
143   protected javax.jms.MessageListener JavaDoc messageListener;
144
145   /** The identifier of the session. */
146   private String JavaDoc ident;
147
148   /** The connection the session belongs to. */
149   private Connection cnx;
150
151   /** <code>true</code> if the session is transacted. */
152   boolean transacted;
153
154   /** The acknowledgement mode of the session. */
155   private int acknowledgeMode;
156
157   /** <code>true</code> if the session's acknowledgements are automatic. */
158   private boolean autoAck;
159
160   /** Vector of message consumers. */
161   private Vector consumers;
162
163   /** Vector of message producers. */
164   private Vector producers;
165
166   /** Vector of queue browsers. */
167   private Vector browsers;
168
169   /** FIFO queue holding the asynchronous server deliveries. */
170   private fr.dyade.aaa.util.Queue repliesIn;
171
172   /** Daemon distributing asynchronous server deliveries. */
173   private SessionDaemon daemon;
174
175   /** Counter of message listeners. */
176   private int listenerCount;
177
178   /**
179    * Table holding the <code>ProducerMessages</code> holding producers'
180    * messages and destinated to be sent at commit.
181    * <p>
182    * <b>Key:</b> destination name<br>
183    * <b>Object:</b> <code>ProducerMessages</code>
184    */

185   Hashtable sendings;
186
187   /**
188    * Table holding the identifiers of the messages delivered per
189    * destination or subscription, and not acknowledged.
190    * <p>
191    * <b>Key:</b> destination or subscription name<br>
192    * <b>Object:</b> <code>MessageAcks</code> instance
193    */

194   Hashtable deliveries;
195
196   /**
197    * The request multiplexer used to communicate with the user proxy.
198    */

199   private RequestMultiplexer mtpx;
200
201   /**
202    * The requestor used by the session to communicate with the user proxy.
203    */

204   private Requestor requestor;
205
206   /**
207    * The requestor used by the session to make 'receive' with the user
208    * proxy. This second requestor is necessary because it must be closed
209    * during the session close (see method close).
210    */

211   private Requestor receiveRequestor;
212
213   /**
214    * Indicates that the session has been recovered by a message listener.
215    * Doesn't need to be volatile because it is only used by the SessionDaemon
216    * thread.
217    */

218   private boolean recover;
219
220   /**
221    * Status of the session: STOP, START, CLOSE
222    */

223   private int status;
224
225   /**
226    * Mode of the session: NONE, RECEIVE, LISTENER, APP_SERVER
227    */

228   private int sessionMode;
229
230   /**
231    * Status of the request: NONE, RUN, DONE.
232    */

233   private int requestStatus;
234
235   /**
236    * The message consumer currently making a request (null if none).
237    */

238   private MessageConsumer pendingMessageConsumer;
239
240   /**
241    * The current active control thread.
242    */

243   private Thread JavaDoc singleThreadOfControl;
244
245   /**
246    * Status boolean indicating whether the message input is activated or not
247    * for the message listeners.
248    */

249   private boolean passiveMsgInput;
250   
251   /**
252    * Used to synchronize the method close()
253    */

254   private Closer closer;
255   
256   /**
257    * Indicates whether the messages produced are asynchronously
258    * sent or not (without or with acknowledgement)
259    */

260   private boolean asyncSend;
261
262   /**
263    * Maximum number of messages that can be read at once from a queue.
264    */

265   private int queueMessageReadMax;
266   
267   /**
268    * Maximum number of acknowledgements that can be buffered in
269    * Session.DUPS_OK_ACKNOWLEDGE mode, default is 0.
270    */

271   private int topicAckBufferMax;
272   
273   /**
274    * This threshold is the maximum messages number over which the
275    * subscription is passivated.
276    */

277   private int topicPassivationThreshold;
278   
279   /**
280    * This threshold is the minimum messages number below which
281    * the subscription is activated.
282    */

283   private int topicActivationThreshold;
284   
285   private MessageConsumerListener messageConsumerListener;
286   
287   /**
288    * Opens a session.
289    *
290    * @param cnx The connection the session belongs to.
291    * @param transacted <code>true</code> for a transacted session.
292    * @param acknowledgeMode 1 (auto), 2 (client) or 3 (dups ok).
293    *
294    * @exception JMSException In case of an invalid acknowledge mode.
295    */

296   Session(Connection cnx,
297           boolean transacted,
298           int acknowledgeMode,
299           RequestMultiplexer mtpx)
300     throws JMSException JavaDoc {
301     if (! transacted
302         && acknowledgeMode != javax.jms.Session.AUTO_ACKNOWLEDGE
303         && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE
304         && acknowledgeMode != javax.jms.Session.DUPS_OK_ACKNOWLEDGE
305         && !(cnx instanceof XAQueueConnection)
306         && !(cnx instanceof XATopicConnection)
307         && !(cnx instanceof XAConnection))
308       throw new JMSException JavaDoc("Can't create a non transacted session with an"
309                              + " invalid acknowledge mode.");
310
311     this.ident = cnx.nextSessionId();
312     this.cnx = cnx;
313     this.transacted = transacted;
314     this.acknowledgeMode = acknowledgeMode;
315     this.mtpx = mtpx;
316     requestor = new Requestor(mtpx);
317     receiveRequestor = new Requestor(mtpx);
318
319     autoAck = ! transacted
320       && acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE;
321
322     consumers = new Vector();
323     producers = new Vector();
324     browsers = new Vector();
325     repliesIn = new fr.dyade.aaa.util.Queue();
326     sendings = new Hashtable();
327     deliveries = new Hashtable();
328     
329     closer = new Closer();
330
331     // If the session is transacted and the transactions limited by a timer,
332
// a closing task might be useful.
333
if (transacted && cnx.getTxPendingTimer() > 0) {
334       closingTask = new SessionCloseTask(
335         cnx.getTxPendingTimer() * 1000);
336     }
337     
338     asyncSend = cnx.getAsyncSend();
339     queueMessageReadMax = cnx.getQueueMessageReadMax();
340     topicAckBufferMax = cnx.getTopicAckBufferMax();
341     topicActivationThreshold = cnx.getTopicActivationThreshold();
342     topicPassivationThreshold = cnx.getTopicPassivationThreshold();
343
344     setStatus(Status.STOP);
345     setSessionMode(SessionMode.NONE);
346     setRequestStatus(RequestStatus.NONE);
347   }
348
349   /**
350    * Sets the status of the session.
351    */

352   private void setStatus(int status) {
353     if (logger.isLoggable(BasicLevel.DEBUG))
354       logger.log(
355         BasicLevel.DEBUG,
356         "Session.setStatus(" +
357         Status.toString(status) + ')');
358     this.status = status;
359   }
360
361   boolean isStarted() {
362     return (status == Status.START);
363   }
364
365   /**
366    * Sets the session mode.
367    */

368   private void setSessionMode(int sessionMode) {
369     if (logger.isLoggable(BasicLevel.DEBUG))
370       logger.log(
371         BasicLevel.DEBUG,
372         "Session.setSessionMode(" +
373         SessionMode.toString(sessionMode) + ')');
374     this.sessionMode = sessionMode;
375   }
376
377   /**
378    * Sets the request status.
379    */

380   private void setRequestStatus(int requestStatus) {
381     if (logger.isLoggable(BasicLevel.DEBUG))
382       logger.log(
383         BasicLevel.DEBUG,
384         "Session.setRequestStatus(" +
385         RequestStatus.toString(requestStatus) + ')');
386     this.requestStatus = requestStatus;
387   }
388   
389   /**
390    * Checks if the session is closed.
391    * If true, an IllegalStateException is raised.
392    */

393   protected synchronized void checkClosed()
394     throws IllegalStateException JavaDoc {
395     if (status == Status.CLOSE)
396       throw new IllegalStateException JavaDoc(
397         "Forbidden call on a closed session.");
398   }
399
400   /**
401    * Checks if the calling thread is the thread of control. If not,
402    * an IllegalStateException is raised.
403    */

404   private synchronized void checkThreadOfControl()
405     throws IllegalStateException JavaDoc {
406     if (singleThreadOfControl != null &&
407         Thread.currentThread() != singleThreadOfControl)
408       throw new IllegalStateException JavaDoc("Illegal control thread");
409   }
410
411   /**
412    * Checks the session mode. If it is not the expected session mode,
413    * raises an IllegalStateException.
414    *
415    * @param expectedSessionMode the expected session mode.
416    */

417   private void checkSessionMode(
418     int expectedSessionMode)
419     throws IllegalStateException JavaDoc {
420     if (sessionMode == SessionMode.NONE) {
421       setSessionMode(sessionMode);
422     } else if (sessionMode != expectedSessionMode) {
423       throw new IllegalStateException JavaDoc("Bad session mode");
424     }
425   }
426
427   /** Returns a String image of this session. */
428   public String JavaDoc toString() {
429     return "Sess:" + ident;
430   }
431
432   /**
433    * API method.
434    *
435    * @exception JMSException Actually never thrown.
436    */

437   public final int getAcknowledgeMode() throws JMSException JavaDoc {
438     checkClosed();
439     return getAckMode();
440   }
441   
442   int getAckMode() {
443     if (transacted)
444       return Session.SESSION_TRANSACTED;
445     return acknowledgeMode;
446   }
447
448   /**
449    * API method.
450    *
451    * @exception IllegalStateException If the session is closed.
452    */

453   public synchronized final boolean getTransacted()
454     throws JMSException JavaDoc {
455     checkClosed();
456     return transacted;
457   }
458
459   /**
460    * set transacted.
461    * see connector ManagedConnectionImpl (Connector).
462    */

463   public void setTransacted(boolean t) {
464     if (status != Status.CLOSE) {
465       transacted = t;
466     }
467     // else should throw an exception but not expected in
468
// the connector.
469
}
470
471   /**
472    * API method.
473    *
474    * @exception JMSException Actually never thrown.
475    */

476   public synchronized void setMessageListener(
477     javax.jms.MessageListener JavaDoc messageListener)
478     throws JMSException JavaDoc {
479     checkSessionMode(SessionMode.APP_SERVER);
480     this.messageListener = messageListener;
481   }
482
483   /**
484    * API method.
485    *
486    * @exception JMSException Actually never thrown.
487    */

488   public synchronized javax.jms.MessageListener JavaDoc
489       getMessageListener()
490     throws JMSException JavaDoc {
491     return messageListener;
492   }
493
494   /**
495    * Creates a Message object.
496    * API method.
497    *
498    * @exception IllegalStateException If the session is closed.
499    */

500   public synchronized javax.jms.Message JavaDoc createMessage()
501     throws JMSException JavaDoc {
502     checkClosed();
503     return new Message();
504   }
505
506   /**
507    * Creates a <code>TextMessage</code> object.
508    * API method.
509    *
510    * @exception IllegalStateException If the session is closed.
511    */

512   public synchronized javax.jms.TextMessage JavaDoc createTextMessage()
513     throws JMSException JavaDoc {
514     checkClosed();
515     return new TextMessage();
516   }
517
518   /**
519    * Creates a <code>TextMessage</code> object with the specified text.
520    * API method.
521    *
522    * @exception IllegalStateException If the session is closed.
523    */

524   public synchronized javax.jms.TextMessage JavaDoc createTextMessage(String JavaDoc text)
525     throws JMSException JavaDoc {
526     checkClosed();
527     TextMessage message = new TextMessage();
528     message.setText(text);
529     return message;
530   }
531   
532   /**
533    * Creates a <code>BytesMessage</code> object.
534    * API method.
535    *
536    * @exception IllegalStateException If the session is closed.
537    */

538   public synchronized javax.jms.BytesMessage JavaDoc createBytesMessage()
539     throws JMSException JavaDoc {
540     checkClosed();
541     return new BytesMessage();
542   }
543
544   /**
545    * Creates a <code>MapMessage</code> object.
546    * API method.
547    *
548    * @exception IllegalStateException If the session is closed.
549    */

550   public synchronized javax.jms.MapMessage JavaDoc createMapMessage()
551     throws JMSException JavaDoc {
552     checkClosed();
553     return new MapMessage();
554   }
555
556   /**
557    * Creates a <code>ObjectMessage</code> object.
558    * API method.
559    *
560    * @exception IllegalStateException If the session is closed.
561    */

562   public synchronized javax.jms.ObjectMessage JavaDoc createObjectMessage()
563     throws JMSException JavaDoc {
564     checkClosed();
565     return new ObjectMessage();
566   }
567
568   /**
569    * Creates a <code>ObjectMessage</code> object.
570    * API method.
571    *
572    * @exception IllegalStateException If the session is closed.
573    */

574   public synchronized javax.jms.ObjectMessage JavaDoc createObjectMessage(
575     java.io.Serializable JavaDoc obj)
576     throws JMSException JavaDoc {
577     checkClosed();
578     ObjectMessage message = new ObjectMessage();
579     message.setObject(obj);
580     return message;
581   }
582
583   /**
584    * Creates a <code>StreamMessage</code> object.
585    * API method.
586    *
587    * @exception IllegalStateException If the session is closed.
588    */

589   public synchronized javax.jms.StreamMessage JavaDoc createStreamMessage()
590     throws JMSException JavaDoc {
591     checkClosed();
592     return new StreamMessage();
593   }
594
595   /**
596    * API method
597    *
598    * @exception IllegalStateException If the session is closed.
599    */

600   public synchronized javax.jms.QueueBrowser JavaDoc
601       createBrowser(javax.jms.Queue JavaDoc queue,
602                     String JavaDoc selector)
603     throws JMSException JavaDoc {
604     checkClosed();
605     checkThreadOfControl();
606     QueueBrowser qb = new QueueBrowser(this, (Queue) queue, selector);
607     browsers.addElement(qb);
608     return qb;
609   }
610
611   /**
612    * API method
613    *
614    * @exception IllegalStateException If the session is closed.
615    */

616   public synchronized javax.jms.QueueBrowser JavaDoc
617       createBrowser(javax.jms.Queue JavaDoc queue)
618     throws JMSException JavaDoc {
619     checkClosed();
620     checkThreadOfControl();
621     QueueBrowser qb = new QueueBrowser(this, (Queue) queue, null);
622     browsers.addElement(qb);
623     return qb;
624   }
625
626   /**
627    * Creates a MessageProducer to send messages to the specified destination.
628    * API method.
629    *
630    * @exception IllegalStateException If the session is closed or if the
631    * connection is broken.
632    * @exception JMSException If the creation fails for any other reason.
633    */

634   public synchronized javax.jms.MessageProducer JavaDoc createProducer(
635     javax.jms.Destination JavaDoc dest)
636     throws JMSException JavaDoc {
637     checkClosed();
638     checkThreadOfControl();
639     MessageProducer mp = new MessageProducer(
640       this,
641       (Destination) dest);
642     addProducer(mp);
643     return mp;
644   }
645
646   /**
647    * Creates a MessageConsumer for the specified destination using a
648    * message selector.
649    * API method.
650    *
651    * @exception IllegalStateException If the session is closed or if the
652    * connection is broken.
653    * @exception JMSException If the creation fails for any other reason.
654    */

655   public synchronized javax.jms.MessageConsumer JavaDoc
656       createConsumer(javax.jms.Destination JavaDoc dest,
657                      String JavaDoc selector,
658                      boolean noLocal)
659     throws JMSException JavaDoc {
660     checkClosed();
661     checkThreadOfControl();
662     MessageConsumer mc = new MessageConsumer(
663       this, (Destination) dest,
664       selector, null,
665       noLocal);
666     addConsumer(mc);
667     return mc;
668   }
669
670   /**
671    * Creates a MessageConsumer for the specified destination using a
672    * message selector.
673    * API method.
674    *
675    * @exception IllegalStateException If the session is closed or if the
676    * connection is broken.
677    * @exception JMSException If the creation fails for any other reason.
678    */

679   public synchronized javax.jms.MessageConsumer JavaDoc
680       createConsumer(javax.jms.Destination JavaDoc dest,
681                      String JavaDoc selector)
682     throws JMSException JavaDoc {
683     checkClosed();
684     checkThreadOfControl();
685     MessageConsumer mc = new MessageConsumer(
686       this, (Destination) dest, selector);
687     addConsumer(mc);
688     return mc;
689   }
690
691   /**
692    * Creates a MessageConsumer for the specified destination.
693    * API method.
694    *
695    * @exception IllegalStateException If the session is closed or if the
696    * connection is broken.
697    * @exception JMSException If the creation fails for any other reason.
698    */

699   public synchronized javax.jms.MessageConsumer JavaDoc
700       createConsumer(javax.jms.Destination JavaDoc dest)
701     throws JMSException JavaDoc {
702     checkClosed();
703     checkThreadOfControl();
704     MessageConsumer mc = new MessageConsumer(
705       this, (Destination) dest, null);
706     addConsumer(mc);
707     return mc;
708   }
709
710   /**
711    * API method.
712    *
713    * @exception IllegalStateException If the session is closed or if the
714    * connection is broken.
715    * @exception JMSException If the creation fails for any other reason.
716    */

717   public synchronized javax.jms.TopicSubscriber JavaDoc
718       createDurableSubscriber(javax.jms.Topic JavaDoc topic,
719                               String JavaDoc name,
720                               String JavaDoc selector,
721                               boolean noLocal)
722     throws JMSException JavaDoc {
723     if (logger.isLoggable(BasicLevel.DEBUG))
724       logger.log(
725         BasicLevel.DEBUG,
726         "Session.createDurableSubscriber(" +
727         topic + ',' + name + ',' +
728         selector + ',' + noLocal + ')');
729     checkClosed();
730     checkThreadOfControl();
731     TopicSubscriber ts = new TopicSubscriber(
732       this, (Topic) topic, name, selector, noLocal);
733     addConsumer(ts);
734     return ts;
735   }
736
737   /**
738    * API method.
739    *
740    * @exception IllegalStateException If the session is closed or if the
741    * connection is broken.
742    * @exception JMSException If the creation fails for any other reason.
743    */

744   public synchronized javax.jms.TopicSubscriber JavaDoc
745       createDurableSubscriber(javax.jms.Topic JavaDoc topic,
746                               String JavaDoc name)
747     throws JMSException JavaDoc {
748     if (logger.isLoggable(BasicLevel.DEBUG))
749       logger.log(
750         BasicLevel.DEBUG,
751         "Session.createDurableSubscriber(" +
752         topic + ',' + name + ')');
753     checkClosed();
754     checkThreadOfControl();
755     TopicSubscriber ts = new TopicSubscriber(
756       this, (Topic) topic, name, null, false);
757     addConsumer(ts);
758     return ts;
759   }
760
761   /**
762    * API method.
763    *
764    * @exception IllegalStateException If the session is closed.
765    */

766   public synchronized javax.jms.Queue JavaDoc createQueue(
767     String JavaDoc queueName)
768     throws JMSException JavaDoc {
769     checkClosed();
770     return new Queue(queueName);
771   }
772
773   /**
774    * API method.
775    *
776    * @exception IllegalStateException If the session is closed.
777    * @exception JMSException If the topic creation failed.
778    */

779   public synchronized javax.jms.Topic JavaDoc createTopic(
780     String JavaDoc topicName)
781     throws JMSException JavaDoc {
782     checkClosed();
783     checkThreadOfControl();
784
785     // Checks if the topic to retrieve is the administration topic:
786
if (topicName.equals("#AdminTopic")) {
787       try {
788         GetAdminTopicReply reply =
789           (GetAdminTopicReply) requestor.request(new GetAdminTopicRequest());
790         if (reply.getId() != null)
791           return new Topic(reply.getId());
792         else
793           throw new JMSException JavaDoc("AdminTopic could not be retrieved.");
794       }
795       catch (JMSException JavaDoc exc) {
796         throw exc;
797       }
798       catch (Exception JavaDoc exc) {
799         throw new JMSException JavaDoc("AdminTopic could not be retrieved: " + exc);
800       }
801     }
802     return new Topic(topicName);
803   }
804
805   /**
806    * API method.
807    *
808    * @exception IllegalStateException If the session is closed or if the
809    * connection is broken.
810    * @exception JMSException If the request fails for any other reason.
811    */

812   public synchronized javax.jms.TemporaryQueue JavaDoc createTemporaryQueue()
813     throws JMSException JavaDoc {
814     checkClosed();
815     checkThreadOfControl();
816
817     SessCreateTDReply reply =
818       (SessCreateTDReply) requestor.request(new SessCreateTQRequest());
819     String JavaDoc tempDest = reply.getAgentId();
820     return new TemporaryQueue(tempDest, cnx);
821   }
822
823   /**
824    * API method.
825    *
826    * @exception IllegalStateException If the session is closed or if the
827    * connection is broken.
828    * @exception JMSException If the request fails for any other reason.
829    */

830   public synchronized javax.jms.TemporaryTopic JavaDoc createTemporaryTopic()
831     throws JMSException JavaDoc {
832     checkClosed();
833     checkThreadOfControl();
834
835     SessCreateTDReply reply =
836       (SessCreateTDReply) requestor.request(new SessCreateTTRequest());
837     String JavaDoc tempDest = reply.getAgentId();
838     return new TemporaryTopic(tempDest, cnx);
839   }
840
841   /** API method. */
842   public synchronized void run() {
843     int load = repliesIn.size();
844
845     if (logger.isLoggable(BasicLevel.DEBUG))
846       logger.log(BasicLevel.DEBUG,
847                  "-- " + this + ": loaded with " + load +
848                  " message(s) and started.");
849
850     try {
851       // Processing the current number of messages in the queue:
852
for (int i = 0; i < load; i++) {
853         org.objectweb.joram.shared.messages.Message momMsg =
854           (org.objectweb.joram.shared.messages.Message) repliesIn.pop();
855         String JavaDoc msgId = momMsg.id;
856         
857         onMessage(momMsg, messageConsumerListener);
858       }
859     } catch (Exception JavaDoc exc) {
860       if (logger.isLoggable(BasicLevel.ERROR))
861         logger.log(BasicLevel.ERROR, "", exc);
862     }
863   }
864   
865   /**
866    * Called by MultiSessionConsumer
867    * ASF mode
868    */

869   void setMessageConsumerListener(MessageConsumerListener mcl) {
870     messageConsumerListener = mcl;
871   }
872       
873   /**
874    * API method.
875    *
876    * @exception IllegalStateException If the session is closed, or not
877    * transacted, or if the connection is broken.
878    */

879   public synchronized void commit() throws JMSException JavaDoc {
880     if (logger.isLoggable(BasicLevel.DEBUG))
881       logger.log(
882         BasicLevel.DEBUG,
883         "Session.commit()");
884
885     checkClosed();
886     checkThreadOfControl();
887
888     if (! transacted)
889       throw new IllegalStateException JavaDoc("Can't commit a non transacted"
890                                       + " session.");
891
892     if (logger.isLoggable(BasicLevel.DEBUG))
893       logger.log(BasicLevel.DEBUG, "--- " + this
894                                  + ": committing...");
895
896     // If the transaction was scheduled: cancelling.
897
if (scheduled) {
898       closingTask.cancel();
899       scheduled = false;
900     }
901
902     // Sending client messages:
903
try {
904       CommitRequest commitReq= new CommitRequest();
905       
906       Enumeration producerMessages = sendings.elements();
907       while (producerMessages.hasMoreElements()) {
908         ProducerMessages pM =
909           (ProducerMessages) producerMessages.nextElement();
910         commitReq.addProducerMessages(pM);
911       }
912       sendings.clear();
913       
914       // Acknowledging the received messages:
915
Enumeration targets = deliveries.keys();
916       while (targets.hasMoreElements()) {
917         String JavaDoc target = (String JavaDoc) targets.nextElement();
918         MessageAcks acks = (MessageAcks) deliveries.get(target);
919         commitReq.addAckRequest(
920           new SessAckRequest(
921             target,
922             acks.getIds(),
923             acks.getQueueMode()));
924       }
925       deliveries.clear();
926       
927       if (asyncSend) {
928         // Asynchronous sending
929
commitReq.setAsyncSend(true);
930         mtpx.sendRequest(commitReq);
931       } else {
932         requestor.request(commitReq);
933       }
934
935       if (logger.isLoggable(BasicLevel.DEBUG))
936         logger.log(BasicLevel.DEBUG, this + ": committed.");
937     }
938     // Catching an exception if the sendings or acknowledgement went wrong:
939
catch (JMSException JavaDoc jE) {
940       if (logger.isLoggable(BasicLevel.ERROR))
941         logger.log(BasicLevel.ERROR, "", jE);
942       TransactionRolledBackException JavaDoc tE =
943         new TransactionRolledBackException JavaDoc("A JMSException was thrown during"
944                                            + " the commit.");
945       tE.setLinkedException(jE);
946
947       if (logger.isLoggable(BasicLevel.ERROR))
948         logger.log(BasicLevel.ERROR, "Exception: " + tE);
949
950       rollback();
951       throw tE;
952     }
953   }
954
955   /**
956    * API method.
957    *
958    * @exception IllegalStateException If the session is closed, or not
959    * transacted.
960    */

961   public synchronized void rollback() throws JMSException JavaDoc {
962     if (logger.isLoggable(BasicLevel.DEBUG))
963       logger.log(
964         BasicLevel.DEBUG,
965         "Session.rollback()");
966
967     checkClosed();
968     checkThreadOfControl();
969
970     if (! transacted)
971       throw new IllegalStateException JavaDoc("Can't rollback a non transacted"
972                                       + " session.");
973
974     if (logger.isLoggable(BasicLevel.DEBUG))
975       logger.log(BasicLevel.DEBUG, "--- " + this
976                                  + ": rolling back...");
977
978     // If the transaction was scheduled: cancelling.
979
if (scheduled) {
980       closingTask.cancel();
981       scheduled = false;
982     }
983
984     // Denying the received messages:
985
deny();
986     // Deleting the produced messages:
987
sendings.clear();
988
989     if (logger.isLoggable(BasicLevel.DEBUG))
990       logger.log(BasicLevel.DEBUG, this + ": rolled back.");
991   }
992
993   /**
994    * API method.
995    *
996    * @exception IllegalStateException If the session is closed, or transacted.
997    */

998   public synchronized void recover() throws JMSException JavaDoc {
999     if (logger.isLoggable(BasicLevel.DEBUG))
1000      logger.log(
1001        BasicLevel.DEBUG,
1002        "Session.recover()");
1003
1004    checkClosed();
1005    checkThreadOfControl();
1006
1007    if (transacted)
1008      throw new IllegalStateException JavaDoc("Can't recover a transacted session.");
1009    
1010    if (logger.isLoggable(BasicLevel.DEBUG))
1011      logger.log(BasicLevel.DEBUG, "--- " + this
1012                                 + " recovering...");
1013
1014    if (daemon != null &&
1015        daemon.isCurrentThread()) {
1016      recover = true;
1017    } else {
1018      doRecover();
1019    }
1020
1021    if (logger.isLoggable(BasicLevel.DEBUG))
1022      logger.log(BasicLevel.DEBUG, this + ": recovered.");
1023  }
1024  
1025  private void doRecover() throws JMSException JavaDoc {
1026    if (logger.isLoggable(BasicLevel.DEBUG))
1027      logger.log(BasicLevel.DEBUG, "Session.doRecover()");
1028    deny();
1029  }
1030
1031  /**
1032   * API method.
1033   *
1034   * @exception IllegalStateException If the session is closed or if the
1035   * connection is broken.
1036   * @exception InvalidDestinationException If the subscription does not
1037   * exist.
1038   * @exception JMSException If the request fails for any other reason.
1039   */

1040  public synchronized void unsubscribe(String JavaDoc name) throws JMSException JavaDoc {
1041    if (logger.isLoggable(BasicLevel.DEBUG))
1042      logger.log(BasicLevel.DEBUG, "Session.unsubscribe(" + name + ')');
1043
1044    if (name == null)
1045      throw new JMSException JavaDoc("Bad subscription name: null");
1046
1047    checkClosed();
1048    checkThreadOfControl();
1049    
1050    MessageConsumer cons;
1051    if (consumers != null) {
1052      for (int i = 0; i < consumers.size(); i++) {
1053        cons = (MessageConsumer) consumers.get(i);
1054        if (! cons.queueMode && cons.targetName.equals(name))
1055          throw new JMSException JavaDoc("Can't delete durable subscription " + name
1056                                 + " as long as an active subscriber exists.");
1057      }
1058    }
1059    syncRequest(new ConsumerUnsubRequest(name));
1060  }
1061
1062  /**
1063   * Closes the session.
1064   * API method.
1065   *
1066   * @exception JMSException
1067   */

1068  public void close() throws JMSException JavaDoc {
1069    if (logger.isLoggable(BasicLevel.DEBUG))
1070      logger.log(
1071        BasicLevel.DEBUG,
1072        "Session.close()");
1073    closer.close();
1074  }
1075
1076  /**
1077   * This class synchronizes the close.
1078   * Close can't be synchronized with 'this' because the Session must be
1079   * accessed concurrently during its closure. So we need a second lock.
1080   */

1081  class Closer {
1082    synchronized void close()
1083      throws JMSException JavaDoc {
1084      doClose();
1085    }
1086  }
1087
1088  void doClose() throws JMSException JavaDoc {
1089    synchronized (this) {
1090      if (status == Status.CLOSE) return;
1091    }
1092    
1093    // Don't synchronize the consumer closure because
1094
// it could deadlock with message listeners or
1095
// client threads still using the session.
1096

1097    Vector consumersToClose = (Vector)consumers.clone();
1098    consumers.clear();
1099    for (int i = 0; i < consumersToClose.size(); i++) {
1100      MessageConsumer mc =
1101        (MessageConsumer)consumersToClose.elementAt(i);
1102      try {
1103        mc.close();
1104      } catch (JMSException JavaDoc exc) {
1105        if (logger.isLoggable(BasicLevel.DEBUG))
1106          logger.log(
1107            BasicLevel.DEBUG, "", exc);
1108      }
1109    }
1110    
1111    Vector browsersToClose = (Vector)browsers.clone();
1112    browsers.clear();
1113    for (int i = 0; i < browsersToClose.size(); i++) {
1114      QueueBrowser qb =
1115        (QueueBrowser)browsersToClose.elementAt(i);
1116      try {
1117        qb.close();
1118      } catch (JMSException JavaDoc exc) {
1119        if (logger.isLoggable(BasicLevel.DEBUG))
1120          logger.log(
1121            BasicLevel.DEBUG, "", exc);
1122      }
1123    }
1124    
1125    Vector producersToClose = (Vector)producers.clone();
1126    producers.clear();
1127    for (int i = 0; i < producersToClose.size(); i++) {
1128      MessageProducer mp =
1129        (MessageProducer)producersToClose.elementAt(i);
1130      try {
1131        mp.close();
1132      } catch (JMSException JavaDoc exc) {
1133        if (logger.isLoggable(BasicLevel.DEBUG))
1134          logger.log(
1135            BasicLevel.DEBUG, "", exc);
1136      }
1137    }
1138    
1139    // This is now in removeMessageListener
1140
// called by MessageConsumer.close()
1141
// (see above)
1142
// try {
1143
// repliesIn.stop();
1144
// } catch (InterruptedException iE) {}
1145

1146    stop();
1147
1148    // The requestor must be closed because
1149
// it could be used by a concurrent receive
1150
// as it is not synchronized (see receive()).
1151
receiveRequestor.close();
1152      
1153    // Denying the non acknowledged messages:
1154
if (transacted) {
1155      rollback();
1156    } else {
1157      deny();
1158    }
1159
1160    cnx.closeSession(this);
1161      
1162    synchronized (this) {
1163      setStatus(Status.CLOSE);
1164    }
1165  }
1166
1167  /**
1168   * Starts the asynchronous deliveries in the session.
1169   * <p>
1170   * This method is called by a started connection.
1171   */

1172  synchronized void start() {
1173    if (logger.isLoggable(BasicLevel.DEBUG))
1174      logger.log(
1175        BasicLevel.DEBUG,
1176        "Session.start()");
1177
1178    if (status == Status.CLOSE) return;
1179    if (status == Status.START) return;
1180    if (listenerCount > 0) {
1181      doStart();
1182    }
1183
1184    setStatus(Status.START);
1185  }
1186
1187  private void doStart() {
1188    if (logger.isLoggable(BasicLevel.DEBUG))
1189      logger.log(
1190        BasicLevel.DEBUG,
1191        "Session.doStart()");
1192    repliesIn.start();
1193    daemon = new SessionDaemon();
1194    daemon.setDaemon(false);
1195    daemon.start();
1196    singleThreadOfControl = daemon.getThread();
1197  }
1198
1199  /**
1200   * Stops the asynchronous deliveries processing in the session.
1201   * <p>
1202   * This method must be carefully used. When the session is stopped, the
1203   * connection might very well going on pushing deliveries in the
1204   * session's queue. If the session is never re-started, these deliveries
1205   * will never be poped out, and this may lead to a situation of consumed
1206   * but never acknowledged messages.
1207   * <p>
1208   * This fatal situation never occurs as the <code>stop()</code> method is
1209   * either called by he <code>Session.close()</code>
1210   * and <code>Connection.stop()</code> methods, which first empties the
1211   * session's deliveries and forbid any further push.
1212   */

1213  synchronized void stop() {
1214    if (logger.isLoggable(BasicLevel.DEBUG))
1215      logger.log(
1216        BasicLevel.DEBUG,
1217        "Session.stop()");
1218    if (status == Status.STOP ||
1219        status == Status.CLOSE) return;
1220
1221    // DF: According to JMS 1.1 java doc
1222
// the method stop "blocks until receives in progress have completed."
1223
// But the JMS 1.1 specification doesn't mention this point.
1224
// So we don't implement it: a stop doesn't block until
1225
// receives have completed.
1226

1227// while (requestStatus != RequestStatus.NONE) {
1228
// try {
1229
// wait();
1230
// } catch (InterruptedException exc) {}
1231
// }
1232

1233    doStop();
1234
1235    setStatus(Status.STOP);
1236  }
1237
1238  private void doStop() {
1239    if (daemon != null) {
1240      daemon.stop();
1241      daemon = null;
1242      singleThreadOfControl = null;
1243    }
1244  }
1245
1246  /**
1247   * Method called by message producers when producing a message for
1248   * preparing the session to later commit it.
1249   *
1250   * @param dest The destination the message is destinated to.
1251   * @param msg The message.
1252   */

1253  private void prepareSend(Destination dest,
1254                           org.objectweb.joram.shared.messages.Message msg) throws JMSException JavaDoc {
1255    if (logger.isLoggable(BasicLevel.DEBUG))
1256      logger.log(BasicLevel.DEBUG,
1257                 "Session.prepareSend(" + dest + ',' + msg + ')');
1258
1259    checkClosed();
1260    checkThreadOfControl();
1261    
1262    // If the transaction was scheduled, cancelling:
1263
if (scheduled) closingTask.cancel();
1264
1265    ProducerMessages pM = (ProducerMessages) sendings.get(dest.getName());
1266    if (pM == null) {
1267      pM = new ProducerMessages(dest.getName());
1268      sendings.put(dest.getName(), pM);
1269    }
1270    pM.addMessage(msg);
1271
1272    // If the transaction was scheduled, re-scheduling it:
1273
if (scheduled) closingTask.start();
1274  }
1275
1276  /**
1277   * Method called by message consumers when receiving a message for
1278   * preparing the session to later acknowledge or deny it.
1279   *
1280   * @param name Name of the destination or of the proxy subscription
1281   * the message comes from.
1282   * @param id Identifier of the consumed message.
1283   * @param queueMode <code>true</code> if the message consumed comes from
1284   * a queue.
1285   */

1286  private void prepareAck(String JavaDoc name,
1287                          String JavaDoc id,
1288                          boolean queueMode) {
1289    if (logger.isLoggable(BasicLevel.DEBUG))
1290      logger.log(
1291        BasicLevel.DEBUG,
1292        "Session.prepareAck(" +
1293        name + ',' + id + ',' + queueMode + ')');
1294
1295    // If the transaction was scheduled, cancelling:
1296
if (scheduled)
1297      closingTask.cancel();
1298
1299    MessageAcks acks = (MessageAcks) deliveries.get(name);
1300    if (acks == null) {
1301      acks = new MessageAcks(queueMode);
1302      deliveries.put(name, acks);
1303    }
1304    acks.addId(id);
1305
1306    if (logger.isLoggable(BasicLevel.DEBUG))
1307      logger.log(
1308        BasicLevel.DEBUG, " -> acks = " + acks);
1309
1310    // If the transaction must be scheduled, scheduling it:
1311
if (closingTask != null) {
1312      scheduled = true;
1313      closingTask.start();
1314    }
1315  }
1316
1317  /**
1318   * Method acknowledging the received messages.
1319   * Called by Message.
1320   */

1321  synchronized void acknowledge() throws JMSException JavaDoc {
1322    checkClosed();
1323    if (transacted ||
1324        acknowledgeMode != javax.jms.Session.CLIENT_ACKNOWLEDGE) {
1325      return;
1326    }
1327    doAcknowledge();
1328  }
1329
1330  /**
1331   * Method acknowledging the received messages.
1332   */

1333  private void doAcknowledge() throws JMSException JavaDoc {
1334    Enumeration targets = deliveries.keys();
1335    while (targets.hasMoreElements()) {
1336      String JavaDoc target = (String JavaDoc) targets.nextElement();
1337      MessageAcks acks = (MessageAcks) deliveries.remove(target);
1338      mtpx.sendRequest(
1339        new SessAckRequest(
1340          target,
1341          acks.getIds(),
1342          acks.getQueueMode()));
1343    }
1344  }
1345
1346  /**
1347   * Method denying the received messages.
1348   *
1349   * Called from:
1350   * - rollback -> synchronized client thread
1351   * - recover -> synchronized client thread
1352   * - close -> synchronized client thread
1353   * - onMessage -> not synchronized session daemon.
1354   * It is the only thread that can run into the session
1355   * (session mode = LISTENER) except for the method close that
1356   * can be called concurrently. But close() first stops the session
1357   * daemon and then calls deny().
1358   *
1359   * The hashtable deliveries is also accessed from:
1360   * - acknowledge -> synchronized client thread
1361   * - commit -> synchronized client thread
1362   * - receive -> synchronized client thread.
1363   * - onMessage -> not synchronized session daemon (see above).
1364   */

1365  private void deny() throws JMSException JavaDoc {
1366    if (logger.isLoggable(BasicLevel.DEBUG))
1367      logger.log(
1368        BasicLevel.DEBUG,
1369        "Session.deny()");
1370    Enumeration targets = deliveries.keys();
1371    while (targets.hasMoreElements()) {
1372      String JavaDoc target = (String JavaDoc) targets.nextElement();
1373      MessageAcks acks = (MessageAcks) deliveries.remove(target);
1374      if (logger.isLoggable(BasicLevel.DEBUG))
1375        logger.log(
1376          BasicLevel.DEBUG,
1377          " -> acks = " + acks + ')');
1378      SessDenyRequest deny = new SessDenyRequest(
1379        target,
1380        acks.getIds(),
1381        acks.getQueueMode());
1382      if (acks.getQueueMode()) {
1383        requestor.request(deny);
1384      } else {
1385        mtpx.sendRequest(deny);
1386      }
1387    }
1388  }
1389
1390  /**
1391   * Called by MessageConsumer
1392   * Not synchronized because ot it can be
1393   * concurrently called by close()
1394   * and Connection.stop().
1395   */

1396  javax.jms.Message JavaDoc receive(
1397    long requestTimeToLive,
1398    long waitTimeOut,
1399    MessageConsumer mc,
1400    String JavaDoc targetName,
1401    String JavaDoc selector,
1402    boolean queueMode)
1403    throws JMSException JavaDoc {
1404    if (logger.isLoggable(BasicLevel.DEBUG))
1405      logger.log(BasicLevel.DEBUG,
1406                 "Session.receive(" + requestTimeToLive + ',' +
1407                 waitTimeOut + ',' + targetName + ',' +
1408                 selector + ',' + queueMode + ')');
1409    preReceive(mc);
1410    try {
1411      ConsumerMessages reply = null;
1412      ConsumerReceiveRequest request =
1413        new ConsumerReceiveRequest(targetName, selector,
1414                                   requestTimeToLive, queueMode);
1415      if (receiveAck) request.setReceiveAck(true);
1416      reply = (ConsumerMessages) receiveRequestor.request(request, waitTimeOut);
1417
1418      if (logger.isLoggable(BasicLevel.DEBUG))
1419        logger.log(
1420          BasicLevel.DEBUG,
1421          " -> reply = " + reply);
1422        
1423      synchronized (this) {
1424        // The session may have been
1425
// closed in between.
1426
if (status == Status.CLOSE) {
1427          if (reply != null) {
1428            mtpx.deny(reply);
1429          }
1430          return null;
1431        }
1432        
1433        if (reply != null) {
1434          Vector msgs = reply.getMessages();
1435          if (msgs != null && ! msgs.isEmpty()) {
1436            Message msg = Message.wrapMomMessage(this, (org.objectweb.joram.shared.messages.Message) msgs.get(0));
1437            String JavaDoc msgId = msg.getJMSMessageID();;
1438            
1439            // Auto ack: acknowledging the message:
1440
if (autoAck && ! receiveAck) {
1441              ConsumerAckRequest req = new ConsumerAckRequest(targetName, queueMode);
1442              req.addId(msgId);
1443              mtpx.sendRequest(req);
1444            } else {
1445              prepareAck(targetName, msgId, queueMode);
1446            }
1447            msg.session = this;
1448            return msg;
1449          } else {
1450            return null;
1451          }
1452        } else {
1453            return null;
1454        }
1455      }
1456    } finally {
1457      postReceive();
1458    }
1459  }
1460
1461  /**
1462   * First stage before calling the proxy and waiting
1463   * for the reply. It is synchronized because it
1464   * locks the session in order to prevent any other
1465   * thread to make another operation.
1466   */

1467  private synchronized void preReceive(
1468    MessageConsumer mc) throws JMSException JavaDoc {
1469    if (logger.isLoggable(BasicLevel.DEBUG))
1470      logger.log(
1471        BasicLevel.DEBUG,
1472        "Session.preReceive(" + mc + ')');
1473    // The message consumer may have been closed
1474
// after the first check (in MessageConsumer.receive())
1475
// and before preReceive.
1476
mc.checkClosed();
1477
1478    checkClosed();
1479    checkThreadOfControl();
1480    
1481    // Don't call checkSessionMode because
1482
// we also check that the session mode is not
1483
// already set to RECEIVE.
1484
switch (sessionMode) {
1485    case SessionMode.NONE:
1486      setSessionMode(SessionMode.RECEIVE);
1487      break;
1488    default:
1489      throw new IllegalStateException JavaDoc("Illegal session mode");
1490    }
1491
1492    if (requestStatus != RequestStatus.NONE)
1493      throw new IllegalStateException JavaDoc("Illegal request status");
1494
1495    singleThreadOfControl = Thread.currentThread();
1496    pendingMessageConsumer = mc;
1497    
1498    setRequestStatus(RequestStatus.RUN);
1499  }
1500  
1501  /**
1502   * Final stage after calling the reply has been returned
1503   * by the roxy. It releases the session and enables another
1504   * thread to call it.
1505   */

1506  private synchronized void postReceive() {
1507    if (logger.isLoggable(BasicLevel.DEBUG))
1508      logger.log(
1509        BasicLevel.DEBUG,
1510        "Session.postReceive()");
1511
1512    singleThreadOfControl = null;
1513    pendingMessageConsumer = null;
1514    setRequestStatus(RequestStatus.NONE);
1515    setSessionMode(SessionMode.NONE);
1516    notifyAll();
1517  }
1518  
1519  /**
1520   * Called here and by sub-classes.
1521   */

1522  protected synchronized void addConsumer(
1523    MessageConsumer mc) {
1524    consumers.addElement(mc);
1525  }
1526
1527  /**
1528   * Called by MessageConsumer.
1529   */

1530  synchronized void closeConsumer(MessageConsumer mc) {
1531    if (logger.isLoggable(BasicLevel.DEBUG))
1532      logger.log(
1533        BasicLevel.DEBUG,
1534        "Session.closeConsumer(" + mc + ')');
1535    consumers.removeElement(mc);
1536
1537    if (pendingMessageConsumer == mc) {
1538      if (requestStatus == RequestStatus.RUN) {
1539        // Close the requestor. A call to abortRequest()
1540
// is not enough because the receiving thread
1541
// may call request() just after this thread
1542
// calls abort().
1543
receiveRequestor.close();
1544
1545        // Wait for the end of the request
1546
try {
1547          while (requestStatus != RequestStatus.NONE) {
1548            wait();
1549          }
1550        } catch (InterruptedException JavaDoc exc) {}
1551
1552        // Create a new requestor.
1553
receiveRequestor = new Requestor(mtpx);
1554      }
1555    }
1556  }
1557  
1558  /**
1559   * Called by Connection (i.e. temporary destinations deletion)
1560   */

1561  synchronized void checkConsumers(String JavaDoc agentId)
1562    throws JMSException JavaDoc {
1563    for (int j = 0; j < consumers.size(); j++) {
1564      MessageConsumer cons =
1565        (MessageConsumer) consumers.elementAt(j);
1566      if (agentId.equals(cons.dest.agentId)) {
1567        throw new JMSException JavaDoc(
1568          "Consumers still exist for this temp queue.");
1569      }
1570    }
1571  }
1572
1573  /**
1574   * Called here and by sub-classes.
1575   */

1576  protected void addProducer(MessageProducer mp) {
1577    producers.addElement(mp);
1578  }
1579
1580  /**
1581   * Called by MessageProducer.
1582   */

1583  synchronized void closeProducer(MessageProducer mp) {
1584    producers.removeElement(mp);
1585  }
1586
1587  /**
1588   * Called by Queue browser.
1589   */

1590  synchronized void closeBrowser(QueueBrowser qb) {
1591    browsers.removeElement(qb);
1592  }
1593
1594  /**
1595   * Called by MessageConsumer
1596   */

1597  synchronized MessageConsumerListener addMessageListener(
1598    MessageConsumerListener mcl) throws JMSException JavaDoc {
1599    if (logger.isLoggable(BasicLevel.DEBUG))
1600      logger.log(
1601        BasicLevel.DEBUG,
1602        "Session.addMessageListener(" + mcl + ')');
1603    checkClosed();
1604    checkThreadOfControl();
1605
1606    checkSessionMode(SessionMode.LISTENER);
1607
1608    mcl.start();
1609    
1610    if (status == Status.START &&
1611        listenerCount == 0) {
1612      doStart();
1613    }
1614
1615    listenerCount++;
1616    return mcl;
1617  }
1618
1619  /**
1620   * Called by MessageConsumer. The thread of control and the status
1621   * must be checked if the call results from a setMessageListener
1622   * but not from a close.
1623   */

1624  void removeMessageListener(
1625    MessageConsumerListener mcl,
1626    boolean check) throws JMSException JavaDoc {
1627    if (logger.isLoggable(BasicLevel.DEBUG))
1628      logger.log(
1629        BasicLevel.DEBUG,
1630        "Session.removeMessageListener(" +
1631        mcl + ',' + check + ')');
1632
1633    if (check) {
1634      checkClosed();
1635      checkThreadOfControl();
1636    }
1637    
1638    // This may block if a message listener
1639
// is currently receiving a message (onMessage is called)
1640
// so we have to be out of the synchronized block.
1641
mcl.close();
1642    
1643    synchronized (this) {
1644      listenerCount--;
1645      if (status == Status.START && listenerCount == 0) {
1646        try {
1647          repliesIn.stop();
1648        } catch (InterruptedException JavaDoc iE) {
1649        }
1650        // All the message listeners have been closed
1651
// so we can call doStop() in a synchronized
1652
// block. No deadlock possible.
1653
doStop();
1654      }
1655    }
1656  }
1657
1658  /**
1659   * Called by MessageConsumerListener (demultiplexer thread
1660   * from RequestMultiplexer) in order to distribute messages
1661   * to a message consumer.
1662   * Not synchronized because a concurrent close
1663   * can be done.
1664   *
1665   * @exception
1666   */

1667  void pushMessages(SingleSessionConsumer consumerListener,
1668                   ConsumerMessages messages) {
1669    if (logger.isLoggable(BasicLevel.DEBUG))
1670      logger.log(
1671        BasicLevel.DEBUG,
1672        "Session.pushMessages(" +
1673        consumerListener + ',' + messages + ')');
1674    repliesIn.push(
1675      new MessageListenerContext(
1676        consumerListener, messages));
1677  }
1678
1679  /**
1680   * Called by ConnectionConsumer in order to distribute a message through the
1681   * method run(). Session mode is APP_SERVER.
1682   */

1683  void onMessage(org.objectweb.joram.shared.messages.Message msg) {
1684    if (logger.isLoggable(BasicLevel.DEBUG))
1685      logger.log(BasicLevel.DEBUG, "Session.onMessage(" + msg + ')');
1686
1687    repliesIn.push(msg);
1688  }
1689
1690  /**
1691   * Called by:
1692   * - method run (application server thread) synchronized
1693   */

1694  private void ackMessage(String JavaDoc targetName,
1695                          String JavaDoc msgId,
1696                          boolean queueMode)
1697    throws JMSException JavaDoc {
1698    ConsumerAckRequest ack = new ConsumerAckRequest(targetName, queueMode);
1699    ack.addId(msgId);
1700    mtpx.sendRequest(ack);
1701  }
1702
1703  /**
1704   * Called by:
1705   * - method run (application server thread) synchronized
1706   * - method onMessage (SessionDaemon thread) not synchronized
1707   * but no concurrent call except a close which first stops
1708   * SessionDaemon.
1709   */

1710  private void denyMessage(String JavaDoc targetName,
1711                           String JavaDoc msgId,
1712                           boolean queueMode)
1713    throws JMSException JavaDoc {
1714    if (logger.isLoggable(BasicLevel.DEBUG))
1715      logger.log(
1716        BasicLevel.DEBUG,
1717        "Session.denyMessage(" +
1718        targetName + ',' +
1719        msgId + ',' +
1720        queueMode + ')');
1721    ConsumerDenyRequest cdr = new ConsumerDenyRequest(
1722      targetName, msgId, queueMode);
1723    if (queueMode) {
1724      requestor.request(cdr);
1725    } else {
1726      mtpx.sendRequest(cdr, null);
1727    }
1728  }
1729  
1730  /**
1731   * Called by SessionDaemon.
1732   * Not synchronized but no concurrent call except
1733   * a close which first stops SessionDaemon.
1734   */

1735  private void onMessages(MessageListenerContext ctx) throws JMSException JavaDoc {
1736    Vector msgs = ctx.messages.getMessages();
1737    for (int i = 0; i < msgs.size(); i++) {
1738      onMessage((org.objectweb.joram.shared.messages.Message) msgs.elementAt(i),
1739                ctx.consumerListener);
1740    }
1741  }
1742
1743  /**
1744   * Called by onMessages()
1745   */

1746  void onMessage(org.objectweb.joram.shared.messages.Message momMsg,
1747                 MessageConsumerListener mcl) throws JMSException JavaDoc {
1748    String JavaDoc msgId = momMsg.id;
1749    
1750    if (! autoAck)
1751      prepareAck(mcl.getTargetName(), msgId, mcl.getQueueMode());
1752
1753    Message msg = null;
1754    try {
1755      msg = Message.wrapMomMessage(this, momMsg);
1756    } catch (JMSException JavaDoc jE) {
1757      // Catching a JMSException means that the building of the Joram
1758
// message went wrong: denying the message:
1759
if (autoAck)
1760        denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode());
1761      return;
1762    }
1763    msg.session = this;
1764    
1765    try {
1766      if (messageListener == null) {
1767        // Standard JMS (MessageConsumer)
1768
mcl.onMessage(msg, acknowledgeMode);
1769      } else {
1770        // ASF (ConnectionConsumer)
1771
mcl.onMessage(msg, messageListener, acknowledgeMode);
1772      }
1773    } catch (JMSException JavaDoc exc) {
1774      if (logger.isLoggable(BasicLevel.DEBUG))
1775        logger.log(BasicLevel.DEBUG, "", exc);
1776
1777      if (autoAck || mcl.isClosed()) {
1778        denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode());
1779      }
1780      return;
1781    }
1782    
1783    if (recover) {
1784      // The session has been recovered by the
1785
// listener thread.
1786
if (autoAck) {
1787        denyMessage(mcl.getTargetName(), msgId, mcl.getQueueMode());
1788      } else {
1789        doRecover();
1790        recover = false;
1791      }
1792    } else {
1793      if (autoAck) {
1794        mcl.ack(msgId, acknowledgeMode);
1795      }
1796    }
1797  }
1798
1799  /**
1800   * Called by MessageProducer.
1801   */

1802  synchronized void send(Destination dest,
1803                         javax.jms.Message JavaDoc msg,
1804                         int deliveryMode,
1805                         int priority,
1806                         long timeToLive,
1807                         boolean timestampDisabled) throws JMSException JavaDoc {
1808    if (logger.isLoggable(BasicLevel.DEBUG))
1809      logger.log(BasicLevel.DEBUG,
1810                 "Session.send(" + dest + ',' + msg + ',' + deliveryMode + ',' +
1811                 priority + ',' + timeToLive + ',' + timestampDisabled + ')');
1812    
1813    checkClosed();
1814    checkThreadOfControl();
1815
1816    // Updating the message property fields:
1817
String JavaDoc msgID = cnx.nextMessageId();
1818    msg.setJMSMessageID(msgID);
1819    msg.setJMSDeliveryMode(deliveryMode);
1820    msg.setJMSDestination(dest);
1821    if (timeToLive == 0) {
1822      msg.setJMSExpiration(0);
1823    } else {
1824      msg.setJMSExpiration(System.currentTimeMillis() + timeToLive);
1825    }
1826    msg.setJMSPriority(priority);
1827    if (! timestampDisabled) {
1828      msg.setJMSTimestamp(System.currentTimeMillis());
1829    }
1830    
1831    Message joramMsg = null;
1832    try {
1833      joramMsg = (Message) msg;
1834    } catch (ClassCastException JavaDoc exc) {
1835      try {
1836        // If the message to send is a non proprietary JMS message, try
1837
// to convert it.
1838
joramMsg = Message.convertJMSMessage(msg);
1839      } catch (JMSException JavaDoc jE) {
1840        MessageFormatException JavaDoc mE =
1841          new MessageFormatException JavaDoc("Message to send is invalid.");
1842        mE.setLinkedException(jE);
1843        throw mE;
1844      }
1845    }
1846    joramMsg.prepare();
1847
1848    if (transacted) {
1849      if (logger.isLoggable(BasicLevel.DEBUG))
1850        logger.log(BasicLevel.DEBUG, "Buffering the message.");
1851      // If the session is transacted, keeping the request for later delivery:
1852
prepareSend(dest, (org.objectweb.joram.shared.messages.Message) joramMsg.momMsg.clone());
1853    } else {
1854      ProducerMessages pM = new ProducerMessages(dest.getName(),
1855                                                 (org.objectweb.joram.shared.messages.Message) joramMsg.momMsg.clone());
1856      
1857      if (logger.isLoggable(BasicLevel.DEBUG))
1858        logger.log(BasicLevel.DEBUG, "Sending " + joramMsg);
1859      
1860      if (asyncSend || (! joramMsg.momMsg.persistent)) {
1861        // Asynchronous sending
1862
pM.setAsyncSend(true);
1863        mtpx.sendRequest(pM);
1864      } else {
1865        requestor.request(pM);
1866      }
1867    }
1868  }
1869
1870  /**
1871   * Called by MessageConsumer. The requestor raises an
1872   * exception if it is called during another request.
1873   * This cannot happen as a session is monothreaded.
1874   * A concurrent close first aborts the current request
1875   * so it releases the requestor for a subsequent use.
1876   */

1877  synchronized AbstractJmsReply syncRequest(
1878    AbstractJmsRequest request)
1879    throws JMSException JavaDoc {
1880    return requestor.request(request);
1881  }
1882
1883  final Connection getConnection() {
1884    return cnx;
1885  }
1886
1887  final String JavaDoc getId() {
1888    return ident;
1889  }
1890
1891  final RequestMultiplexer getRequestMultiplexer() {
1892    return mtpx;
1893  }
1894
1895  public final boolean isAutoAck() {
1896    return autoAck;
1897  }
1898
1899  private void activateMessageInput() throws JMSException JavaDoc {
1900    for (int i = 0; i < consumers.size(); i++) {
1901      MessageConsumer cons =
1902        (MessageConsumer) consumers.elementAt(i);
1903      cons.activateMessageInput();
1904    }
1905    passiveMsgInput = false;
1906  }
1907
1908  private void passivateMessageInput() throws JMSException JavaDoc {
1909    for (int i = 0; i < consumers.size(); i++) {
1910      MessageConsumer cons =
1911        (MessageConsumer) consumers.elementAt(i);
1912      cons.passivateMessageInput();
1913    }
1914    passiveMsgInput = true;
1915  }
1916
1917  /**
1918   * Set asyncSend for this Session.
1919   *
1920   * @param b
1921   */

1922  public void setAsyncSend(boolean b) {
1923    asyncSend = b;
1924  }
1925  
1926  /**
1927   * Set queueMessageReadMax for this Session.
1928   *
1929   * @param i
1930   */

1931  public void setQueueMessageReadMax(int i) {
1932    queueMessageReadMax = i;
1933  }
1934  
1935  public final int getQueueMessageReadMax() {
1936    return queueMessageReadMax;
1937  }
1938  
1939  public final int getTopicAckBufferMax() {
1940    return topicAckBufferMax;
1941  }
1942  
1943  public void setTopicAckBufferMax(int i) {
1944    topicAckBufferMax = i;
1945  }
1946  
1947  public final int getTopicActivationThreshold() {
1948    return topicActivationThreshold;
1949  }
1950  
1951  public void setTopicActivationThreshold(int i) {
1952    topicActivationThreshold = i;
1953  }
1954  
1955  public final int getTopicPassivationThreshold() {
1956    return topicPassivationThreshold;
1957  }
1958  
1959  public void setTopicPassivationThreshold(int i) {
1960    topicPassivationThreshold = i;
1961  }
1962  
1963  /**
1964   * The <code>SessionCloseTask</code> class is used by non-XA transacted
1965   * sessions for taking care of closing them if they tend to be pending,
1966   * and if a transaction timer has been set.
1967   */

1968  private class SessionCloseTask extends TimerTask {
1969    private long txPendingTimer;
1970
1971    SessionCloseTask(long txPendingTimer) {
1972      this.txPendingTimer = txPendingTimer;
1973    }
1974
1975    /** Method called when the timer expires, actually closing the session. */
1976    public void run() {
1977      try {
1978        if (logger.isLoggable(BasicLevel.WARN))
1979          logger.log(BasicLevel.WARN, "Session closed "
1980                                     + "because of pending transaction");
1981        close();
1982      } catch (Exception JavaDoc e) {}
1983    }
1984
1985    public void start() {
1986      try {
1987        mtpx.schedule(this, txPendingTimer);
1988      } catch (Exception JavaDoc e) {}
1989    }
1990  }
1991
1992  /**
1993   * This thread controls the session in mode LISTENER.
1994   */

1995  private class SessionDaemon extends fr.dyade.aaa.util.Daemon {
1996    SessionDaemon() {
1997      super("Connection#" + cnx + " - Session#" + ident);
1998    }
1999
2000    public void run() {
2001      while (running) {
2002        canStop = true;
2003        MessageListenerContext ctx;
2004        try {
2005          ctx = (MessageListenerContext)repliesIn.get();
2006          repliesIn.pop();
2007        } catch (InterruptedException JavaDoc exc) {
2008          if (logger.isLoggable(BasicLevel.DEBUG))
2009            logger.log(BasicLevel.DEBUG, "", exc);
2010          return;
2011        }
2012
2013        canStop = false;
2014        try {
2015          onMessages(ctx);
2016        } catch (JMSException JavaDoc exc) {
2017          if (logger.isLoggable(BasicLevel.DEBUG))
2018            logger.log(BasicLevel.DEBUG, "", exc);
2019        }
2020      }
2021    }
2022
2023    Thread JavaDoc getThread() {
2024      return thread;
2025    }
2026
2027    protected void shutdown() {}
2028
2029    protected void close() {}
2030  }
2031
2032  /**
2033   * Context used to associate a message consumer with
2034   * a set of messages to consume.
2035   */

2036  private static class MessageListenerContext {
2037    SingleSessionConsumer consumerListener;
2038    ConsumerMessages messages;
2039
2040    MessageListenerContext(
2041      SingleSessionConsumer consumerListener,
2042      ConsumerMessages messages) {
2043      this.consumerListener = consumerListener;
2044      this.messages = messages;
2045    }
2046  }
2047}
2048
Popular Tags