KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > StructuredProxyPushSupplierImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1999-2004 Gerald Brose
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */

23
24 import java.util.List JavaDoc;
25
26 import org.apache.avalon.framework.configuration.Configuration;
27 import org.apache.avalon.framework.configuration.ConfigurationException;
28 import org.jacorb.notification.OfferManager;
29 import org.jacorb.notification.SubscriptionManager;
30 import org.jacorb.notification.engine.MessagePushOperation;
31 import org.jacorb.notification.engine.PushTaskExecutorFactory;
32 import org.jacorb.notification.engine.TaskProcessor;
33 import org.jacorb.notification.interfaces.Message;
34 import org.jacorb.notification.interfaces.MessageConsumer;
35 import org.jacorb.notification.util.CollectionsWrapper;
36 import org.omg.CORBA.ORB JavaDoc;
37 import org.omg.CosEventChannelAdmin.AlreadyConnected;
38 import org.omg.CosEventComm.Disconnected;
39 import org.omg.CosNotification.EventType;
40 import org.omg.CosNotification.StructuredEvent;
41 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
42 import org.omg.CosNotifyChannelAdmin.ProxySupplierHelper;
43 import org.omg.CosNotifyChannelAdmin.ProxyType;
44 import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierOperations;
45 import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierPOATie;
46 import org.omg.CosNotifyComm.InvalidEventType;
47 import org.omg.CosNotifyComm.StructuredPushConsumer;
48 import org.omg.CosNotifyComm.StructuredPushConsumerOperations;
49 import org.omg.PortableServer.POA JavaDoc;
50 import org.omg.PortableServer.Servant JavaDoc;
51
52 /**
53  * @author Alphonse Bendt
54  * @version $Id: StructuredProxyPushSupplierImpl.java,v 1.17 2005/04/27 10:45:46 alphonse.bendt Exp $
55  */

56
57 public class StructuredProxyPushSupplierImpl extends AbstractProxyPushSupplier implements
58         StructuredProxyPushSupplierOperations
59 {
60     private class PushStructuredOperation extends MessagePushOperation
61     {
62         public PushStructuredOperation(Message message) {
63             super(message);
64         }
65
66         public void invokePush() throws Disconnected {
67             deliverMessageInternal(message_);
68         }
69     }
70
71     
72     private final static StructuredPushConsumerOperations NULL_CONSUMER = new StructuredPushConsumerOperations()
73     {
74         public void push_structured_event(StructuredEvent event)
75         {
76         }
77
78         public void disconnect_structured_push_consumer()
79         {
80         }
81
82         public void offer_change(EventType[] added, EventType[] removed) throws InvalidEventType
83         {
84         }
85     };
86
87     private StructuredPushConsumerOperations pushConsumer_;
88
89     private long timeSpent_;
90     
91     // //////////////////////////////////////
92

93     public StructuredProxyPushSupplierImpl(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf,
94             TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, OfferManager offerManager,
95             SubscriptionManager subscriptionManager, ConsumerAdmin consumerAdmin)
96             throws ConfigurationException
97     {
98         super(admin, orb, poa, conf, taskProcessor, pushTaskExecutorFactory, offerManager,
99                 subscriptionManager, consumerAdmin);
100     }
101
102     public ProxyType MyType()
103     {
104         return ProxyType.PUSH_STRUCTURED;
105     }
106
107     public void pushPendingData()
108     {
109         Message[] _mesgs = getAllMessages();
110
111         if (_mesgs != null)
112         {
113             for (int x = 0; x < _mesgs.length; ++x)
114             {
115                 try
116                 {
117                     deliverMessageWithRetry(_mesgs[x]);
118                 } finally
119                 {
120                     _mesgs[x].dispose();
121                 }
122             }
123         }
124     }
125
126     private void deliverMessageWithRetry(final Message message)
127     {
128         try
129         {
130             deliverMessageInternal(message);
131         } catch (Throwable JavaDoc e)
132         {
133             PushStructuredOperation _failedOperation = new PushStructuredOperation(message);
134
135             handleFailedPushOperation(_failedOperation, e);
136         }
137     }
138
139     void deliverMessageInternal(final Message message) throws Disconnected
140     {
141         long now = System.currentTimeMillis();
142         pushConsumer_.push_structured_event(message.toStructuredEvent());
143         timeSpent_ += (System.currentTimeMillis() - now);
144         resetErrorCounter();
145     }
146
147     public void connect_structured_push_consumer(StructuredPushConsumer consumer)
148             throws AlreadyConnected
149     {
150         checkIsNotConnected();
151
152         if (logger_.isDebugEnabled())
153         {
154             logger_.debug("connect structured_push_consumer");
155         }
156
157         pushConsumer_ = consumer;
158
159         connectClient(consumer);
160     }
161
162     public void disconnect_structured_push_supplier()
163     {
164         destroy();
165     }
166
167     protected void connectionResumed()
168     {
169         schedulePush();
170     }
171
172     protected void disconnectClient()
173     {
174         pushConsumer_.disconnect_structured_push_consumer();
175
176         pushConsumer_ = NULL_CONSUMER;
177     }
178
179     public List JavaDoc getSubsequentFilterStages()
180     {
181         return CollectionsWrapper.singletonList(this);
182     }
183
184     public MessageConsumer getMessageConsumer()
185     {
186         return this;
187     }
188
189     public synchronized Servant JavaDoc getServant()
190     {
191         if (thisServant_ == null)
192         {
193             thisServant_ = new StructuredProxyPushSupplierPOATie(this);
194         }
195         return thisServant_;
196     }
197
198     public org.omg.CORBA.Object JavaDoc activate()
199     {
200         return ProxySupplierHelper.narrow(getServant()._this_object(getORB()));
201     }
202     
203     protected long getCost()
204     {
205         return timeSpent_;
206     }
207 }
Popular Tags