47 * operations obtain elements at the head of the queue. |
47 * operations obtain elements at the head of the queue. |
48 * |
48 * |
49 * <p>This is a classic "bounded buffer", in which a |
49 * <p>This is a classic "bounded buffer", in which a |
50 * fixed-sized array holds elements inserted by producers and |
50 * fixed-sized array holds elements inserted by producers and |
51 * extracted by consumers. Once created, the capacity cannot be |
51 * extracted by consumers. Once created, the capacity cannot be |
52 * increased. Attempts to <tt>put</tt> an element into a full queue |
52 * changed. Attempts to {@code put} an element into a full queue |
53 * will result in the operation blocking; attempts to <tt>take</tt> an |
53 * will result in the operation blocking; attempts to {@code take} an |
54 * element from an empty queue will similarly block. |
54 * element from an empty queue will similarly block. |
55 * |
55 * |
56 * <p> This class supports an optional fairness policy for ordering |
56 * <p>This class supports an optional fairness policy for ordering |
57 * waiting producer and consumer threads. By default, this ordering |
57 * waiting producer and consumer threads. By default, this ordering |
58 * is not guaranteed. However, a queue constructed with fairness set |
58 * is not guaranteed. However, a queue constructed with fairness set |
59 * to <tt>true</tt> grants threads access in FIFO order. Fairness |
59 * to {@code true} grants threads access in FIFO order. Fairness |
60 * generally decreases throughput but reduces variability and avoids |
60 * generally decreases throughput but reduces variability and avoids |
61 * starvation. |
61 * starvation. |
62 * |
62 * |
63 * <p>This class and its iterator implement all of the |
63 * <p>This class and its iterator implement all of the |
64 * <em>optional</em> methods of the {@link Collection} and {@link |
64 * <em>optional</em> methods of the {@link Collection} and {@link |
81 * it is empty. Otherwise it could not be declared final, which is |
81 * it is empty. Otherwise it could not be declared final, which is |
82 * necessary here. |
82 * necessary here. |
83 */ |
83 */ |
84 private static final long serialVersionUID = -817911632652898426L; |
84 private static final long serialVersionUID = -817911632652898426L; |
85 |
85 |
86 /** The queued items */ |
86 /** The queued items */ |
87 private final E[] items; |
87 final Object[] items; |
88 /** items index for next take, poll or remove */ |
88 |
89 private int takeIndex; |
89 /** items index for next take, poll, peek or remove */ |
90 /** items index for next put, offer, or add. */ |
90 int takeIndex; |
91 private int putIndex; |
91 |
92 /** Number of items in the queue */ |
92 /** items index for next put, offer, or add */ |
93 private int count; |
93 int putIndex; |
|
94 |
|
95 /** Number of elements in the queue */ |
|
96 int count; |
94 |
97 |
95 /* |
98 /* |
96 * Concurrency control uses the classic two-condition algorithm |
99 * Concurrency control uses the classic two-condition algorithm |
97 * found in any textbook. |
100 * found in any textbook. |
98 */ |
101 */ |
99 |
102 |
100 /** Main lock guarding all access */ |
103 /** Main lock guarding all access */ |
101 private final ReentrantLock lock; |
104 final ReentrantLock lock; |
102 /** Condition for waiting takes */ |
105 /** Condition for waiting takes */ |
103 private final Condition notEmpty; |
106 private final Condition notEmpty; |
104 /** Condition for waiting puts */ |
107 /** Condition for waiting puts */ |
105 private final Condition notFull; |
108 private final Condition notFull; |
106 |
109 |
127 /** |
159 /** |
128 * Extracts element at current take position, advances, and signals. |
160 * Extracts element at current take position, advances, and signals. |
129 * Call only when holding lock. |
161 * Call only when holding lock. |
130 */ |
162 */ |
131 private E extract() { |
163 private E extract() { |
132 final E[] items = this.items; |
164 final Object[] items = this.items; |
133 E x = items[takeIndex]; |
165 E x = this.<E>cast(items[takeIndex]); |
134 items[takeIndex] = null; |
166 items[takeIndex] = null; |
135 takeIndex = inc(takeIndex); |
167 takeIndex = inc(takeIndex); |
136 --count; |
168 --count; |
137 notFull.signal(); |
169 notFull.signal(); |
138 return x; |
170 return x; |
139 } |
171 } |
140 |
172 |
141 /** |
173 /** |
142 * Utility for remove and iterator.remove: Delete item at position i. |
174 * Deletes item at position i. |
|
175 * Utility for remove and iterator.remove. |
143 * Call only when holding lock. |
176 * Call only when holding lock. |
144 */ |
177 */ |
145 void removeAt(int i) { |
178 void removeAt(int i) { |
146 final E[] items = this.items; |
179 final Object[] items = this.items; |
147 // if removing front item, just advance |
180 // if removing front item, just advance |
148 if (i == takeIndex) { |
181 if (i == takeIndex) { |
149 items[takeIndex] = null; |
182 items[takeIndex] = null; |
150 takeIndex = inc(takeIndex); |
183 takeIndex = inc(takeIndex); |
151 } else { |
184 } else { |
165 --count; |
198 --count; |
166 notFull.signal(); |
199 notFull.signal(); |
167 } |
200 } |
168 |
201 |
169 /** |
202 /** |
170 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) |
203 * Creates an {@code ArrayBlockingQueue} with the given (fixed) |
171 * capacity and default access policy. |
204 * capacity and default access policy. |
172 * |
205 * |
173 * @param capacity the capacity of this queue |
206 * @param capacity the capacity of this queue |
174 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 |
207 * @throws IllegalArgumentException if {@code capacity < 1} |
175 */ |
208 */ |
176 public ArrayBlockingQueue(int capacity) { |
209 public ArrayBlockingQueue(int capacity) { |
177 this(capacity, false); |
210 this(capacity, false); |
178 } |
211 } |
179 |
212 |
180 /** |
213 /** |
181 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) |
214 * Creates an {@code ArrayBlockingQueue} with the given (fixed) |
182 * capacity and the specified access policy. |
215 * capacity and the specified access policy. |
183 * |
216 * |
184 * @param capacity the capacity of this queue |
217 * @param capacity the capacity of this queue |
185 * @param fair if <tt>true</tt> then queue accesses for threads blocked |
218 * @param fair if {@code true} then queue accesses for threads blocked |
186 * on insertion or removal, are processed in FIFO order; |
219 * on insertion or removal, are processed in FIFO order; |
187 * if <tt>false</tt> the access order is unspecified. |
220 * if {@code false} the access order is unspecified. |
188 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 |
221 * @throws IllegalArgumentException if {@code capacity < 1} |
189 */ |
222 */ |
190 public ArrayBlockingQueue(int capacity, boolean fair) { |
223 public ArrayBlockingQueue(int capacity, boolean fair) { |
191 if (capacity <= 0) |
224 if (capacity <= 0) |
192 throw new IllegalArgumentException(); |
225 throw new IllegalArgumentException(); |
193 this.items = (E[]) new Object[capacity]; |
226 this.items = new Object[capacity]; |
194 lock = new ReentrantLock(fair); |
227 lock = new ReentrantLock(fair); |
195 notEmpty = lock.newCondition(); |
228 notEmpty = lock.newCondition(); |
196 notFull = lock.newCondition(); |
229 notFull = lock.newCondition(); |
197 } |
230 } |
198 |
231 |
199 /** |
232 /** |
200 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed) |
233 * Creates an {@code ArrayBlockingQueue} with the given (fixed) |
201 * capacity, the specified access policy and initially containing the |
234 * capacity, the specified access policy and initially containing the |
202 * elements of the given collection, |
235 * elements of the given collection, |
203 * added in traversal order of the collection's iterator. |
236 * added in traversal order of the collection's iterator. |
204 * |
237 * |
205 * @param capacity the capacity of this queue |
238 * @param capacity the capacity of this queue |
206 * @param fair if <tt>true</tt> then queue accesses for threads blocked |
239 * @param fair if {@code true} then queue accesses for threads blocked |
207 * on insertion or removal, are processed in FIFO order; |
240 * on insertion or removal, are processed in FIFO order; |
208 * if <tt>false</tt> the access order is unspecified. |
241 * if {@code false} the access order is unspecified. |
209 * @param c the collection of elements to initially contain |
242 * @param c the collection of elements to initially contain |
210 * @throws IllegalArgumentException if <tt>capacity</tt> is less than |
243 * @throws IllegalArgumentException if {@code capacity} is less than |
211 * <tt>c.size()</tt>, or less than 1. |
244 * {@code c.size()}, or less than 1. |
212 * @throws NullPointerException if the specified collection or any |
245 * @throws NullPointerException if the specified collection or any |
213 * of its elements are null |
246 * of its elements are null |
214 */ |
247 */ |
215 public ArrayBlockingQueue(int capacity, boolean fair, |
248 public ArrayBlockingQueue(int capacity, boolean fair, |
216 Collection<? extends E> c) { |
249 Collection<? extends E> c) { |
217 this(capacity, fair); |
250 this(capacity, fair); |
218 if (capacity < c.size()) |
251 |
219 throw new IllegalArgumentException(); |
252 final ReentrantLock lock = this.lock; |
220 |
253 lock.lock(); // Lock only for visibility, not mutual exclusion |
221 for (E e : c) |
254 try { |
222 add(e); |
255 int i = 0; |
|
256 try { |
|
257 for (E e : c) { |
|
258 checkNotNull(e); |
|
259 items[i++] = e; |
|
260 } |
|
261 } catch (ArrayIndexOutOfBoundsException ex) { |
|
262 throw new IllegalArgumentException(); |
|
263 } |
|
264 count = i; |
|
265 putIndex = (i == capacity) ? 0 : i; |
|
266 } finally { |
|
267 lock.unlock(); |
|
268 } |
223 } |
269 } |
224 |
270 |
225 /** |
271 /** |
226 * Inserts the specified element at the tail of this queue if it is |
272 * Inserts the specified element at the tail of this queue if it is |
227 * possible to do so immediately without exceeding the queue's capacity, |
273 * possible to do so immediately without exceeding the queue's capacity, |
228 * returning <tt>true</tt> upon success and throwing an |
274 * returning {@code true} upon success and throwing an |
229 * <tt>IllegalStateException</tt> if this queue is full. |
275 * {@code IllegalStateException} if this queue is full. |
230 * |
276 * |
231 * @param e the element to add |
277 * @param e the element to add |
232 * @return <tt>true</tt> (as specified by {@link Collection#add}) |
278 * @return {@code true} (as specified by {@link Collection#add}) |
233 * @throws IllegalStateException if this queue is full |
279 * @throws IllegalStateException if this queue is full |
234 * @throws NullPointerException if the specified element is null |
280 * @throws NullPointerException if the specified element is null |
235 */ |
281 */ |
236 public boolean add(E e) { |
282 public boolean add(E e) { |
237 return super.add(e); |
283 return super.add(e); |
238 } |
284 } |
239 |
285 |
240 /** |
286 /** |
241 * Inserts the specified element at the tail of this queue if it is |
287 * Inserts the specified element at the tail of this queue if it is |
242 * possible to do so immediately without exceeding the queue's capacity, |
288 * possible to do so immediately without exceeding the queue's capacity, |
243 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue |
289 * returning {@code true} upon success and {@code false} if this queue |
244 * is full. This method is generally preferable to method {@link #add}, |
290 * is full. This method is generally preferable to method {@link #add}, |
245 * which can fail to insert an element only by throwing an exception. |
291 * which can fail to insert an element only by throwing an exception. |
246 * |
292 * |
247 * @throws NullPointerException if the specified element is null |
293 * @throws NullPointerException if the specified element is null |
248 */ |
294 */ |
249 public boolean offer(E e) { |
295 public boolean offer(E e) { |
250 if (e == null) throw new NullPointerException(); |
296 checkNotNull(e); |
251 final ReentrantLock lock = this.lock; |
297 final ReentrantLock lock = this.lock; |
252 lock.lock(); |
298 lock.lock(); |
253 try { |
299 try { |
254 if (count == items.length) |
300 if (count == items.length) |
255 return false; |
301 return false; |
297 * @throws NullPointerException {@inheritDoc} |
337 * @throws NullPointerException {@inheritDoc} |
298 */ |
338 */ |
299 public boolean offer(E e, long timeout, TimeUnit unit) |
339 public boolean offer(E e, long timeout, TimeUnit unit) |
300 throws InterruptedException { |
340 throws InterruptedException { |
301 |
341 |
302 if (e == null) throw new NullPointerException(); |
342 checkNotNull(e); |
303 long nanos = unit.toNanos(timeout); |
343 long nanos = unit.toNanos(timeout); |
304 final ReentrantLock lock = this.lock; |
344 final ReentrantLock lock = this.lock; |
305 lock.lockInterruptibly(); |
345 lock.lockInterruptibly(); |
306 try { |
346 try { |
307 for (;;) { |
347 while (count == items.length) { |
308 if (count != items.length) { |
|
309 insert(e); |
|
310 return true; |
|
311 } |
|
312 if (nanos <= 0) |
348 if (nanos <= 0) |
313 return false; |
349 return false; |
314 try { |
350 nanos = notFull.awaitNanos(nanos); |
315 nanos = notFull.awaitNanos(nanos); |
351 } |
316 } catch (InterruptedException ie) { |
352 insert(e); |
317 notFull.signal(); // propagate to non-interrupted thread |
353 return true; |
318 throw ie; |
|
319 } |
|
320 } |
|
321 } finally { |
354 } finally { |
322 lock.unlock(); |
355 lock.unlock(); |
323 } |
356 } |
324 } |
357 } |
325 |
358 |
326 public E poll() { |
359 public E poll() { |
327 final ReentrantLock lock = this.lock; |
360 final ReentrantLock lock = this.lock; |
328 lock.lock(); |
361 lock.lock(); |
329 try { |
362 try { |
330 if (count == 0) |
363 return (count == 0) ? null : extract(); |
331 return null; |
|
332 E x = extract(); |
|
333 return x; |
|
334 } finally { |
364 } finally { |
335 lock.unlock(); |
365 lock.unlock(); |
336 } |
366 } |
337 } |
367 } |
338 |
368 |
339 public E take() throws InterruptedException { |
369 public E take() throws InterruptedException { |
340 final ReentrantLock lock = this.lock; |
370 final ReentrantLock lock = this.lock; |
341 lock.lockInterruptibly(); |
371 lock.lockInterruptibly(); |
342 try { |
372 try { |
343 try { |
373 while (count == 0) |
344 while (count == 0) |
374 notEmpty.await(); |
345 notEmpty.await(); |
375 return extract(); |
346 } catch (InterruptedException ie) { |
|
347 notEmpty.signal(); // propagate to non-interrupted thread |
|
348 throw ie; |
|
349 } |
|
350 E x = extract(); |
|
351 return x; |
|
352 } finally { |
376 } finally { |
353 lock.unlock(); |
377 lock.unlock(); |
354 } |
378 } |
355 } |
379 } |
356 |
380 |
357 public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
381 public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
358 long nanos = unit.toNanos(timeout); |
382 long nanos = unit.toNanos(timeout); |
359 final ReentrantLock lock = this.lock; |
383 final ReentrantLock lock = this.lock; |
360 lock.lockInterruptibly(); |
384 lock.lockInterruptibly(); |
361 try { |
385 try { |
362 for (;;) { |
386 while (count == 0) { |
363 if (count != 0) { |
|
364 E x = extract(); |
|
365 return x; |
|
366 } |
|
367 if (nanos <= 0) |
387 if (nanos <= 0) |
368 return null; |
388 return null; |
369 try { |
389 nanos = notEmpty.awaitNanos(nanos); |
370 nanos = notEmpty.awaitNanos(nanos); |
390 } |
371 } catch (InterruptedException ie) { |
391 return extract(); |
372 notEmpty.signal(); // propagate to non-interrupted thread |
|
373 throw ie; |
|
374 } |
|
375 |
|
376 } |
|
377 } finally { |
392 } finally { |
378 lock.unlock(); |
393 lock.unlock(); |
379 } |
394 } |
380 } |
395 } |
381 |
396 |
382 public E peek() { |
397 public E peek() { |
383 final ReentrantLock lock = this.lock; |
398 final ReentrantLock lock = this.lock; |
384 lock.lock(); |
399 lock.lock(); |
385 try { |
400 try { |
386 return (count == 0) ? null : items[takeIndex]; |
401 return (count == 0) ? null : itemAt(takeIndex); |
387 } finally { |
402 } finally { |
388 lock.unlock(); |
403 lock.unlock(); |
389 } |
404 } |
390 } |
405 } |
391 |
406 |
429 } |
444 } |
430 } |
445 } |
431 |
446 |
432 /** |
447 /** |
433 * Removes a single instance of the specified element from this queue, |
448 * Removes a single instance of the specified element from this queue, |
434 * if it is present. More formally, removes an element <tt>e</tt> such |
449 * if it is present. More formally, removes an element {@code e} such |
435 * that <tt>o.equals(e)</tt>, if this queue contains one or more such |
450 * that {@code o.equals(e)}, if this queue contains one or more such |
436 * elements. |
451 * elements. |
437 * Returns <tt>true</tt> if this queue contained the specified element |
452 * Returns {@code true} if this queue contained the specified element |
438 * (or equivalently, if this queue changed as a result of the call). |
453 * (or equivalently, if this queue changed as a result of the call). |
439 * |
454 * |
|
455 * <p>Removal of interior elements in circular array based queues |
|
456 * is an intrinsically slow and disruptive operation, so should |
|
457 * be undertaken only in exceptional circumstances, ideally |
|
458 * only when the queue is known not to be accessible by other |
|
459 * threads. |
|
460 * |
440 * @param o element to be removed from this queue, if present |
461 * @param o element to be removed from this queue, if present |
441 * @return <tt>true</tt> if this queue changed as a result of the call |
462 * @return {@code true} if this queue changed as a result of the call |
442 */ |
463 */ |
443 public boolean remove(Object o) { |
464 public boolean remove(Object o) { |
444 if (o == null) return false; |
465 if (o == null) return false; |
445 final E[] items = this.items; |
466 final Object[] items = this.items; |
446 final ReentrantLock lock = this.lock; |
467 final ReentrantLock lock = this.lock; |
447 lock.lock(); |
468 lock.lock(); |
448 try { |
469 try { |
449 int i = takeIndex; |
470 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { |
450 int k = 0; |
|
451 for (;;) { |
|
452 if (k++ >= count) |
|
453 return false; |
|
454 if (o.equals(items[i])) { |
471 if (o.equals(items[i])) { |
455 removeAt(i); |
472 removeAt(i); |
456 return true; |
473 return true; |
457 } |
474 } |
458 i = inc(i); |
475 } |
459 } |
476 return false; |
460 |
477 } finally { |
461 } finally { |
478 lock.unlock(); |
462 lock.unlock(); |
479 } |
463 } |
480 } |
464 } |
481 |
465 |
482 /** |
466 /** |
483 * Returns {@code true} if this queue contains the specified element. |
467 * Returns <tt>true</tt> if this queue contains the specified element. |
484 * More formally, returns {@code true} if and only if this queue contains |
468 * More formally, returns <tt>true</tt> if and only if this queue contains |
485 * at least one element {@code e} such that {@code o.equals(e)}. |
469 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. |
|
470 * |
486 * |
471 * @param o object to be checked for containment in this queue |
487 * @param o object to be checked for containment in this queue |
472 * @return <tt>true</tt> if this queue contains the specified element |
488 * @return {@code true} if this queue contains the specified element |
473 */ |
489 */ |
474 public boolean contains(Object o) { |
490 public boolean contains(Object o) { |
475 if (o == null) return false; |
491 if (o == null) return false; |
476 final E[] items = this.items; |
492 final Object[] items = this.items; |
477 final ReentrantLock lock = this.lock; |
493 final ReentrantLock lock = this.lock; |
478 lock.lock(); |
494 lock.lock(); |
479 try { |
495 try { |
480 int i = takeIndex; |
496 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) |
481 int k = 0; |
|
482 while (k++ < count) { |
|
483 if (o.equals(items[i])) |
497 if (o.equals(items[i])) |
484 return true; |
498 return true; |
485 i = inc(i); |
|
486 } |
|
487 return false; |
499 return false; |
488 } finally { |
500 } finally { |
489 lock.unlock(); |
501 lock.unlock(); |
490 } |
502 } |
491 } |
503 } |
529 * runtime type of the specified array and the size of this queue. |
538 * runtime type of the specified array and the size of this queue. |
530 * |
539 * |
531 * <p>If this queue fits in the specified array with room to spare |
540 * <p>If this queue fits in the specified array with room to spare |
532 * (i.e., the array has more elements than this queue), the element in |
541 * (i.e., the array has more elements than this queue), the element in |
533 * the array immediately following the end of the queue is set to |
542 * the array immediately following the end of the queue is set to |
534 * <tt>null</tt>. |
543 * {@code null}. |
535 * |
544 * |
536 * <p>Like the {@link #toArray()} method, this method acts as bridge between |
545 * <p>Like the {@link #toArray()} method, this method acts as bridge between |
537 * array-based and collection-based APIs. Further, this method allows |
546 * array-based and collection-based APIs. Further, this method allows |
538 * precise control over the runtime type of the output array, and may, |
547 * precise control over the runtime type of the output array, and may, |
539 * under certain circumstances, be used to save allocation costs. |
548 * under certain circumstances, be used to save allocation costs. |
540 * |
549 * |
541 * <p>Suppose <tt>x</tt> is a queue known to contain only strings. |
550 * <p>Suppose {@code x} is a queue known to contain only strings. |
542 * The following code can be used to dump the queue into a newly |
551 * The following code can be used to dump the queue into a newly |
543 * allocated array of <tt>String</tt>: |
552 * allocated array of {@code String}: |
544 * |
553 * |
545 * <pre> |
554 * <pre> |
546 * String[] y = x.toArray(new String[0]);</pre> |
555 * String[] y = x.toArray(new String[0]);</pre> |
547 * |
556 * |
548 * Note that <tt>toArray(new Object[0])</tt> is identical in function to |
557 * Note that {@code toArray(new Object[0])} is identical in function to |
549 * <tt>toArray()</tt>. |
558 * {@code toArray()}. |
550 * |
559 * |
551 * @param a the array into which the elements of the queue are to |
560 * @param a the array into which the elements of the queue are to |
552 * be stored, if it is big enough; otherwise, a new array of the |
561 * be stored, if it is big enough; otherwise, a new array of the |
553 * same runtime type is allocated for this purpose |
562 * same runtime type is allocated for this purpose |
554 * @return an array containing all of the elements in this queue |
563 * @return an array containing all of the elements in this queue |
555 * @throws ArrayStoreException if the runtime type of the specified array |
564 * @throws ArrayStoreException if the runtime type of the specified array |
556 * is not a supertype of the runtime type of every element in |
565 * is not a supertype of the runtime type of every element in |
557 * this queue |
566 * this queue |
558 * @throws NullPointerException if the specified array is null |
567 * @throws NullPointerException if the specified array is null |
559 */ |
568 */ |
|
569 @SuppressWarnings("unchecked") |
560 public <T> T[] toArray(T[] a) { |
570 public <T> T[] toArray(T[] a) { |
561 final E[] items = this.items; |
571 final Object[] items = this.items; |
562 final ReentrantLock lock = this.lock; |
572 final ReentrantLock lock = this.lock; |
563 lock.lock(); |
573 lock.lock(); |
564 try { |
574 try { |
565 if (a.length < count) |
575 final int count = this.count; |
|
576 final int len = a.length; |
|
577 if (len < count) |
566 a = (T[])java.lang.reflect.Array.newInstance( |
578 a = (T[])java.lang.reflect.Array.newInstance( |
567 a.getClass().getComponentType(), |
579 a.getClass().getComponentType(), count); |
568 count |
580 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) |
569 ); |
581 a[k] = (T) items[i]; |
570 |
582 if (len > count) |
571 int k = 0; |
|
572 int i = takeIndex; |
|
573 while (k < count) { |
|
574 a[k++] = (T)items[i]; |
|
575 i = inc(i); |
|
576 } |
|
577 if (a.length > count) |
|
578 a[count] = null; |
583 a[count] = null; |
579 return a; |
584 return a; |
580 } finally { |
585 } finally { |
581 lock.unlock(); |
586 lock.unlock(); |
582 } |
587 } |
688 } finally { |
698 } finally { |
689 lock.unlock(); |
699 lock.unlock(); |
690 } |
700 } |
691 } |
701 } |
692 |
702 |
693 |
|
694 /** |
703 /** |
695 * Returns an iterator over the elements in this queue in proper sequence. |
704 * Returns an iterator over the elements in this queue in proper sequence. |
696 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that |
705 * The elements will be returned in order from first (head) to last (tail). |
697 * will never throw {@link ConcurrentModificationException}, |
706 * |
|
707 * <p>The returned {@code Iterator} is a "weakly consistent" iterator that |
|
708 * will never throw {@link java.util.ConcurrentModificationException |
|
709 * ConcurrentModificationException}, |
698 * and guarantees to traverse elements as they existed upon |
710 * and guarantees to traverse elements as they existed upon |
699 * construction of the iterator, and may (but is not guaranteed to) |
711 * construction of the iterator, and may (but is not guaranteed to) |
700 * reflect any modifications subsequent to construction. |
712 * reflect any modifications subsequent to construction. |
701 * |
713 * |
702 * @return an iterator over the elements in this queue in proper sequence |
714 * @return an iterator over the elements in this queue in proper sequence |
703 */ |
715 */ |
704 public Iterator<E> iterator() { |
716 public Iterator<E> iterator() { |
705 final ReentrantLock lock = this.lock; |
717 return new Itr(); |
706 lock.lock(); |
718 } |
707 try { |
719 |
708 return new Itr(); |
720 /** |
709 } finally { |
721 * Iterator for ArrayBlockingQueue. To maintain weak consistency |
710 lock.unlock(); |
722 * with respect to puts and takes, we (1) read ahead one slot, so |
711 } |
723 * as to not report hasNext true but then not have an element to |
712 } |
724 * return -- however we later recheck this slot to use the most |
713 |
725 * current value; (2) ensure that each array slot is traversed at |
714 /** |
726 * most once (by tracking "remaining" elements); (3) skip over |
715 * Iterator for ArrayBlockingQueue |
727 * null slots, which can occur if takes race ahead of iterators. |
|
728 * However, for circular array-based queues, we cannot rely on any |
|
729 * well established definition of what it means to be weakly |
|
730 * consistent with respect to interior removes since these may |
|
731 * require slot overwrites in the process of sliding elements to |
|
732 * cover gaps. So we settle for resiliency, operating on |
|
733 * established apparent nexts, which may miss some elements that |
|
734 * have moved between calls to next. |
716 */ |
735 */ |
717 private class Itr implements Iterator<E> { |
736 private class Itr implements Iterator<E> { |
718 /** |
737 private int remaining; // Number of elements yet to be returned |
719 * Index of element to be returned by next, |
738 private int nextIndex; // Index of element to be returned by next |
720 * or a negative number if no such. |
739 private E nextItem; // Element to be returned by next call to next |
721 */ |
740 private E lastItem; // Element returned by last call to next |
722 private int nextIndex; |
741 private int lastRet; // Index of last element returned, or -1 if none |
723 |
|
724 /** |
|
725 * nextItem holds on to item fields because once we claim |
|
726 * that an element exists in hasNext(), we must return it in |
|
727 * the following next() call even if it was in the process of |
|
728 * being removed when hasNext() was called. |
|
729 */ |
|
730 private E nextItem; |
|
731 |
|
732 /** |
|
733 * Index of element returned by most recent call to next. |
|
734 * Reset to -1 if this element is deleted by a call to remove. |
|
735 */ |
|
736 private int lastRet; |
|
737 |
742 |
738 Itr() { |
743 Itr() { |
739 lastRet = -1; |
744 final ReentrantLock lock = ArrayBlockingQueue.this.lock; |
740 if (count == 0) |
745 lock.lock(); |
741 nextIndex = -1; |
746 try { |
742 else { |
747 lastRet = -1; |
743 nextIndex = takeIndex; |
748 if ((remaining = count) > 0) |
744 nextItem = items[takeIndex]; |
749 nextItem = itemAt(nextIndex = takeIndex); |
|
750 } finally { |
|
751 lock.unlock(); |
745 } |
752 } |
746 } |
753 } |
747 |
754 |
748 public boolean hasNext() { |
755 public boolean hasNext() { |
749 /* |
756 return remaining > 0; |
750 * No sync. We can return true by mistake here |
|
751 * only if this iterator passed across threads, |
|
752 * which we don't support anyway. |
|
753 */ |
|
754 return nextIndex >= 0; |
|
755 } |
|
756 |
|
757 /** |
|
758 * Checks whether nextIndex is valid; if so setting nextItem. |
|
759 * Stops iterator when either hits putIndex or sees null item. |
|
760 */ |
|
761 private void checkNext() { |
|
762 if (nextIndex == putIndex) { |
|
763 nextIndex = -1; |
|
764 nextItem = null; |
|
765 } else { |
|
766 nextItem = items[nextIndex]; |
|
767 if (nextItem == null) |
|
768 nextIndex = -1; |
|
769 } |
|
770 } |
757 } |
771 |
758 |
772 public E next() { |
759 public E next() { |
773 final ReentrantLock lock = ArrayBlockingQueue.this.lock; |
760 final ReentrantLock lock = ArrayBlockingQueue.this.lock; |
774 lock.lock(); |
761 lock.lock(); |
775 try { |
762 try { |
776 if (nextIndex < 0) |
763 if (remaining <= 0) |
777 throw new NoSuchElementException(); |
764 throw new NoSuchElementException(); |
778 lastRet = nextIndex; |
765 lastRet = nextIndex; |
779 E x = nextItem; |
766 E x = itemAt(nextIndex); // check for fresher value |
780 nextIndex = inc(nextIndex); |
767 if (x == null) { |
781 checkNext(); |
768 x = nextItem; // we are forced to report old value |
|
769 lastItem = null; // but ensure remove fails |
|
770 } |
|
771 else |
|
772 lastItem = x; |
|
773 while (--remaining > 0 && // skip over nulls |
|
774 (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) |
|
775 ; |
782 return x; |
776 return x; |
783 } finally { |
777 } finally { |
784 lock.unlock(); |
778 lock.unlock(); |
785 } |
779 } |
786 } |
780 } |