33 // for output. Can be used blocking or asynchronously. |
33 // for output. Can be used blocking or asynchronously. |
34 |
34 |
35 public class Queue<T> implements ExceptionallyCloseable { |
35 public class Queue<T> implements ExceptionallyCloseable { |
36 |
36 |
37 private final LinkedList<T> q = new LinkedList<>(); |
37 private final LinkedList<T> q = new LinkedList<>(); |
38 private volatile boolean closed = false; |
38 private boolean closed = false; |
39 private volatile Throwable exception = null; |
39 private boolean closing = false; |
|
40 private Throwable exception = null; |
40 private Runnable callback; |
41 private Runnable callback; |
41 private boolean callbackDisabled = false; |
42 private boolean callbackDisabled = false; |
42 private int waiters; // true if someone waiting |
43 private int waiters; // true if someone waiting |
|
44 private final T closeSentinel; |
|
45 |
|
46 Queue(T closeSentinel) { |
|
47 this.closeSentinel = closeSentinel; |
|
48 } |
43 |
49 |
44 public synchronized int size() { |
50 public synchronized int size() { |
45 return q.size(); |
51 return q.size(); |
46 } |
52 } |
47 |
53 |
48 // public synchronized boolean tryPut(T obj) throws IOException { |
|
49 // if (closed) return false; |
|
50 // put(obj); |
|
51 // return true; |
|
52 // } |
|
53 |
|
54 public synchronized void put(T obj) throws IOException { |
54 public synchronized void put(T obj) throws IOException { |
55 if (closed) { |
55 if (closed || closing) { |
56 throw new IOException("stream closed"); |
56 throw new IOException("stream closed"); |
57 } |
57 } |
58 |
58 |
59 q.add(obj); |
59 q.add(obj); |
60 |
60 |
71 // dangerous and may lead to deadlocks. |
71 // dangerous and may lead to deadlocks. |
72 callback.run(); |
72 callback.run(); |
73 } |
73 } |
74 } |
74 } |
75 |
75 |
76 // public synchronized void disableCallback() { |
76 // Other close() variants are immediate and abortive |
77 // callbackDisabled = true; |
77 // This allows whatever is on Q to be processed first. |
78 // } |
|
79 |
78 |
80 // public synchronized void enableCallback() { |
79 public synchronized void orderlyClose() { |
81 // callbackDisabled = false; |
80 if (closing || closed) |
82 // while (q.size() > 0) { |
81 return; |
83 // callback.run(); |
82 try { |
84 // } |
83 put(closeSentinel); |
85 // } |
84 } catch (IOException e) { |
86 |
85 e.printStackTrace(); |
87 // /** |
86 } |
88 // * callback is invoked any time put is called where |
87 closing = true; |
89 // * the Queue was empty. |
88 } |
90 // */ |
|
91 // public synchronized void registerPutCallback(Runnable callback) { |
|
92 // Objects.requireNonNull(callback); |
|
93 // this.callback = callback; |
|
94 // if (q.size() > 0) { |
|
95 // // Note: calling callback while holding the lock is |
|
96 // // dangerous and may lead to deadlocks. |
|
97 // callback.run(); |
|
98 // } |
|
99 // } |
|
100 |
89 |
101 @Override |
90 @Override |
102 public synchronized void close() { |
91 public synchronized void close() { |
103 closed = true; |
92 closed = true; |
104 notifyAll(); |
93 notifyAll(); |
144 } |
138 } |
145 |
139 |
146 if (q.isEmpty()) { |
140 if (q.isEmpty()) { |
147 return null; |
141 return null; |
148 } |
142 } |
149 T res = q.removeFirst(); |
143 return take(); |
150 return res; |
|
151 } |
144 } |
152 |
|
153 // public synchronized T[] pollAll(T[] type) throws IOException { |
|
154 // T[] ret = q.toArray(type); |
|
155 // q.clear(); |
|
156 // return ret; |
|
157 // } |
|
158 |
|
159 // public synchronized void pushback(T v) { |
|
160 // q.addFirst(v); |
|
161 // } |
|
162 |
|
163 // public synchronized void pushbackAll(T[] v) { |
|
164 // for (int i=v.length-1; i>=0; i--) { |
|
165 // q.addFirst(v[i]); |
|
166 // } |
|
167 // } |
|
168 |
145 |
169 private IOException newIOException(String msg) { |
146 private IOException newIOException(String msg) { |
170 if (exception == null) { |
147 if (exception == null) { |
171 return new IOException(msg); |
148 return new IOException(msg); |
172 } else { |
149 } else { |
173 return new IOException(msg, exception); |
150 return new IOException(msg, exception); |
174 } |
151 } |
175 } |
152 } |
176 |
|
177 } |
153 } |