454 volatile Node next; |
454 volatile Node next; |
455 volatile Thread waiter; // null until waiting |
455 volatile Thread waiter; // null until waiting |
456 |
456 |
457 // CAS methods for fields |
457 // CAS methods for fields |
458 final boolean casNext(Node cmp, Node val) { |
458 final boolean casNext(Node cmp, Node val) { |
459 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); |
459 return U.compareAndSwapObject(this, NEXT, cmp, val); |
460 } |
460 } |
461 |
461 |
462 final boolean casItem(Object cmp, Object val) { |
462 final boolean casItem(Object cmp, Object val) { |
463 // assert cmp == null || cmp.getClass() != Node.class; |
463 // assert cmp == null || cmp.getClass() != Node.class; |
464 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); |
464 return U.compareAndSwapObject(this, ITEM, cmp, val); |
465 } |
465 } |
466 |
466 |
467 /** |
467 /** |
468 * Constructs a new node. Uses relaxed write because item can |
468 * Constructs a new node. Uses relaxed write because item can |
469 * only be seen after publication via casNext. |
469 * only be seen after publication via casNext. |
470 */ |
470 */ |
471 Node(Object item, boolean isData) { |
471 Node(Object item, boolean isData) { |
472 UNSAFE.putObject(this, itemOffset, item); // relaxed write |
472 U.putObject(this, ITEM, item); // relaxed write |
473 this.isData = isData; |
473 this.isData = isData; |
474 } |
474 } |
475 |
475 |
476 /** |
476 /** |
477 * Links node to itself to avoid garbage retention. Called |
477 * Links node to itself to avoid garbage retention. Called |
478 * only after CASing head field, so uses relaxed write. |
478 * only after CASing head field, so uses relaxed write. |
479 */ |
479 */ |
480 final void forgetNext() { |
480 final void forgetNext() { |
481 UNSAFE.putObject(this, nextOffset, this); |
481 U.putObject(this, NEXT, this); |
482 } |
482 } |
483 |
483 |
484 /** |
484 /** |
485 * Sets item to self and waiter to null, to avoid garbage |
485 * Sets item to self and waiter to null, to avoid garbage |
486 * retention after matching or cancelling. Uses relaxed writes |
486 * retention after matching or cancelling. Uses relaxed writes |
567 /** The number of apparent failures to unsplice removed nodes */ |
565 /** The number of apparent failures to unsplice removed nodes */ |
568 private transient volatile int sweepVotes; |
566 private transient volatile int sweepVotes; |
569 |
567 |
570 // CAS methods for fields |
568 // CAS methods for fields |
571 private boolean casTail(Node cmp, Node val) { |
569 private boolean casTail(Node cmp, Node val) { |
572 return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); |
570 return U.compareAndSwapObject(this, TAIL, cmp, val); |
573 } |
571 } |
574 |
572 |
575 private boolean casHead(Node cmp, Node val) { |
573 private boolean casHead(Node cmp, Node val) { |
576 return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); |
574 return U.compareAndSwapObject(this, HEAD, cmp, val); |
577 } |
575 } |
578 |
576 |
579 private boolean casSweepVotes(int cmp, int val) { |
577 private boolean casSweepVotes(int cmp, int val) { |
580 return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); |
578 return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val); |
581 } |
579 } |
582 |
580 |
583 /* |
581 /* |
584 * Possible values for "how" argument in xfer method. |
582 * Possible values for "how" argument in xfer method. |
585 */ |
583 */ |
586 private static final int NOW = 0; // for untimed poll, tryTransfer |
584 private static final int NOW = 0; // for untimed poll, tryTransfer |
587 private static final int ASYNC = 1; // for offer, put, add |
585 private static final int ASYNC = 1; // for offer, put, add |
588 private static final int SYNC = 2; // for transfer, take |
586 private static final int SYNC = 2; // for transfer, take |
589 private static final int TIMED = 3; // for timed poll, tryTransfer |
587 private static final int TIMED = 3; // for timed poll, tryTransfer |
590 |
|
591 @SuppressWarnings("unchecked") |
|
592 static <E> E cast(Object item) { |
|
593 // assert item == null || item.getClass() != Node.class; |
|
594 return (E) item; |
|
595 } |
|
596 |
588 |
597 /** |
589 /** |
598 * Implements all queuing methods. See above for explanation. |
590 * Implements all queuing methods. See above for explanation. |
599 * |
591 * |
600 * @param e the item or null for take |
592 * @param e the item or null for take |
766 Node next = p.next; |
759 Node next = p.next; |
767 return (p == next) ? head : next; |
760 return (p == next) ? head : next; |
768 } |
761 } |
769 |
762 |
770 /** |
763 /** |
771 * Returns the first unmatched node of the given mode, or null if |
764 * Returns the first unmatched data node, or null if none. |
772 * none. Used by methods isEmpty, hasWaitingConsumer. |
765 * Callers must recheck if the returned node's item field is null |
773 */ |
766 * or self-linked before using. |
774 private Node firstOfMode(boolean isData) { |
|
775 for (Node p = head; p != null; p = succ(p)) { |
|
776 if (!p.isMatched()) |
|
777 return (p.isData == isData) ? p : null; |
|
778 } |
|
779 return null; |
|
780 } |
|
781 |
|
782 /** |
|
783 * Version of firstOfMode used by Spliterator. Callers must |
|
784 * recheck if the returned node's item field is null or |
|
785 * self-linked before using. |
|
786 */ |
767 */ |
787 final Node firstDataNode() { |
768 final Node firstDataNode() { |
788 for (Node p = head; p != null;) { |
769 restartFromHead: for (;;) { |
789 Object item = p.item; |
770 for (Node p = head; p != null;) { |
790 if (p.isData) { |
771 Object item = p.item; |
791 if (item != null && item != p) |
772 if (p.isData) { |
792 return p; |
773 if (item != null && item != p) |
793 } |
774 return p; |
794 else if (item == null) |
775 } |
795 break; |
776 else if (item == null) |
796 if (p == (p = p.next)) |
777 break; |
797 p = head; |
778 if (p == (p = p.next)) |
798 } |
779 continue restartFromHead; |
799 return null; |
780 } |
800 } |
781 return null; |
801 |
782 } |
802 /** |
|
803 * Returns the item in the first unmatched node with isData; or |
|
804 * null if none. Used by peek. |
|
805 */ |
|
806 private E firstDataItem() { |
|
807 for (Node p = head; p != null; p = succ(p)) { |
|
808 Object item = p.item; |
|
809 if (p.isData) { |
|
810 if (item != null && item != p) |
|
811 return LinkedTransferQueue.<E>cast(item); |
|
812 } |
|
813 else if (item == null) |
|
814 return null; |
|
815 } |
|
816 return null; |
|
817 } |
783 } |
818 |
784 |
819 /** |
785 /** |
820 * Traverses and counts unmatched nodes of the given mode. |
786 * Traverses and counts unmatched nodes of the given mode. |
821 * Used by methods size and getWaitingConsumerCount. |
787 * Used by methods size and getWaitingConsumerCount. |
822 */ |
788 */ |
823 private int countOfMode(boolean data) { |
789 private int countOfMode(boolean data) { |
824 int count = 0; |
790 restartFromHead: for (;;) { |
825 for (Node p = head; p != null; ) { |
791 int count = 0; |
826 if (!p.isMatched()) { |
792 for (Node p = head; p != null;) { |
827 if (p.isData != data) |
793 if (!p.isMatched()) { |
828 return 0; |
794 if (p.isData != data) |
829 if (++count == Integer.MAX_VALUE) // saturated |
795 return 0; |
|
796 if (++count == Integer.MAX_VALUE) |
|
797 break; // @see Collection.size() |
|
798 } |
|
799 if (p == (p = p.next)) |
|
800 continue restartFromHead; |
|
801 } |
|
802 return count; |
|
803 } |
|
804 } |
|
805 |
|
806 public String toString() { |
|
807 String[] a = null; |
|
808 restartFromHead: for (;;) { |
|
809 int charLength = 0; |
|
810 int size = 0; |
|
811 for (Node p = head; p != null;) { |
|
812 Object item = p.item; |
|
813 if (p.isData) { |
|
814 if (item != null && item != p) { |
|
815 if (a == null) |
|
816 a = new String[4]; |
|
817 else if (size == a.length) |
|
818 a = Arrays.copyOf(a, 2 * size); |
|
819 String s = item.toString(); |
|
820 a[size++] = s; |
|
821 charLength += s.length(); |
|
822 } |
|
823 } else if (item == null) |
830 break; |
824 break; |
831 } |
825 if (p == (p = p.next)) |
832 Node n = p.next; |
826 continue restartFromHead; |
833 if (n != p) |
827 } |
834 p = n; |
828 |
835 else { |
829 if (size == 0) |
836 count = 0; |
830 return "[]"; |
837 p = head; |
831 |
838 } |
832 return Helpers.toString(a, size, charLength); |
839 } |
833 } |
840 return count; |
834 } |
|
835 |
|
836 private Object[] toArrayInternal(Object[] a) { |
|
837 Object[] x = a; |
|
838 restartFromHead: for (;;) { |
|
839 int size = 0; |
|
840 for (Node p = head; p != null;) { |
|
841 Object item = p.item; |
|
842 if (p.isData) { |
|
843 if (item != null && item != p) { |
|
844 if (x == null) |
|
845 x = new Object[4]; |
|
846 else if (size == x.length) |
|
847 x = Arrays.copyOf(x, 2 * (size + 4)); |
|
848 x[size++] = item; |
|
849 } |
|
850 } else if (item == null) |
|
851 break; |
|
852 if (p == (p = p.next)) |
|
853 continue restartFromHead; |
|
854 } |
|
855 if (x == null) |
|
856 return new Object[0]; |
|
857 else if (a != null && size <= a.length) { |
|
858 if (a != x) |
|
859 System.arraycopy(x, 0, a, 0, size); |
|
860 if (size < a.length) |
|
861 a[size] = null; |
|
862 return a; |
|
863 } |
|
864 return (size == x.length) ? x : Arrays.copyOf(x, size); |
|
865 } |
|
866 } |
|
867 |
|
868 /** |
|
869 * Returns an array containing all of the elements in this queue, in |
|
870 * proper sequence. |
|
871 * |
|
872 * <p>The returned array will be "safe" in that no references to it are |
|
873 * maintained by this queue. (In other words, this method must allocate |
|
874 * a new array). The caller is thus free to modify the returned array. |
|
875 * |
|
876 * <p>This method acts as bridge between array-based and collection-based |
|
877 * APIs. |
|
878 * |
|
879 * @return an array containing all of the elements in this queue |
|
880 */ |
|
881 public Object[] toArray() { |
|
882 return toArrayInternal(null); |
|
883 } |
|
884 |
|
885 /** |
|
886 * Returns an array containing all of the elements in this queue, in |
|
887 * proper sequence; the runtime type of the returned array is that of |
|
888 * the specified array. If the queue fits in the specified array, it |
|
889 * is returned therein. Otherwise, a new array is allocated with the |
|
890 * runtime type of the specified array and the size of this queue. |
|
891 * |
|
892 * <p>If this queue fits in the specified array with room to spare |
|
893 * (i.e., the array has more elements than this queue), the element in |
|
894 * the array immediately following the end of the queue is set to |
|
895 * {@code null}. |
|
896 * |
|
897 * <p>Like the {@link #toArray()} method, this method acts as bridge between |
|
898 * array-based and collection-based APIs. Further, this method allows |
|
899 * precise control over the runtime type of the output array, and may, |
|
900 * under certain circumstances, be used to save allocation costs. |
|
901 * |
|
902 * <p>Suppose {@code x} is a queue known to contain only strings. |
|
903 * The following code can be used to dump the queue into a newly |
|
904 * allocated array of {@code String}: |
|
905 * |
|
906 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> |
|
907 * |
|
908 * Note that {@code toArray(new Object[0])} is identical in function to |
|
909 * {@code toArray()}. |
|
910 * |
|
911 * @param a the array into which the elements of the queue are to |
|
912 * be stored, if it is big enough; otherwise, a new array of the |
|
913 * same runtime type is allocated for this purpose |
|
914 * @return an array containing all of the elements in this queue |
|
915 * @throws ArrayStoreException if the runtime type of the specified array |
|
916 * is not a supertype of the runtime type of every element in |
|
917 * this queue |
|
918 * @throws NullPointerException if the specified array is null |
|
919 */ |
|
920 @SuppressWarnings("unchecked") |
|
921 public <T> T[] toArray(T[] a) { |
|
922 if (a == null) throw new NullPointerException(); |
|
923 return (T[]) toArrayInternal(a); |
841 } |
924 } |
842 |
925 |
843 final class Itr implements Iterator<E> { |
926 final class Itr implements Iterator<E> { |
844 private Node nextNode; // next node to return item for |
927 private Node nextNode; // next node to return item for |
845 private E nextItem; // the corresponding item |
928 private E nextItem; // the corresponding item |
932 unsplice(lastPred, lastRet); |
1016 unsplice(lastPred, lastRet); |
933 } |
1017 } |
934 } |
1018 } |
935 |
1019 |
936 /** A customized variant of Spliterators.IteratorSpliterator */ |
1020 /** A customized variant of Spliterators.IteratorSpliterator */ |
937 static final class LTQSpliterator<E> implements Spliterator<E> { |
1021 final class LTQSpliterator<E> implements Spliterator<E> { |
938 static final int MAX_BATCH = 1 << 25; // max batch array size; |
1022 static final int MAX_BATCH = 1 << 25; // max batch array size; |
939 final LinkedTransferQueue<E> queue; |
1023 Node current; // current node; null until initialized |
940 Node current; // current node; null until initialized |
|
941 int batch; // batch size for splits |
1024 int batch; // batch size for splits |
942 boolean exhausted; // true when no more nodes |
1025 boolean exhausted; // true when no more nodes |
943 LTQSpliterator(LinkedTransferQueue<E> queue) { |
1026 LTQSpliterator() {} |
944 this.queue = queue; |
|
945 } |
|
946 |
1027 |
947 public Spliterator<E> trySplit() { |
1028 public Spliterator<E> trySplit() { |
948 Node p; |
1029 Node p; |
949 final LinkedTransferQueue<E> q = this.queue; |
|
950 int b = batch; |
1030 int b = batch; |
951 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; |
1031 int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; |
952 if (!exhausted && |
1032 if (!exhausted && |
953 ((p = current) != null || (p = q.firstDataNode()) != null) && |
1033 ((p = current) != null || (p = firstDataNode()) != null) && |
954 p.next != null) { |
1034 p.next != null) { |
955 Object[] a = new Object[n]; |
1035 Object[] a = new Object[n]; |
956 int i = 0; |
1036 int i = 0; |
957 do { |
1037 do { |
958 Object e = p.item; |
1038 Object e = p.item; |
959 if (e != p && (a[i] = e) != null) |
1039 if (e != p && (a[i] = e) != null) |
960 ++i; |
1040 ++i; |
961 if (p == (p = p.next)) |
1041 if (p == (p = p.next)) |
962 p = q.firstDataNode(); |
1042 p = firstDataNode(); |
963 } while (p != null && i < n && p.isData); |
1043 } while (p != null && i < n && p.isData); |
964 if ((current = p) == null) |
1044 if ((current = p) == null) |
965 exhausted = true; |
1045 exhausted = true; |
966 if (i > 0) { |
1046 if (i > 0) { |
967 batch = i; |
1047 batch = i; |
968 return Spliterators.spliterator |
1048 return Spliterators.spliterator |
969 (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | |
1049 (a, 0, i, (Spliterator.ORDERED | |
970 Spliterator.CONCURRENT); |
1050 Spliterator.NONNULL | |
|
1051 Spliterator.CONCURRENT)); |
971 } |
1052 } |
972 } |
1053 } |
973 return null; |
1054 return null; |
974 } |
1055 } |
975 |
1056 |
976 @SuppressWarnings("unchecked") |
1057 @SuppressWarnings("unchecked") |
977 public void forEachRemaining(Consumer<? super E> action) { |
1058 public void forEachRemaining(Consumer<? super E> action) { |
978 Node p; |
1059 Node p; |
979 if (action == null) throw new NullPointerException(); |
1060 if (action == null) throw new NullPointerException(); |
980 final LinkedTransferQueue<E> q = this.queue; |
|
981 if (!exhausted && |
1061 if (!exhausted && |
982 ((p = current) != null || (p = q.firstDataNode()) != null)) { |
1062 ((p = current) != null || (p = firstDataNode()) != null)) { |
983 exhausted = true; |
1063 exhausted = true; |
984 do { |
1064 do { |
985 Object e = p.item; |
1065 Object e = p.item; |
986 if (e != null && e != p) |
1066 if (e != null && e != p) |
987 action.accept((E)e); |
1067 action.accept((E)e); |
988 if (p == (p = p.next)) |
1068 if (p == (p = p.next)) |
989 p = q.firstDataNode(); |
1069 p = firstDataNode(); |
990 } while (p != null && p.isData); |
1070 } while (p != null && p.isData); |
991 } |
1071 } |
992 } |
1072 } |
993 |
1073 |
994 @SuppressWarnings("unchecked") |
1074 @SuppressWarnings("unchecked") |
995 public boolean tryAdvance(Consumer<? super E> action) { |
1075 public boolean tryAdvance(Consumer<? super E> action) { |
996 Node p; |
1076 Node p; |
997 if (action == null) throw new NullPointerException(); |
1077 if (action == null) throw new NullPointerException(); |
998 final LinkedTransferQueue<E> q = this.queue; |
|
999 if (!exhausted && |
1078 if (!exhausted && |
1000 ((p = current) != null || (p = q.firstDataNode()) != null)) { |
1079 ((p = current) != null || (p = firstDataNode()) != null)) { |
1001 Object e; |
1080 Object e; |
1002 do { |
1081 do { |
1003 if ((e = p.item) == p) |
1082 if ((e = p.item) == p) |
1004 e = null; |
1083 e = null; |
1005 if (p == (p = p.next)) |
1084 if (p == (p = p.next)) |
1006 p = q.firstDataNode(); |
1085 p = firstDataNode(); |
1007 } while (e == null && p != null && p.isData); |
1086 } while (e == null && p != null && p.isData); |
1008 if ((current = p) == null) |
1087 if ((current = p) == null) |
1009 exhausted = true; |
1088 exhausted = true; |
1010 if (e != null) { |
1089 if (e != null) { |
1011 action.accept((E)e); |
1090 action.accept((E)e); |
1330 public Iterator<E> iterator() { |
1409 public Iterator<E> iterator() { |
1331 return new Itr(); |
1410 return new Itr(); |
1332 } |
1411 } |
1333 |
1412 |
1334 public E peek() { |
1413 public E peek() { |
1335 return firstDataItem(); |
1414 restartFromHead: for (;;) { |
|
1415 for (Node p = head; p != null;) { |
|
1416 Object item = p.item; |
|
1417 if (p.isData) { |
|
1418 if (item != null && item != p) { |
|
1419 @SuppressWarnings("unchecked") E e = (E) item; |
|
1420 return e; |
|
1421 } |
|
1422 } |
|
1423 else if (item == null) |
|
1424 break; |
|
1425 if (p == (p = p.next)) |
|
1426 continue restartFromHead; |
|
1427 } |
|
1428 return null; |
|
1429 } |
1336 } |
1430 } |
1337 |
1431 |
1338 /** |
1432 /** |
1339 * Returns {@code true} if this queue contains no elements. |
1433 * Returns {@code true} if this queue contains no elements. |
1340 * |
1434 * |
1341 * @return {@code true} if this queue contains no elements |
1435 * @return {@code true} if this queue contains no elements |
1342 */ |
1436 */ |
1343 public boolean isEmpty() { |
1437 public boolean isEmpty() { |
1344 for (Node p = head; p != null; p = succ(p)) { |
1438 return firstDataNode() == null; |
1345 if (!p.isMatched()) |
|
1346 return !p.isData; |
|
1347 } |
|
1348 return true; |
|
1349 } |
1439 } |
1350 |
1440 |
1351 public boolean hasWaitingConsumer() { |
1441 public boolean hasWaitingConsumer() { |
1352 return firstOfMode(false) != null; |
1442 restartFromHead: for (;;) { |
|
1443 for (Node p = head; p != null;) { |
|
1444 Object item = p.item; |
|
1445 if (p.isData) { |
|
1446 if (item != null && item != p) |
|
1447 break; |
|
1448 } |
|
1449 else if (item == null) |
|
1450 return true; |
|
1451 if (p == (p = p.next)) |
|
1452 continue restartFromHead; |
|
1453 } |
|
1454 return false; |
|
1455 } |
1353 } |
1456 } |
1354 |
1457 |
1355 /** |
1458 /** |
1356 * Returns the number of elements in this queue. If this queue |
1459 * Returns the number of elements in this queue. If this queue |
1357 * contains more than {@code Integer.MAX_VALUE} elements, returns |
1460 * contains more than {@code Integer.MAX_VALUE} elements, returns |