KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > objectweb > joram > client > jms > connection > RequestMultiplexer


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies
4  * Copyright (C) 1996 - 2000 Dyade
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  * USA.
20  *
21  * Initial developer(s): ScalAgent Distributed Technologies
22  */

23 package org.objectweb.joram.client.jms.connection;
24
25 import java.util.Enumeration JavaDoc;
26 import java.util.Hashtable JavaDoc;
27 import java.util.Set JavaDoc;
28 import java.util.Timer JavaDoc;
29 import java.util.TimerTask JavaDoc;
30 import java.util.Vector JavaDoc;
31
32 import javax.jms.IllegalStateException JavaDoc;
33 import javax.jms.InvalidDestinationException JavaDoc;
34 import javax.jms.JMSException JavaDoc;
35 import javax.jms.JMSSecurityException JavaDoc;
36
37 import org.objectweb.joram.client.jms.Connection;
38 import org.objectweb.joram.shared.client.AbstractJmsReply;
39 import org.objectweb.joram.shared.client.AbstractJmsRequest;
40 import org.objectweb.joram.shared.client.ConsumerMessages;
41 import org.objectweb.joram.shared.client.JmsRequestGroup;
42 import org.objectweb.joram.shared.client.MomExceptionReply;
43 import org.objectweb.joram.shared.client.PingRequest;
44 import org.objectweb.joram.shared.client.SessDenyRequest;
45
46 import org.objectweb.joram.shared.JoramTracing;
47 import org.objectweb.util.monolog.api.BasicLevel;
48
49 public class RequestMultiplexer {
50
51   private static class Status {
52     public static final int OPEN = 0;
53     public static final int CLOSE = 1;
54     
55     private static final String JavaDoc[] names = {"OPEN", "CLOSE"};
56
57     public static String JavaDoc toString(int status) {
58       return names[status];
59     }
60   }
61
62   private Connection cnx;
63
64   private volatile int status;
65
66   private RequestChannel channel;
67
68   public Hashtable JavaDoc requestsTable;
69
70   private int requestCounter;
71
72   private DemultiplexerDaemon demtpx;
73
74   private Timer JavaDoc timer;
75   
76   /**
77    * The task responsible for keeping
78    * the connection alive.
79    */

80   private HeartBeatTask heartBeatTask;
81
82   private javax.jms.ExceptionListener JavaDoc exceptionListener;
83
84   /**
85    * The date of the last request
86    */

87   private volatile long lastRequestDate;
88   
89   public RequestMultiplexer(Connection cnx,
90                             RequestChannel channel,
91                             long heartBeat) throws JMSException JavaDoc {
92     this.channel = channel;
93     this.cnx = cnx;
94     requestsTable = new Hashtable JavaDoc();
95     requestCounter = 0;
96     timer = new Timer JavaDoc();
97     channel.setTimer(timer);
98     try {
99       channel.connect();
100     } catch (Exception JavaDoc exc) {
101       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
102         JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
103       throw new JMSException JavaDoc(exc.toString());
104     }
105     
106     demtpx = new DemultiplexerDaemon();
107     demtpx.start();
108     setStatus(Status.OPEN);
109     
110     if (heartBeat > 0) {
111       heartBeatTask = new HeartBeatTask(heartBeat);
112       lastRequestDate = System.currentTimeMillis();
113       try {
114         heartBeatTask.start();
115       } catch (Exception JavaDoc exc) {
116         if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
117           JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
118         throw new JMSException JavaDoc(exc.toString());
119       }
120     }
121   }
122
123   private void setStatus(int status) {
124     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
125       JoramTracing.dbgClient.log(
126         BasicLevel.DEBUG,
127         "RequestMultiplexer.setStatus(" +
128         Status.toString(status) + ')');
129     this.status = status;
130   }
131   
132   public boolean isClosed() {
133     return status == Status.CLOSE;
134   }
135
136   public void setExceptionListener(
137     javax.jms.ExceptionListener JavaDoc exceptionListener) {
138     this.exceptionListener = exceptionListener;
139   }
140
141   public javax.jms.ExceptionListener JavaDoc getExceptionListener() {
142     return exceptionListener;
143   }
144
145   public void sendRequest(AbstractJmsRequest request) throws JMSException JavaDoc {
146     sendRequest(request, null);
147   }
148   
149   public void sendRequest(AbstractJmsRequest request, ReplyListener listener)
150       throws JMSException JavaDoc {
151
152     synchronized (this) {
153       if (status == Status.CLOSE)
154         throw new IllegalStateException JavaDoc("Connection closed");
155
156       if (requestCounter == Integer.MAX_VALUE) {
157         requestCounter = 0;
158       }
159
160       request.setRequestId(requestCounter++);
161
162       if (listener != null) {
163         requestsTable.put(new Integer JavaDoc(request.getRequestId()), listener);
164       }
165
166       if (heartBeatTask != null) {
167         lastRequestDate = System.currentTimeMillis();
168       }
169     }
170
171     try {
172       channel.send(request);
173     } catch (Exception JavaDoc exc) {
174       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
175         JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
176       JMSException JavaDoc jmsExc = new JMSException JavaDoc(exc.toString());
177       jmsExc.setLinkedException(exc);
178       throw jmsExc;
179     }
180   }
181   
182   public void setMultiThreadSync(int delay, int threshold) {
183     channel = new MultiThreadSyncChannel(channel, delay, threshold);
184   }
185
186   /**
187    * Not synchronized because it would possibly
188    * deadlock with some reply listeners
189    * (actually requestors).
190    */

191   public void close() {
192     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
193       JoramTracing.dbgClient.log(BasicLevel.DEBUG, "RequestMultiplexer.close()");
194     
195     synchronized (this) {
196       if (status == Status.CLOSE)
197         return;
198       // Immediately set the status as no error
199
// can be thrown. This enables to release
200
// the lock and avoid any dead lock
201
// with the demultiplexer thread that
202
// calls close() when interrupted.
203
setStatus(Status.CLOSE);
204     }
205
206     if (heartBeatTask != null) heartBeatTask.cancel();
207     if (timer != null) timer.cancel();
208     channel.close();
209     demtpx.stop();
210
211     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
212       JoramTracing.dbgClient.log(
213         BasicLevel.DEBUG, " -> requestsTable=" + requestsTable);
214     
215     // The requests table can't be accessed
216
// either by an external thread (status CLOSE)
217
// or by the internal demultiplexer thread (stopped).
218

219     cleanup();
220   }
221
222   /**
223    * Used by:
224    * 1- close()
225    * 2- the connector layer (OutboundConnection.cleanup())
226    */

227   public void cleanup() {
228     // Create first a copy of the current keys
229
// registered into the requests table.
230
Integer JavaDoc[] requestIds;
231     synchronized (requestsTable) {
232       Set JavaDoc keySet = requestsTable.keySet();
233       requestIds = new Integer JavaDoc[keySet.size()];
234       keySet.toArray(requestIds);
235     }
236     for (int i = 0; i < requestIds.length; i++) {
237       ReplyListener rl = (ReplyListener) requestsTable.get(requestIds[i]);
238       // The listener may be null because the table
239
// may have been modified meanwhile.
240
if (rl != null) {
241         rl.replyAborted(requestIds[i].intValue());
242       }
243     }
244     requestsTable.clear();
245   }
246
247   /**
248    * Not synchronized because it would possibly
249    * deadlock with some reply listeners
250    * (actually requestors).
251    */

252   public void abortRequest(int requestId) {
253     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
254       JoramTracing.dbgClient.log(
255         BasicLevel.DEBUG,
256         "RequestMultiplexer.abortRequest(" + requestId + ')');
257     ReplyListener rl = doAbortRequest(requestId);
258     if (rl != null) {
259       rl.replyAborted(requestId);
260     }
261   }
262   
263   private synchronized ReplyListener doAbortRequest(int requestId) {
264     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
265       JoramTracing.dbgClient.log(
266         BasicLevel.DEBUG, "RequestMultiplexer.doAbortRequest(" +
267         requestId + ')');
268     if (status == Status.CLOSE) return null;
269     return (ReplyListener)requestsTable.remove(
270       new Integer JavaDoc(requestId));
271   }
272
273   /**
274    * Not synchronized because it may be called by the
275    * demultiplexer during a concurrent close. It would deadlock
276    * as the close waits for the demultiplexer to stop.
277    */

278   private void route(AbstractJmsReply reply) {
279     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
280       JoramTracing.dbgClient.log(
281         BasicLevel.DEBUG,
282         "RequestMultiplexer.route(" + reply + ')');
283     int requestId = reply.getCorrelationId();
284     Integer JavaDoc requestKey = new Integer JavaDoc(requestId);
285     ReplyListener rl = (ReplyListener)requestsTable.get(requestKey);
286     if (reply instanceof MomExceptionReply) {
287       MomExceptionReply excReply = (MomExceptionReply) reply;
288       int excType = excReply.getType();
289       JMSException JavaDoc jmsExc = null;
290       if (excType == MomExceptionReply.AccessException) {
291         jmsExc = new JMSSecurityException JavaDoc(excReply.getMessage());
292       } else if (excType == MomExceptionReply.DestinationException) {
293         jmsExc = new InvalidDestinationException JavaDoc(excReply.getMessage());
294       } else {
295         jmsExc = new JMSException JavaDoc(excReply.getMessage());
296       }
297       if (rl instanceof ErrorListener) {
298         ((ErrorListener)rl).errorReceived(requestId, jmsExc);
299       } else {
300         // The listener is null or doesn't implement ErrorListener
301
onException(jmsExc);
302       }
303     } else {
304       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
305         JoramTracing.dbgClient.log(
306           BasicLevel.DEBUG, " -> rl = " + rl + ')');
307       if (rl != null) {
308         try {
309           if (rl.replyReceived(reply)) {
310             requestsTable.remove(requestKey);
311           }
312         } catch (AbortedRequestException exc) {
313           JoramTracing.dbgClient.log(
314             BasicLevel.WARN,
315             " -> Request aborted: " + requestId);
316           abortReply(reply);
317         }
318       } else {
319         if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN))
320           JoramTracing.dbgClient.log(
321             BasicLevel.WARN,
322             " -> Listener not found for the reply: " + requestId);
323         abortReply(reply);
324       }
325     }
326   }
327
328   private void abortReply(AbstractJmsReply reply) {
329     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
330       JoramTracing.dbgClient.log(BasicLevel.DEBUG,
331                                  "RequestMultiplexer.abortReply(" + reply + ')');
332     if (reply instanceof ConsumerMessages) {
333       deny((ConsumerMessages)reply);
334     }
335     // Else nothing to do.
336
}
337
338   public void deny(ConsumerMessages messages) {
339     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
340       JoramTracing.dbgClient.log(
341         BasicLevel.DEBUG, "RequestMultiplexer.deny(" +
342         messages + ')');
343
344     Vector JavaDoc msgList = messages.getMessages();
345     Vector JavaDoc ids = new Vector JavaDoc();
346     for (int i = 0; i < msgList.size(); i++) {
347       ids.addElement(((org.objectweb.joram.shared.messages.Message) msgList.elementAt(i)).id);
348     }
349     SessDenyRequest deny = new SessDenyRequest(messages.comesFrom(),
350                                                ids,
351                                                messages.getQueueMode());
352     try {
353       sendRequest(deny);
354     } catch (JMSException JavaDoc exc) {
355       if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
356         JoramTracing.dbgClient.log(
357           BasicLevel.DEBUG, "", exc);
358       // Connection failure
359
// Nothing to do
360
}
361   }
362
363   class onExceptionRunner implements Runnable JavaDoc {
364     Exception JavaDoc exc;
365
366     onExceptionRunner(Exception JavaDoc exc) {
367       this.exc = exc;
368     }
369
370     public void run() {
371       onException(exc);
372     }
373   }
374
375   private void onException(Exception JavaDoc exc) {
376     if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
377       JoramTracing.dbgClient.log(
378         BasicLevel.DEBUG, "RequestMultiplexer.onException(" + exc + ')');
379     JMSException JavaDoc jmsExc;
380     if (exc instanceof JMSException JavaDoc) {
381       jmsExc = (JMSException JavaDoc) exc;
382     } else {
383       jmsExc = new IllegalStateException JavaDoc(exc.getMessage());
384     }
385     if (exceptionListener != null)
386       exceptionListener.onException(jmsExc);
387   }
388
389   public void schedule(TimerTask JavaDoc task,
390                        long period) {
391     if (timer != null) {
392       try {
393         timer.schedule(task, period);
394       } catch (Exception JavaDoc exc) {
395         if (JoramTracing.dbgClient.isLoggable(BasicLevel.ERROR))
396           JoramTracing.dbgClient.log(BasicLevel.ERROR, "", exc);
397       }
398     }
399   }
400   
401   public void setDemultiplexerDaemonName(String JavaDoc name) {
402     demtpx.setName(name);
403   }
404   
405   public String JavaDoc getDemultiplexerDaemonName() {
406     return demtpx.getName();
407   }
408
409   private class DemultiplexerDaemon extends fr.dyade.aaa.util.Daemon {
410     DemultiplexerDaemon() {
411       // The real name is set later when
412
// the proxy id and connection id are known
413
// see setDemultiplexerDaemonName()
414
super("Connection#?");
415     }
416
417     public void run() {
418       try {
419         loop:
420         while (running) {
421           canStop = true;
422           AbstractJmsReply reply;
423           try {
424             reply = channel.receive();
425           } catch (Exception JavaDoc exc) {
426             if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
427               JoramTracing.dbgClient.log(BasicLevel.DEBUG,
428                                          "Exception during receive", exc);
429             // Check if the connection is not already
430
// closed (the exception may occur as a consequence
431
// of a closure or at the same time as an independant
432
// close call).
433
if (! isClosed()) {
434               RequestMultiplexer.this.close();
435               // The connection close() must be
436
// called by another thread. Calling it with
437
// this thread (demultiplexer daemon) could
438
// lead to a deadlock if another thread called
439
// close() just before.
440
Closer closer = new Closer(exc);
441               new Thread JavaDoc(closer).start();
442             } else {
443               // Else it means that the connection is already closed
444
// Runs the onException in a separate thread in order to avoid
445
// deadlock in connector onException (synchronized).
446
onExceptionRunner oer = new onExceptionRunner(exc);
447               new Thread JavaDoc(oer).start();
448             }
449             
450             break loop;
451           }
452           canStop = false;
453           route(reply);
454         }
455       } finally {
456         finish();
457       }
458     }
459     
460     /**
461      * Enables the daemon to stop itself.
462      */

463     public void stop() {
464       if (isCurrentThread()) {
465         finish();
466       } else {
467         super.stop();
468       }
469     }
470
471     protected void shutdown() {}
472
473     protected void close() {}
474   }
475   
476   private class Closer implements Runnable JavaDoc {
477     private Exception JavaDoc exc;
478     
479     Closer(Exception JavaDoc e) {
480       exc = e;
481     }
482     
483     public void run() {
484       try {
485         RequestMultiplexer.this.cnx.close();
486       } catch (JMSException JavaDoc exc2) {
487         if (JoramTracing.dbgClient.isLoggable(BasicLevel.WARN))
488           JoramTracing.dbgClient.log(BasicLevel.WARN,
489               "Error during close", exc2);
490       }
491       
492       onException(exc);
493     }
494   }
495
496   /**
497    * Timer task responsible for sending a ping message
498    * to the server if no request has been sent during
499    * the specified timeout ('cnxPendingTimer' from the
500    * factory parameters).
501    */

502   private class HeartBeatTask extends TimerTask JavaDoc {
503
504     private long heartBeat;
505
506     HeartBeatTask(long heartBeat) {
507       this.heartBeat = heartBeat;
508     }
509
510     public void run() {
511       try {
512         long date = System.currentTimeMillis();
513         if ((date - lastRequestDate) > heartBeat) {
514           sendRequest(new PingRequest());
515         }
516       } catch (Exception JavaDoc exc) {
517         if (JoramTracing.dbgClient.isLoggable(BasicLevel.DEBUG))
518           JoramTracing.dbgClient.log(BasicLevel.DEBUG, "", exc);
519       }
520     }
521
522     public void start() throws Exception JavaDoc {
523       timer.schedule(this, heartBeat, heartBeat);
524     }
525   }
526
527   
528
529 }
530
Popular Tags