Code - Class EDU.oswego.cs.dl.util.concurrent.QueuedSemaphore


1 /*
2   File: QueuedSemaphore.java
3
4   Originally written by Doug Lea and released into the public domain.
5   This may be used for any purposes whatsoever without acknowledgment.
6   Thanks for the assistance and support of Sun Microsystems Labs,
7   and everyone contributing, testing, and using this code.
8
9   History:
10   Date Who What
11   11Jun1998 dl Create public version
12    5Aug1998 dl replaced int counters with longs
13   24Aug1999 dl release(n): screen arguments
14 */

15
16
17 package EDU.oswego.cs.dl.util.concurrent;
18
19 /**
20  * Abstract base class for semaphores relying on queued wait nodes.
21  * <p>[<a HREF="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
22 **/

23
24
25 public abstract class QueuedSemaphore extends Semaphore {
26   
27   protected final WaitQueue wq_;
28
29   QueuedSemaphore(WaitQueue q, long initialPermits) {
30     super(initialPermits);
31     wq_ = q;
32   }
33
34   public void acquire() throws InterruptedException {
35     if (Thread.interrupted()) throw new InterruptedException();
36     if (precheck()) return;
37     WaitQueue.WaitNode w = new WaitQueue.WaitNode();
38     w.doWait(this);
39   }
40
41   public boolean attempt(long msecs) throws InterruptedException {
42     if (Thread.interrupted()) throw new InterruptedException();
43     if (precheck()) return true;
44     if (msecs <= 0) return false;
45
46     WaitQueue.WaitNode w = new WaitQueue.WaitNode();
47     return w.doTimedWait(this, msecs);
48   }
49
50   protected synchronized boolean precheck() {
51     boolean pass = (permits_ > 0);
52     if (pass) --permits_;
53     return pass;
54   }
55
56   protected synchronized boolean recheck(WaitQueue.WaitNode w) {
57     boolean pass = (permits_ > 0);
58     if (pass) --permits_;
59     else wq_.insert(w);
60     return pass;
61   }
62
63
64   protected synchronized WaitQueue.WaitNode getSignallee() {
65     WaitQueue.WaitNode w = wq_.extract();
66     if (w == null) ++permits_; // if none, inc permits for new arrivals
67
return w;
68   }
69
70   public void release() {
71     for (;;) {
72       WaitQueue.WaitNode w = getSignallee();
73       if (w == null) return; // no one to signal
74
if (w.signal()) return; // notify if still waiting, else skip
75
}
76   }
77
78   /** Release N permits **/
79   public void release(long n) {
80     if (n < 0) throw new IllegalArgumentException("Negative argument");
81
82     for (long i = 0; i < n; ++i) release();
83   }
84
85   /**
86    * Base class for internal queue classes for semaphores, etc.
87    * Relies on subclasses to actually implement queue mechanics
88    **/

89
90   protected static abstract class WaitQueue {
91
92     protected abstract void insert(WaitNode w);// assumed not to block
93
protected abstract WaitNode extract(); // should return null if empty
94

95     protected static class WaitNode {
96       boolean waiting = true;
97       WaitNode next = null;
98
99       protected synchronized boolean signal() {
100         boolean signalled = waiting;
101         if (signalled) {
102           waiting = false;
103           notify();
104         }
105         return signalled;
106       }
107
108       protected synchronized boolean doTimedWait(QueuedSemaphore sem,
109                                                  long msecs)
110         throws InterruptedException {
111         if (sem.recheck(this) || !waiting)
112           return true;
113         else if (msecs <= 0) {
114           waiting = false;
115           return false;
116         }
117         else {
118           long waitTime = msecs;
119           long start = System.currentTimeMillis();
120
121           try {
122             for (;;) {
123               wait(waitTime);
124               if (!waiting) // definitely signalled
125
return true;
126               else {
127                 waitTime = msecs - (System.currentTimeMillis() - start);
128                 if (waitTime <= 0) { // timed out
129
waiting = false;
130                   return false;
131                 }
132               }
133             }
134           }
135           catch(InterruptedException ex) {
136             if (waiting) { // no notification
137
waiting = false; // invalidate for the signaller
138
throw ex;
139             }
140             else { // thread was interrupted after it was notified
141
Thread.currentThread().interrupt();
142               return true;
143             }
144           }
145         }
146       }
147
148       protected synchronized void doWait(QueuedSemaphore sem)
149         throws InterruptedException {
150         if (!sem.recheck(this)) {
151           try {
152             while (waiting) wait();
153           }
154           catch(InterruptedException ex) {
155             if (waiting) { // no notification
156
waiting = false; // invalidate for the signaller
157
throw ex;
158             }
159             else { // thread was interrupted after it was notified
160
Thread.currentThread().interrupt();
161               return;
162             }
163           }
164         }
165       }
166     }
167
168   }
169
170
171 }
172

Java API By Example, From Geeks To Geeks. | Conditions of Use | About Us © 2002 - 2005, KickJava.com, or its affiliates