KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > avalon > excalibur > command > TPCThreadManager


/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE.txt file.
 */

8 package org.apache.avalon.excalibur.command;
9
10 import org.apache.avalon.framework.parameters.Parameters;
11 import org.apache.avalon.excalibur.concurrent.Mutex;
12 import org.apache.avalon.excalibur.thread.*;
13 import org.apache.avalon.excalibur.thread.impl.ResourceLimitingThreadPool;
14
15 import org.apache.avalon.excalibur.event.Sink;
16 import org.apache.avalon.excalibur.event.EventHandler;
17
18 import java.util.HashSet JavaDoc;
19 import java.util.HashMap JavaDoc;
20 import java.util.Iterator JavaDoc;
21
22 /**
23  * This is a ThreadManager that uses a certain number of threads per processor.
24  * The number of threads in the pool is a direct proportion to the number of
25  * processors.
26  *
27  * @author <a HREF="mailto:bloritsch@apache.org">Berin Loritsch</a>
28  */

29 public final class TPCThreadManager implements Runnable JavaDoc, ThreadManager
30 {
31     private final ThreadPool m_threadPool;
32     private final Mutex m_mutex = new Mutex();
33     private final HashMap JavaDoc m_pipelines = new HashMap JavaDoc();
34     private ThreadControl m_threadControl;
35     private boolean m_done = false;
36     private final long m_sleepTime;
37
38     /**
39      * The default constructor assumes there is a system property named "os.arch.cpus"
40      * that has a default for the number of CPUs on a system. Otherwise, the value
41      * is 1.
42      */

43     public TPCThreadManager()
44     {
45         this( Integer.parseInt( System.getProperty( "os.arch.cpus", "1" ) ) , 1 );
46     }
47
48     /**
49      * This constructor assumes there is a parameter named "os.arch.cpus"
50      * that has a default for the number of CPUs on a system. Otherwise, the value
51      * is 1.
52      */

53     public TPCThreadManager(Parameters params)
54     {
55         this( params.getParameterAsInteger( "os.arch.cpus", 1 ) ,
56               params.getParameterAsInteger( "container.threadsPerCPU", 2 ) );
57     }
58
59     /**
60      * Constructor provides one thread per number of processors.
61      */

62     public TPCThreadManager( int numProcessors )
63     {
64         this( numProcessors, 1 );
65     }
66
67     /**
68      * Constructor provides a specified number of threads per processor. If
69      * either value is less then one, then the value is rewritten as one.
70      */

71     public TPCThreadManager( int numProcessors, int threadsPerProcessor )
72     {
73         this( numProcessors, threadsPerProcessor, 1000 );
74     }
75
76     /**
77      * Constructor provides a specified number of threads per processor. If
78      * either value is less then one, then the value is rewritten as one.
79      */

80     public TPCThreadManager( int numProcessors, int threadsPerProcessor, long sleepTime )
81     {
82         int processors = Math.max( numProcessors, 1 );
83         int threads = Math.max( threadsPerProcessor, 1 );
84
85         m_threadPool = new ResourceLimitingThreadPool( "TPCThreadManager",
86                 ( processors * threads ) + 1, true, true, 1000L, 10L * 1000L );
87
88         m_sleepTime = sleepTime;
89         m_threadControl = m_threadPool.execute( this );
90     }
91
92     /**
93      * Register an EventPipeline with the ThreadManager.
94      */

95     public void register( EventPipeline pipeline )
96     {
97         try
98         {
99             m_mutex.acquire();
100
101             m_pipelines.put( pipeline, new PipelineRunner( pipeline ) );
102
103             if ( m_done )
104             {
105                 m_threadControl = m_threadPool.execute( this );
106             }
107         }
108         catch ( InterruptedException JavaDoc ie )
109         {
110             // ignore for now
111
}
112         finally
113         {
114             m_mutex.release();
115         }
116     }
117
118     /**
119      * Deregister an EventPipeline with the ThreadManager
120      */

121     public void deregister( EventPipeline pipeline )
122     {
123         try
124         {
125             m_mutex.acquire();
126
127             m_pipelines.remove( pipeline );
128
129             if ( m_pipelines.isEmpty() )
130             {
131                 m_done = true;
132                 m_threadControl.join( 1000 );
133             }
134         }
135         catch ( InterruptedException JavaDoc ie )
136         {
137             // ignore for now
138
}
139         finally
140         {
141             m_mutex.release();
142         }
143     }
144
145     /**
146      * Deregisters all EventPipelines from this ThreadManager
147      */

148     public void deregisterAll()
149     {
150         try
151         {
152             m_mutex.acquire();
153
154             m_done = true;
155             m_pipelines.clear();
156
157             m_threadControl.join( 1000 );
158         }
159         catch ( InterruptedException JavaDoc ie )
160         {
161             // ignore for now
162
}
163         finally
164         {
165             m_mutex.release();
166         }
167     }
168
169     public void run()
170     {
171         while ( ! m_done )
172         {
173             try
174             {
175                 m_mutex.acquire();
176
177                 Iterator JavaDoc i = m_pipelines.values().iterator();
178
179                 while ( i.hasNext() )
180                 {
181                     m_threadPool.execute( (PipelineRunner) i.next() );
182                 }
183             }
184             catch ( InterruptedException JavaDoc ie )
185             {
186                 // ignore for now
187
}
188             finally
189             {
190                 m_mutex.release();
191             }
192
193             try
194             {
195                 Thread.sleep( m_sleepTime );
196             }
197             catch ( InterruptedException JavaDoc ie )
198             {
199                // ignore and continue processing
200
}
201         }
202     }
203
204     public final static class PipelineRunner implements Runnable JavaDoc
205     {
206         private final EventPipeline m_pipeline;
207
208         protected PipelineRunner( EventPipeline pipeline )
209         {
210             m_pipeline = pipeline;
211         }
212
213         public void run()
214         {
215             Sink[] sinks = m_pipeline.getSinks();
216             EventHandler handler = m_pipeline.getEventHandler();
217
218             for (int i = 0; i < sinks.length; i++)
219             {
220                 handler.handleEvents( sinks[i].dequeueAll() );
221             }
222         }
223     }
224 }
Popular Tags