KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > tools > ant > taskdefs > Parallel


1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */

18 package org.apache.tools.ant.taskdefs;
19
20 import java.lang.reflect.Method JavaDoc;
21 import java.util.Enumeration JavaDoc;
22 import java.util.Vector JavaDoc;
23 import java.util.List JavaDoc;
24 import java.util.ArrayList JavaDoc;
25 import org.apache.tools.ant.BuildException;
26 import org.apache.tools.ant.Location;
27 import org.apache.tools.ant.Task;
28 import org.apache.tools.ant.TaskContainer;
29 import org.apache.tools.ant.util.StringUtils;
30
31 /**
32  * Executes the contained tasks in separate threads, continuing
33  * once all are completed.
34  * <p>
35  * New behavior allows for the ant script to specify a maximum number of
36  * threads that will be executed in parallel. One should be very careful about
37  * using the <code>waitFor</code> task when specifying <code>threadCount</code>
38  * as it can cause deadlocks if the number of threads is too small or if one of
39  * the nested tasks fails to execute completely. The task selection algorithm
40  * will insure that the tasks listed before a task have started before that
41  * task is started, but it will not insure a successful completion of those
42  * tasks or that those tasks will finish first (i.e. it's a classic race
43  * condition).
44  * </p>
45  * @since Ant 1.4
46  *
47  * @ant.task category="control"
48  */

