KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > example > AsyncAction


1 package org.example;
2
3 import java.util.HashMap JavaDoc;
4 import java.util.Map JavaDoc;
5
6 import javax.jms.Connection JavaDoc;
7 import javax.jms.ConnectionFactory JavaDoc;
8 import javax.jms.Destination JavaDoc;
9 import javax.jms.JMSException JavaDoc;
10 import javax.jms.MapMessage JavaDoc;
11 import javax.jms.MessageProducer JavaDoc;
12 import javax.jms.Session JavaDoc;
13 import javax.naming.Context JavaDoc;
14 import javax.naming.InitialContext JavaDoc;
15 import javax.naming.NamingException JavaDoc;
16
17 import org.apache.commons.logging.Log;
18 import org.apache.commons.logging.LogFactory;
19
20 import org.jbpm.context.exe.ContextInstance;
21 import org.jbpm.graph.def.ActionHandler;
22 import org.jbpm.graph.exe.ExecutionContext;
23 import org.jbpm.graph.exe.Token;
24
25 /**
26  * @author Alejandro Guízar
27  * @version $Revision: 1.2 $ $Date: 2005/06/21 20:28:28 $
28  */

29 public class AsyncAction implements ActionHandler {
30   
31   // the operation to perform
32
private String JavaDoc operation;
33   // a space-separated list of variable names
34
private String JavaDoc variables;
35   
36   private Connection JavaDoc jmsConnection;
37   // maps operation names to jms destinations
38
private Map JavaDoc operationQueues = new HashMap JavaDoc();
39   
40   /** JMS property for the operation name */
41   public static final String JavaDoc OPERATION_PROPERTY = "Operation";
42   /** Item name for the process identifier */
43   public static final String JavaDoc PROCESS_ID_ITEM = "_$processId";
44   /** Item name for the token identifier */
45   public static final String JavaDoc TOKEN_ID_ITEM = "_$tokenId";
46   
47   // jndi name for jms connection factory */
48
private static final String JavaDoc DEFAULT_CONNECTION_FACTORY = "ConnectionFactory";
49   
50   // even indices are operation names, odd indices are jndi names for queues
51
private static final String JavaDoc[] DEFAULT_OPERATION_QUEUES = {
52       "prong1", "queue/testQueue",
53       "prong2", "queue/testQueue",
54       "prong3", "queue/testQueue"
55   };
56   
57   private static final Log log = LogFactory.getLog(AsyncAction.class);
58
59   public AsyncAction() {
60     Context JavaDoc initialContext = null;
61     try {
62       initialContext = new InitialContext JavaDoc();
63       createJmsConnection(initialContext);
64       registerOperationQueues(initialContext);
65     }
66     catch (NamingException JavaDoc e) {
67       log.error(e);
68       throw new RuntimeException JavaDoc("could not create initial context", e);
69     }
70     finally {
71       if (initialContext != null) {
72         try {
73           initialContext.close();
74         }
75         catch (NamingException JavaDoc e) {
76           log.warn("could not close initial context", e);
77         }
78       }
79     }
80   }
81   
82   public void execute(ExecutionContext executionContext) {
83     ContextInstance contextInstance = executionContext.getContextInstance();
84     Token token = executionContext.getToken();
85     
86     Session JavaDoc jmsSession = null;
87     try {
88       jmsSession = jmsConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
89       
90       MapMessage JavaDoc message = jmsSession.createMapMessage();
91       
92       // operation goes in message header
93
message.setStringProperty(OPERATION_PROPERTY, operation);
94       
95       // variable values go in message body
96
if (variables != null) {
97         String JavaDoc[] variableNames = variables.split("\\s");
98         for (int i = 0, n = variableNames.length; i < n; i++) {
99           String JavaDoc name = variableNames[i];
100           Object JavaDoc value = contextInstance.getVariable(name, token);
101           log.debug("setting variable in message body: name=" + name + ", value=" + value);
102           message.setObject(name, value);
103         }
104       }
105       
106       // process and token identifiers also go in the body
107
long processId = token.getProcessInstance().getId();
108       message.setLong(PROCESS_ID_ITEM, processId);
109       long tokenId = token.getId();
110       message.setLong(TOKEN_ID_ITEM, tokenId);
111       
112       // send the message to the queue corresponding to the operation
113
log.debug("sending message: operation=" + operation +
114           ", processId=" + processId + ", tokenId=" + tokenId);
115       Destination JavaDoc queue = getQueueForOperation(operation);
116       MessageProducer JavaDoc sender = jmsSession.createProducer(queue);
117       sender.send(message);
118     }
119     catch (JMSException JavaDoc e) {
120       log.error(e);
121       throw new RuntimeException JavaDoc("could not send message for operation: " + operation, e);
122     }
123     finally {
124       if (jmsSession != null) {
125         try {
126           jmsSession.close();
127         }
128         catch (JMSException JavaDoc e) {
129           log.warn("could not close jms session", e);
130         }
131       }
132     }
133   }
134   
135   protected Destination JavaDoc getQueueForOperation(String JavaDoc operation) {
136     return (Destination JavaDoc) operationQueues.get(operation);
137   }
138
139   protected void createJmsConnection(Context JavaDoc namingContext) {
140     log.debug("creating jms connection: factory=" + DEFAULT_CONNECTION_FACTORY);
141     try {
142       ConnectionFactory JavaDoc factory = (ConnectionFactory JavaDoc) namingContext.lookup(DEFAULT_CONNECTION_FACTORY);
143       jmsConnection = factory.createConnection();
144     }
145     catch (NamingException JavaDoc e) {
146       log.error(e);
147       throw new RuntimeException JavaDoc("could not retrieve jms connection factory bound to: " +
148           DEFAULT_CONNECTION_FACTORY, e);
149     }
150     catch (JMSException JavaDoc e) {
151       log.error(e);
152       throw new RuntimeException JavaDoc("could not create jms connection", e);
153     }
154   }
155   
156   protected void registerOperationQueues(Context JavaDoc namingContext) {
157     for (int i = 0, n = DEFAULT_OPERATION_QUEUES.length; i < n; i += 2) {
158       String JavaDoc operation = DEFAULT_OPERATION_QUEUES[i];
159       String JavaDoc queueName = DEFAULT_OPERATION_QUEUES[i+1];
160       log.debug("registering operation queue: operation=" + operation + ", queue=" + queueName);
161       try {
162         Destination JavaDoc queue = (Destination JavaDoc) namingContext.lookup(queueName);
163         operationQueues.put(operation, queue);
164       }
165       catch (NamingException JavaDoc e) {
166         log.error(e);
167         throw new RuntimeException JavaDoc("could not retrieve queue bound to: " + queueName, e);
168       }
169     }
170   }
171 }
172
Popular Tags