37 |
37 |
38 import java.util.AbstractQueue; |
38 import java.util.AbstractQueue; |
39 import java.util.Collection; |
39 import java.util.Collection; |
40 import java.util.Iterator; |
40 import java.util.Iterator; |
41 import java.util.NoSuchElementException; |
41 import java.util.NoSuchElementException; |
|
42 import java.util.Objects; |
42 import java.util.Spliterator; |
43 import java.util.Spliterator; |
43 import java.util.Spliterators; |
44 import java.util.Spliterators; |
44 import java.util.concurrent.atomic.AtomicInteger; |
45 import java.util.concurrent.atomic.AtomicInteger; |
45 import java.util.concurrent.locks.Condition; |
46 import java.util.concurrent.locks.Condition; |
46 import java.util.concurrent.locks.ReentrantLock; |
47 import java.util.concurrent.locks.ReentrantLock; |
232 void fullyUnlock() { |
233 void fullyUnlock() { |
233 takeLock.unlock(); |
234 takeLock.unlock(); |
234 putLock.unlock(); |
235 putLock.unlock(); |
235 } |
236 } |
236 |
237 |
237 // /** |
|
238 // * Tells whether both locks are held by current thread. |
|
239 // */ |
|
240 // boolean isFullyLocked() { |
|
241 // return (putLock.isHeldByCurrentThread() && |
|
242 // takeLock.isHeldByCurrentThread()); |
|
243 // } |
|
244 |
|
245 /** |
238 /** |
246 * Creates a {@code LinkedBlockingQueue} with a capacity of |
239 * Creates a {@code LinkedBlockingQueue} with a capacity of |
247 * {@link Integer#MAX_VALUE}. |
240 * {@link Integer#MAX_VALUE}. |
248 */ |
241 */ |
249 public LinkedBlockingQueue() { |
242 public LinkedBlockingQueue() { |
515 |
508 |
516 /** |
509 /** |
517 * Unlinks interior Node p with predecessor trail. |
510 * Unlinks interior Node p with predecessor trail. |
518 */ |
511 */ |
519 void unlink(Node<E> p, Node<E> trail) { |
512 void unlink(Node<E> p, Node<E> trail) { |
520 // assert isFullyLocked(); |
513 // assert putLock.isHeldByCurrentThread(); |
|
514 // assert takeLock.isHeldByCurrentThread(); |
521 // p.next is not changed, to allow iterators that are |
515 // p.next is not changed, to allow iterators that are |
522 // traversing p to maintain their weak-consistency guarantee. |
516 // traversing p to maintain their weak-consistency guarantee. |
523 p.item = null; |
517 p.item = null; |
524 trail.next = p.next; |
518 trail.next = p.next; |
525 if (last == p) |
519 if (last == p) |
699 * @throws ClassCastException {@inheritDoc} |
693 * @throws ClassCastException {@inheritDoc} |
700 * @throws NullPointerException {@inheritDoc} |
694 * @throws NullPointerException {@inheritDoc} |
701 * @throws IllegalArgumentException {@inheritDoc} |
695 * @throws IllegalArgumentException {@inheritDoc} |
702 */ |
696 */ |
703 public int drainTo(Collection<? super E> c, int maxElements) { |
697 public int drainTo(Collection<? super E> c, int maxElements) { |
704 if (c == null) |
698 Objects.requireNonNull(c); |
705 throw new NullPointerException(); |
|
706 if (c == this) |
699 if (c == this) |
707 throw new IllegalArgumentException(); |
700 throw new IllegalArgumentException(); |
708 if (maxElements <= 0) |
701 if (maxElements <= 0) |
709 return 0; |
702 return 0; |
710 boolean signalNotFull = false; |
703 boolean signalNotFull = false; |
739 signalNotFull(); |
732 signalNotFull(); |
740 } |
733 } |
741 } |
734 } |
742 |
735 |
743 /** |
736 /** |
|
737 * Used for any element traversal that is not entirely under lock. |
|
738 * Such traversals must handle both: |
|
739 * - dequeued nodes (p.next == p) |
|
740 * - (possibly multiple) interior removed nodes (p.item == null) |
|
741 */ |
|
742 Node<E> succ(Node<E> p) { |
|
743 return (p == (p = p.next)) ? head.next : p; |
|
744 } |
|
745 |
|
746 /** |
744 * Returns an iterator over the elements in this queue in proper sequence. |
747 * Returns an iterator over the elements in this queue in proper sequence. |
745 * The elements will be returned in order from first (head) to last (tail). |
748 * The elements will be returned in order from first (head) to last (tail). |
746 * |
749 * |
747 * <p>The returned iterator is |
750 * <p>The returned iterator is |
748 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. |
751 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. |
758 * Basic weakly-consistent iterator. At all times hold the next |
761 * Basic weakly-consistent iterator. At all times hold the next |
759 * item to hand out so that if hasNext() reports true, we will |
762 * item to hand out so that if hasNext() reports true, we will |
760 * still have it to return even if lost race with a take etc. |
763 * still have it to return even if lost race with a take etc. |
761 */ |
764 */ |
762 |
765 |
763 private Node<E> current; |
766 private Node<E> next; |
|
767 private E nextItem; |
764 private Node<E> lastRet; |
768 private Node<E> lastRet; |
765 private E currentElement; |
|
766 |
769 |
767 Itr() { |
770 Itr() { |
768 fullyLock(); |
771 fullyLock(); |
769 try { |
772 try { |
770 current = head.next; |
773 if ((next = head.next) != null) |
771 if (current != null) |
774 nextItem = next.item; |
772 currentElement = current.item; |
|
773 } finally { |
775 } finally { |
774 fullyUnlock(); |
776 fullyUnlock(); |
775 } |
777 } |
776 } |
778 } |
777 |
779 |
778 public boolean hasNext() { |
780 public boolean hasNext() { |
779 return current != null; |
781 return next != null; |
780 } |
782 } |
781 |
783 |
782 public E next() { |
784 public E next() { |
|
785 Node<E> p; |
|
786 if ((p = next) == null) |
|
787 throw new NoSuchElementException(); |
|
788 lastRet = p; |
|
789 E x = nextItem; |
783 fullyLock(); |
790 fullyLock(); |
784 try { |
791 try { |
785 if (current == null) |
792 E e = null; |
786 throw new NoSuchElementException(); |
793 for (p = p.next; p != null && (e = p.item) == null; ) |
787 lastRet = current; |
794 p = succ(p); |
788 E item = null; |
795 next = p; |
789 // Unlike other traversal methods, iterators must handle both: |
796 nextItem = e; |
790 // - dequeued nodes (p.next == p) |
|
791 // - (possibly multiple) interior removed nodes (p.item == null) |
|
792 for (Node<E> p = current, q;; p = q) { |
|
793 if ((q = p.next) == p) |
|
794 q = head.next; |
|
795 if (q == null || (item = q.item) != null) { |
|
796 current = q; |
|
797 E x = currentElement; |
|
798 currentElement = item; |
|
799 return x; |
|
800 } |
|
801 } |
|
802 } finally { |
797 } finally { |
803 fullyUnlock(); |
798 fullyUnlock(); |
804 } |
799 } |
|
800 return x; |
|
801 } |
|
802 |
|
803 public void forEachRemaining(Consumer<? super E> action) { |
|
804 // A variant of forEachFrom |
|
805 Objects.requireNonNull(action); |
|
806 Node<E> p; |
|
807 if ((p = next) == null) return; |
|
808 lastRet = p; |
|
809 next = null; |
|
810 final int batchSize = 32; |
|
811 Object[] es = null; |
|
812 int n, len = 1; |
|
813 do { |
|
814 fullyLock(); |
|
815 try { |
|
816 if (es == null) { |
|
817 p = p.next; |
|
818 for (Node<E> q = p; q != null; q = succ(q)) |
|
819 if (q.item != null && ++len == batchSize) |
|
820 break; |
|
821 es = new Object[len]; |
|
822 es[0] = nextItem; |
|
823 nextItem = null; |
|
824 n = 1; |
|
825 } else |
|
826 n = 0; |
|
827 for (; p != null && n < len; p = succ(p)) |
|
828 if ((es[n] = p.item) != null) { |
|
829 lastRet = p; |
|
830 n++; |
|
831 } |
|
832 } finally { |
|
833 fullyUnlock(); |
|
834 } |
|
835 for (int i = 0; i < n; i++) { |
|
836 @SuppressWarnings("unchecked") E e = (E) es[i]; |
|
837 action.accept(e); |
|
838 } |
|
839 } while (n > 0 && p != null); |
805 } |
840 } |
806 |
841 |
807 public void remove() { |
842 public void remove() { |
808 if (lastRet == null) |
843 if (lastRet == null) |
809 throw new IllegalStateException(); |
844 throw new IllegalStateException(); |
823 fullyUnlock(); |
858 fullyUnlock(); |
824 } |
859 } |
825 } |
860 } |
826 } |
861 } |
827 |
862 |
828 /** A customized variant of Spliterators.IteratorSpliterator */ |
863 /** |
829 static final class LBQSpliterator<E> implements Spliterator<E> { |
864 * A customized variant of Spliterators.IteratorSpliterator. |
|
865 * Keep this class in sync with (very similar) LBDSpliterator. |
|
866 */ |
|
867 private final class LBQSpliterator implements Spliterator<E> { |
830 static final int MAX_BATCH = 1 << 25; // max batch array size; |
868 static final int MAX_BATCH = 1 << 25; // max batch array size; |
831 final LinkedBlockingQueue<E> queue; |
|
832 Node<E> current; // current node; null until initialized |
869 Node<E> current; // current node; null until initialized |
833 int batch; // batch size for splits |
870 int batch; // batch size for splits |
834 boolean exhausted; // true when no more nodes |
871 boolean exhausted; // true when no more nodes |
835 long est; // size estimate |
872 long est = size(); // size estimate |
836 LBQSpliterator(LinkedBlockingQueue<E> queue) { |
873 |
837 this.queue = queue; |
874 LBQSpliterator() {} |
838 this.est = queue.size(); |
|
839 } |
|
840 |
875 |
841 public long estimateSize() { return est; } |
876 public long estimateSize() { return est; } |
842 |
877 |
843 public Spliterator<E> trySplit() { |
878 public Spliterator<E> trySplit() { |
844 Node<E> h; |
879 Node<E> h; |
845 final LinkedBlockingQueue<E> q = this.queue; |
|
846 int b = batch; |
880 int b = batch; |
847 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; |
881 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; |
848 if (!exhausted && |
882 if (!exhausted && |
849 ((h = current) != null || (h = q.head.next) != null) && |
883 ((h = current) != null || (h = head.next) != null) |
850 h.next != null) { |
884 && h.next != null) { |
851 Object[] a = new Object[n]; |
885 Object[] a = new Object[n]; |
852 int i = 0; |
886 int i = 0; |
853 Node<E> p = current; |
887 Node<E> p = current; |
854 q.fullyLock(); |
888 fullyLock(); |
855 try { |
889 try { |
856 if (p != null || (p = q.head.next) != null) { |
890 if (p != null || (p = head.next) != null) |
857 do { |
891 for (; p != null && i < n; p = succ(p)) |
858 if ((a[i] = p.item) != null) |
892 if ((a[i] = p.item) != null) |
859 ++i; |
893 i++; |
860 } while ((p = p.next) != null && i < n); |
|
861 } |
|
862 } finally { |
894 } finally { |
863 q.fullyUnlock(); |
895 fullyUnlock(); |
864 } |
896 } |
865 if ((current = p) == null) { |
897 if ((current = p) == null) { |
866 est = 0L; |
898 est = 0L; |
867 exhausted = true; |
899 exhausted = true; |
868 } |
900 } |
877 } |
909 } |
878 } |
910 } |
879 return null; |
911 return null; |
880 } |
912 } |
881 |
913 |
|
914 public boolean tryAdvance(Consumer<? super E> action) { |
|
915 Objects.requireNonNull(action); |
|
916 if (!exhausted) { |
|
917 E e = null; |
|
918 fullyLock(); |
|
919 try { |
|
920 Node<E> p; |
|
921 if ((p = current) != null || (p = head.next) != null) |
|
922 do { |
|
923 e = p.item; |
|
924 p = succ(p); |
|
925 } while (e == null && p != null); |
|
926 exhausted = ((current = p) == null); |
|
927 } finally { |
|
928 fullyUnlock(); |
|
929 } |
|
930 if (e != null) { |
|
931 action.accept(e); |
|
932 return true; |
|
933 } |
|
934 } |
|
935 return false; |
|
936 } |
|
937 |
882 public void forEachRemaining(Consumer<? super E> action) { |
938 public void forEachRemaining(Consumer<? super E> action) { |
883 if (action == null) throw new NullPointerException(); |
939 Objects.requireNonNull(action); |
884 final LinkedBlockingQueue<E> q = this.queue; |
|
885 if (!exhausted) { |
940 if (!exhausted) { |
886 exhausted = true; |
941 exhausted = true; |
887 Node<E> p = current; |
942 Node<E> p = current; |
888 do { |
943 current = null; |
889 E e = null; |
944 forEachFrom(action, p); |
890 q.fullyLock(); |
945 } |
891 try { |
|
892 if (p == null) |
|
893 p = q.head.next; |
|
894 while (p != null) { |
|
895 e = p.item; |
|
896 p = p.next; |
|
897 if (e != null) |
|
898 break; |
|
899 } |
|
900 } finally { |
|
901 q.fullyUnlock(); |
|
902 } |
|
903 if (e != null) |
|
904 action.accept(e); |
|
905 } while (p != null); |
|
906 } |
|
907 } |
|
908 |
|
909 public boolean tryAdvance(Consumer<? super E> action) { |
|
910 if (action == null) throw new NullPointerException(); |
|
911 final LinkedBlockingQueue<E> q = this.queue; |
|
912 if (!exhausted) { |
|
913 E e = null; |
|
914 q.fullyLock(); |
|
915 try { |
|
916 if (current == null) |
|
917 current = q.head.next; |
|
918 while (current != null) { |
|
919 e = current.item; |
|
920 current = current.next; |
|
921 if (e != null) |
|
922 break; |
|
923 } |
|
924 } finally { |
|
925 q.fullyUnlock(); |
|
926 } |
|
927 if (current == null) |
|
928 exhausted = true; |
|
929 if (e != null) { |
|
930 action.accept(e); |
|
931 return true; |
|
932 } |
|
933 } |
|
934 return false; |
|
935 } |
946 } |
936 |
947 |
937 public int characteristics() { |
948 public int characteristics() { |
938 return Spliterator.ORDERED | Spliterator.NONNULL | |
949 return (Spliterator.ORDERED | |
939 Spliterator.CONCURRENT; |
950 Spliterator.NONNULL | |
|
951 Spliterator.CONCURRENT); |
940 } |
952 } |
941 } |
953 } |
942 |
954 |
943 /** |
955 /** |
944 * Returns a {@link Spliterator} over the elements in this queue. |
956 * Returns a {@link Spliterator} over the elements in this queue. |
955 * |
967 * |
956 * @return a {@code Spliterator} over the elements in this queue |
968 * @return a {@code Spliterator} over the elements in this queue |
957 * @since 1.8 |
969 * @since 1.8 |
958 */ |
970 */ |
959 public Spliterator<E> spliterator() { |
971 public Spliterator<E> spliterator() { |
960 return new LBQSpliterator<E>(this); |
972 return new LBQSpliterator(); |
|
973 } |
|
974 |
|
975 /** |
|
976 * @throws NullPointerException {@inheritDoc} |
|
977 */ |
|
978 public void forEach(Consumer<? super E> action) { |
|
979 Objects.requireNonNull(action); |
|
980 forEachFrom(action, null); |
|
981 } |
|
982 |
|
983 /** |
|
984 * Runs action on each element found during a traversal starting at p. |
|
985 * If p is null, traversal starts at head. |
|
986 */ |
|
987 void forEachFrom(Consumer<? super E> action, Node<E> p) { |
|
988 // Extract batches of elements while holding the lock; then |
|
989 // run the action on the elements while not |
|
990 final int batchSize = 32; // max number of elements per batch |
|
991 Object[] es = null; // container for batch of elements |
|
992 int n, len = 0; |
|
993 do { |
|
994 fullyLock(); |
|
995 try { |
|
996 if (es == null) { |
|
997 if (p == null) p = head.next; |
|
998 for (Node<E> q = p; q != null; q = succ(q)) |
|
999 if (q.item != null && ++len == batchSize) |
|
1000 break; |
|
1001 es = new Object[len]; |
|
1002 } |
|
1003 for (n = 0; p != null && n < len; p = succ(p)) |
|
1004 if ((es[n] = p.item) != null) |
|
1005 n++; |
|
1006 } finally { |
|
1007 fullyUnlock(); |
|
1008 } |
|
1009 for (int i = 0; i < n; i++) { |
|
1010 @SuppressWarnings("unchecked") E e = (E) es[i]; |
|
1011 action.accept(e); |
|
1012 } |
|
1013 } while (n > 0 && p != null); |
961 } |
1014 } |
962 |
1015 |
963 /** |
1016 /** |
964 * Saves this queue to a stream (that is, serializes it). |
1017 * Saves this queue to a stream (that is, serializes it). |
965 * |
1018 * |