49 public class Parallel extends Task
50                       implements TaskContainer {
51
52     /** Class which holds a list of tasks to execute */
53     public static class TaskList implements TaskContainer {
54         /** Collection holding the nested tasks */
55         private List JavaDoc tasks = new ArrayList JavaDoc();
56
57         /**
58          * Add a nested task to execute parallel (asynchron).
59          * <p>
60          * @param nestedTask Nested task to be executed in parallel.
61          * must not be null.
62          */

63         public void addTask(Task nestedTask) {
64             tasks.add(nestedTask);
65         }
66     }
67
68     /** Collection holding the nested tasks */
69     private Vector JavaDoc nestedTasks = new Vector JavaDoc();
70
71     /** Semaphore to notify of completed threads */
72     private final Object JavaDoc semaphore = new Object JavaDoc();
73
74     /** Total number of threads to run */
75     private int numThreads = 0;
76
77     /** Total number of threads per processor to run. */
78     private int numThreadsPerProcessor = 0;
79
80     /** The timeout period in milliseconds */
81     private long timeout;
82
83     /** Indicates threads are still running and new threads can be issued */
84     private volatile boolean stillRunning;
85
86     /** Indicates that the execution timedout */
87     private boolean timedOut;
88
89     /**
90      * Indicates whether failure of any of the nested tasks should end
91      * execution
92      */

93     private boolean failOnAny;
94
95     /** The dameon task list if any */
96     private TaskList daemonTasks;
97
98     /** Accumulation of exceptions messages from all nested tasks */
99     private StringBuffer JavaDoc exceptionMessage;
100
101     /** Number of exceptions from nested tasks */
102     private int numExceptions = 0;
103
104     /** The first exception encountered */
105     private Throwable JavaDoc firstException;
106
107     /** The location of the first exception */
108     private Location firstLocation;
109
110     /**
111      * Add a group of daemon threads
112      * @param daemonTasks The tasks to be executed as daemon.
113      */

114     public void addDaemons(TaskList daemonTasks) {
115         if (this.daemonTasks != null) {
116             throw new BuildException("Only one daemon group is supported");
117         }
118         this.daemonTasks = daemonTasks;
119     }
120
121     /**
122      * Interval to poll for completed threads when threadCount or
123      * threadsPerProcessor is specified. Integer in milliseconds.; optional
124      *
125      * @param pollInterval New value of property pollInterval.
126      */

127     public void setPollInterval(int pollInterval) {
128     }
129
130     /**
131      * Control whether a failure in a nested task halts execution. Note that
132      * the task will complete but existing threads will continue to run - they
133      * are not stopped
134      *
135      * @param failOnAny if true any nested task failure causes parallel to
136      * complete.
137      */

138     public void setFailOnAny(boolean failOnAny) {
139         this.failOnAny = failOnAny;
140     }
141
142     /**
143      * Add a nested task to execute in parallel.
144      * @param nestedTask Nested task to be executed in parallel
145      */

146     public void addTask(Task nestedTask) {
147         nestedTasks.addElement(nestedTask);
148     }
149
150     /**
151      * Dynamically generates the number of threads to execute based on the
152      * number of available processors (via
153      * <code>java.lang.Runtime.availableProcessors()</code>). Requires a J2SE
154      * 1.4 VM, and it will overwrite the value set in threadCount.
155      * If used in a 1.1, 1.2, or 1.3 VM then the task will defer to
156      * <code>threadCount</code>.; optional
157      * @param numThreadsPerProcessor Number of threads to create per available
158      * processor.
159      *
160      */

161     public void setThreadsPerProcessor(int numThreadsPerProcessor) {
162         this.numThreadsPerProcessor = numThreadsPerProcessor;
163     }
164
165     /**
166      * Statically determine the maximum number of tasks to execute
167      * simultaneously. If there are less tasks than threads then all will be
168      * executed at once, if there are more then only <code>threadCount</code>
169      * tasks will be executed at one time. If <code>threadsPerProcessor</code>
170      * is set and the JVM is at least a 1.4 VM then this value is
171      * ignored.; optional
172      *
173      * @param numThreads total number of threads.
174      *
175      */

176     public void setThreadCount(int numThreads) {
177         this.numThreads = numThreads;
178     }
179
180     /**
181      * Sets the timeout on this set of tasks. If the timeout is reached
182      * before the other threads complete, the execution of this
183      * task completes with an exception.
184      *
185      * Note that existing threads continue to run.
186      *
187      * @param timeout timeout in milliseconds.
188      */

189     public void setTimeout(long timeout) {
190         this.timeout = timeout;
191     }
192
193
194
195     /**
196      * Execute the parallel tasks
197      *
198      * @exception BuildException if any of the threads failed.
199      */

200     public void execute() throws BuildException {
201         updateThreadCounts();
202         if (numThreads == 0) {
203             numThreads = nestedTasks.size();
204         }
205         spinThreads();
206     }
207
208     /**
209      * Determine the number of threads based on the number of processors
210      */

211     private void updateThreadCounts() {
212         if (numThreadsPerProcessor != 0) {
213             int numProcessors = getNumProcessors();
214             if (numProcessors != 0) {
215                 numThreads = numProcessors * numThreadsPerProcessor;
216             }
217         }
218     }
219
220     private void processExceptions(TaskRunnable[] runnables) {
221         if (runnables == null) {
222             return;
223         }
224         for (int i = 0; i < runnables.length; ++i) {
225             Throwable JavaDoc t = runnables[i].getException();
226             if (t != null) {
227                 numExceptions++;
228                 if (firstException == null) {
229                     firstException = t;
230                 }
231                 if (t instanceof BuildException
232                     && firstLocation == Location.UNKNOWN_LOCATION) {
233                     firstLocation = ((BuildException) t).getLocation();
234                 }
235                 exceptionMessage.append(StringUtils.LINE_SEP);
236                 exceptionMessage.append(t.getMessage());
237             }
238         }
239     }
240
241     /**
242      * Spin up required threads with a maximum number active at any given time.
243      *
244      * @exception BuildException if any of the threads failed.
245      */

246     private void spinThreads() throws BuildException {
247         final int numTasks = nestedTasks.size();
248         TaskRunnable[] runnables = new TaskRunnable[numTasks];
249         stillRunning = true;
250         timedOut = false;
251
252         int threadNumber = 0;
253         for (Enumeration JavaDoc e = nestedTasks.elements(); e.hasMoreElements();
254              threadNumber++) {
255             Task nestedTask = (Task) e.nextElement();
256             runnables[threadNumber]
257                 = new TaskRunnable(nestedTask);
258         }
259
260         final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
261         TaskRunnable[] running = new TaskRunnable[maxRunning];
262
263         threadNumber = 0;
264         ThreadGroup JavaDoc group = new ThreadGroup JavaDoc("parallel");
265
266         TaskRunnable[] daemons = null;
267         if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
268             daemons = new TaskRunnable[daemonTasks.tasks.size()];
269         }
270
271         synchronized (semaphore) {
272             // When we leave this block we can be sure all data is really
273
// stored in main memory before the new threads start, the new
274
// threads will for sure load the data from main memory.
275
//
276
// This probably is slightly paranoid.
277
}
278
279         synchronized (semaphore) {
280             // start any daemon threads
281
if (daemons != null) {
282                 for (int i = 0; i < daemons.length; ++i) {
283                     daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
284                     Thread JavaDoc daemonThread = new Thread JavaDoc(group, daemons[i]);
285                     daemonThread.setDaemon(true);
286                     daemonThread.start();
287                 }
288             }
289
290             // now run main threads in limited numbers...
291
// start initial batch of threads
292
for (int i = 0; i < maxRunning; ++i) {
293                 running[i] = runnables[threadNumber++];
294                 Thread JavaDoc thread = new Thread JavaDoc(group, running[i]);
295                 thread.start();
296             }
297
298             if (timeout != 0) {
299                 // start the timeout thread
300
Thread JavaDoc timeoutThread = new Thread JavaDoc() {
301                     public synchronized void run() {
302                         try {
303                             wait(timeout);
304                             synchronized (semaphore) {
305                                 stillRunning = false;
306                                 timedOut = true;
307                                 semaphore.notifyAll();
308                             }
309                         } catch (InterruptedException JavaDoc e) {
310                             // ignore
311
}
312                     }
313                 };
314                 timeoutThread.start();
315             }
316
317             // now find available running slots for the remaining threads
318
outer:
319             while (threadNumber < numTasks && stillRunning) {
320                 for (int i = 0; i < maxRunning; i++) {
321                     if (running[i] == null || running[i].isFinished()) {
322                         running[i] = runnables[threadNumber++];
323                         Thread JavaDoc thread = new Thread JavaDoc(group, running[i]);
324                         thread.start();
325                         // continue on outer while loop to get another
326
// available slot
327
continue outer;
328                     }
329                 }
330
331                 // if we got here all slots in use, so sleep until
332
// something happens
333
try {
334                     semaphore.wait();
335                 } catch (InterruptedException JavaDoc ie) {
336                     // doesn't java know interruptions are rude?
337
// just pretend it didn't happen and go about out business.
338
// sheesh!
339
}
340             }
341
342             // are all threads finished
343
outer2:
344             while (stillRunning) {
345                 for (int i = 0; i < maxRunning; ++i) {
346                     if (running[i] != null && !running[i].isFinished()) {
347                         //System.out.println("Thread " + i + " is still alive ");
348
// still running - wait for it
349
try {
350                             semaphore.wait();
351                         } catch (InterruptedException JavaDoc ie) {
352                             // who would interrupt me at a time like this?
353
}
354                         continue outer2;
355                     }
356                 }
357                 stillRunning = false;
358             }
359         }
360
361         if (timedOut) {
362             throw new BuildException("Parallel execution timed out");
363         }
364
365         // now did any of the threads throw an exception
366
exceptionMessage = new StringBuffer JavaDoc();
367         numExceptions = 0;
368         firstException = null;
369         firstLocation = Location.UNKNOWN_LOCATION;
370         processExceptions(daemons);
371         processExceptions(runnables);
372
373         if (numExceptions == 1) {
374             if (firstException instanceof BuildException) {
375                 throw (BuildException) firstException;
376             } else {
377                 throw new BuildException(firstException);
378             }
379         } else if (numExceptions > 1) {
380             throw new BuildException(exceptionMessage.toString(),
381                                      firstLocation);
382         }
383     }
384
385     /**
386      * Determine the number of processors. Only effective on later VMs
387      *
388      * @return the number of processors available or 0 if not determinable.
389      */

390     private int getNumProcessors() {
391         try {
392             Class JavaDoc[] paramTypes = {};
393             Method JavaDoc availableProcessors =
394                 Runtime JavaDoc.class.getMethod("availableProcessors", paramTypes);
395
396             Object JavaDoc[] args = {};
397             Integer JavaDoc ret = (Integer JavaDoc) availableProcessors.invoke(Runtime.getRuntime(), args);
398             return ret.intValue();
399         } catch (Exception JavaDoc e) {
400             // return a bogus number
401
return 0;
402         }
403     }
404
405     /**
406      * thread that execs a task
407      */

408     private class TaskRunnable implements Runnable JavaDoc {
409         private Throwable JavaDoc exception;
410         private Task task;
411         private boolean finished;
412
413         /**
414          * Construct a new TaskRunnable.<p>
415          *
416          * @param task the Task to be executed in a separate thread
417          */

418         TaskRunnable(Task task) {
419             this.task = task;
420         }
421
422         /**
423          * Executes the task within a thread and takes care about
424          * Exceptions raised within the task.
425          */

426         public void run() {
427             try {
428                 task.perform();
429             } catch (Throwable JavaDoc t) {
430                 exception = t;
431                 if (failOnAny) {
432                     stillRunning = false;
433                 }
434             } finally {
435                 synchronized (semaphore) {
436                     finished = true;
437                     semaphore.notifyAll();
438                 }
439             }
440         }
441
442         /**
443          * get any exception that got thrown during execution;
444          * @return an exception or null for no exception/not yet finished
445          */

446         public Throwable JavaDoc getException() {
447             return exception;
448         }
449
450         /**
451          * Provides the indicator that the task has been finished.
452          * @return Returns true when the task is finished.
453          */

454         boolean isFinished() {
455             return finished;
456         }
457     }
458
459 }
460
Popular Tags