jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java
changeset 32991 b27c76b82713
parent 31151 5535c077def0
child 33674 566777f73c32
equal deleted inserted replaced
32990:299a81977f48 32991:b27c76b82713
    34  */
    34  */
    35 
    35 
    36 package java.util.concurrent;
    36 package java.util.concurrent;
    37 
    37 
    38 import java.util.AbstractQueue;
    38 import java.util.AbstractQueue;
       
    39 import java.util.Arrays;
    39 import java.util.Collection;
    40 import java.util.Collection;
    40 import java.util.Iterator;
    41 import java.util.Iterator;
    41 import java.util.NoSuchElementException;
    42 import java.util.NoSuchElementException;
    42 import java.util.Queue;
    43 import java.util.Queue;
    43 import java.util.concurrent.TimeUnit;
       
    44 import java.util.concurrent.locks.LockSupport;
       
    45 import java.util.Spliterator;
    44 import java.util.Spliterator;
    46 import java.util.Spliterators;
    45 import java.util.Spliterators;
       
    46 import java.util.concurrent.locks.LockSupport;
    47 import java.util.function.Consumer;
    47 import java.util.function.Consumer;
    48 
    48 
    49 /**
    49 /**
    50  * An unbounded {@link TransferQueue} based on linked nodes.
    50  * An unbounded {@link TransferQueue} based on linked nodes.
    51  * This queue orders elements FIFO (first-in-first-out) with respect
    51  * This queue orders elements FIFO (first-in-first-out) with respect
    81  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    81  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
    82  * Java Collections Framework</a>.
    82  * Java Collections Framework</a>.
    83  *
    83  *
    84  * @since 1.7
    84  * @since 1.7
    85  * @author Doug Lea
    85  * @author Doug Lea
    86  * @param <E> the type of elements held in this collection
    86  * @param <E> the type of elements held in this queue
    87  */
    87  */
    88 public class LinkedTransferQueue<E> extends AbstractQueue<E>
    88 public class LinkedTransferQueue<E> extends AbstractQueue<E>
    89     implements TransferQueue<E>, java.io.Serializable {
    89     implements TransferQueue<E>, java.io.Serializable {
    90     private static final long serialVersionUID = -3223113410248163686L;
    90     private static final long serialVersionUID = -3223113410248163686L;
    91 
    91 
   106      * block.  Dual Transfer Queues support all of these modes, as
   106      * block.  Dual Transfer Queues support all of these modes, as
   107      * dictated by callers.
   107      * dictated by callers.
   108      *
   108      *
   109      * A FIFO dual queue may be implemented using a variation of the
   109      * A FIFO dual queue may be implemented using a variation of the
   110      * Michael & Scott (M&S) lock-free queue algorithm
   110      * Michael & Scott (M&S) lock-free queue algorithm
   111      * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
   111      * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
   112      * It maintains two pointer fields, "head", pointing to a
   112      * It maintains two pointer fields, "head", pointing to a
   113      * (matched) node that in turn points to the first actual
   113      * (matched) node that in turn points to the first actual
   114      * (unmatched) queue node (or null if empty); and "tail" that
   114      * (unmatched) queue node (or null if empty); and "tail" that
   115      * points to the last node on the queue (or again null if
   115      * points to the last node on the queue (or again null if
   116      * empty). For example, here is a possible queue with four data
   116      * empty). For example, here is a possible queue with four data
   213      *
   213      *
   214      * These ideas must be further extended to avoid unbounded amounts
   214      * These ideas must be further extended to avoid unbounded amounts
   215      * of costly-to-reclaim garbage caused by the sequential "next"
   215      * of costly-to-reclaim garbage caused by the sequential "next"
   216      * links of nodes starting at old forgotten head nodes: As first
   216      * links of nodes starting at old forgotten head nodes: As first
   217      * described in detail by Boehm
   217      * described in detail by Boehm
   218      * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
   218      * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
   219      * delays noticing that any arbitrarily old node has become
   219      * delays noticing that any arbitrarily old node has become
   220      * garbage, all newer dead nodes will also be unreclaimed.
   220      * garbage, all newer dead nodes will also be unreclaimed.
   221      * (Similar issues arise in non-GC environments.)  To cope with
   221      * (Similar issues arise in non-GC environments.)  To cope with
   222      * this in our implementation, upon CASing to advance the head
   222      * this in our implementation, upon CASing to advance the head
   223      * pointer, we set the "next" link of the previous head to point
   223      * pointer, we set the "next" link of the previous head to point
   454         volatile Node next;
   454         volatile Node next;
   455         volatile Thread waiter; // null until waiting
   455         volatile Thread waiter; // null until waiting
   456 
   456 
   457         // CAS methods for fields
   457         // CAS methods for fields
   458         final boolean casNext(Node cmp, Node val) {
   458         final boolean casNext(Node cmp, Node val) {
   459             return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
   459             return U.compareAndSwapObject(this, NEXT, cmp, val);
   460         }
   460         }
   461 
   461 
   462         final boolean casItem(Object cmp, Object val) {
   462         final boolean casItem(Object cmp, Object val) {
   463             // assert cmp == null || cmp.getClass() != Node.class;
   463             // assert cmp == null || cmp.getClass() != Node.class;
   464             return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
   464             return U.compareAndSwapObject(this, ITEM, cmp, val);
   465         }
   465         }
   466 
   466 
   467         /**
   467         /**
   468          * Constructs a new node.  Uses relaxed write because item can
   468          * Constructs a new node.  Uses relaxed write because item can
   469          * only be seen after publication via casNext.
   469          * only be seen after publication via casNext.
   470          */
   470          */
   471         Node(Object item, boolean isData) {
   471         Node(Object item, boolean isData) {
   472             UNSAFE.putObject(this, itemOffset, item); // relaxed write
   472             U.putObject(this, ITEM, item); // relaxed write
   473             this.isData = isData;
   473             this.isData = isData;
   474         }
   474         }
   475 
   475 
   476         /**
   476         /**
   477          * Links node to itself to avoid garbage retention.  Called
   477          * Links node to itself to avoid garbage retention.  Called
   478          * only after CASing head field, so uses relaxed write.
   478          * only after CASing head field, so uses relaxed write.
   479          */
   479          */
   480         final void forgetNext() {
   480         final void forgetNext() {
   481             UNSAFE.putObject(this, nextOffset, this);
   481             U.putObject(this, NEXT, this);
   482         }
   482         }
   483 
   483 
   484         /**
   484         /**
   485          * Sets item to self and waiter to null, to avoid garbage
   485          * Sets item to self and waiter to null, to avoid garbage
   486          * retention after matching or cancelling. Uses relaxed writes
   486          * retention after matching or cancelling. Uses relaxed writes
   489          * mechanics that extract items.  Similarly, clearing waiter
   489          * mechanics that extract items.  Similarly, clearing waiter
   490          * follows either CAS or return from park (if ever parked;
   490          * follows either CAS or return from park (if ever parked;
   491          * else we don't care).
   491          * else we don't care).
   492          */
   492          */
   493         final void forgetContents() {
   493         final void forgetContents() {
   494             UNSAFE.putObject(this, itemOffset, this);
   494             U.putObject(this, ITEM, this);
   495             UNSAFE.putObject(this, waiterOffset, null);
   495             U.putObject(this, WAITER, null);
   496         }
   496         }
   497 
   497 
   498         /**
   498         /**
   499          * Returns true if this node has been matched, including the
   499          * Returns true if this node has been matched, including the
   500          * case of artificial matches due to cancellation.
   500          * case of artificial matches due to cancellation.
   536         }
   536         }
   537 
   537 
   538         private static final long serialVersionUID = -3375979862319811754L;
   538         private static final long serialVersionUID = -3375979862319811754L;
   539 
   539 
   540         // Unsafe mechanics
   540         // Unsafe mechanics
   541         private static final sun.misc.Unsafe UNSAFE;
   541         private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
   542         private static final long itemOffset;
   542         private static final long ITEM;
   543         private static final long nextOffset;
   543         private static final long NEXT;
   544         private static final long waiterOffset;
   544         private static final long WAITER;
   545         static {
   545         static {
   546             try {
   546             try {
   547                 UNSAFE = sun.misc.Unsafe.getUnsafe();
   547                 ITEM = U.objectFieldOffset
   548                 Class<?> k = Node.class;
   548                     (Node.class.getDeclaredField("item"));
   549                 itemOffset = UNSAFE.objectFieldOffset
   549                 NEXT = U.objectFieldOffset
   550                     (k.getDeclaredField("item"));
   550                     (Node.class.getDeclaredField("next"));
   551                 nextOffset = UNSAFE.objectFieldOffset
   551                 WAITER = U.objectFieldOffset
   552                     (k.getDeclaredField("next"));
   552                     (Node.class.getDeclaredField("waiter"));
   553                 waiterOffset = UNSAFE.objectFieldOffset
   553             } catch (ReflectiveOperationException e) {
   554                     (k.getDeclaredField("waiter"));
       
   555             } catch (Exception e) {
       
   556                 throw new Error(e);
   554                 throw new Error(e);
   557             }
   555             }
   558         }
   556         }
   559     }
   557     }
   560 
   558 
   567     /** The number of apparent failures to unsplice removed nodes */
   565     /** The number of apparent failures to unsplice removed nodes */
   568     private transient volatile int sweepVotes;
   566     private transient volatile int sweepVotes;
   569 
   567 
   570     // CAS methods for fields
   568     // CAS methods for fields
   571     private boolean casTail(Node cmp, Node val) {
   569     private boolean casTail(Node cmp, Node val) {
   572         return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
   570         return U.compareAndSwapObject(this, TAIL, cmp, val);
   573     }
   571     }
   574 
   572 
   575     private boolean casHead(Node cmp, Node val) {
   573     private boolean casHead(Node cmp, Node val) {
   576         return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
   574         return U.compareAndSwapObject(this, HEAD, cmp, val);
   577     }
   575     }
   578 
   576 
   579     private boolean casSweepVotes(int cmp, int val) {
   577     private boolean casSweepVotes(int cmp, int val) {
   580         return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
   578         return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
   581     }
   579     }
   582 
   580 
   583     /*
   581     /*
   584      * Possible values for "how" argument in xfer method.
   582      * Possible values for "how" argument in xfer method.
   585      */
   583      */
   586     private static final int NOW   = 0; // for untimed poll, tryTransfer
   584     private static final int NOW   = 0; // for untimed poll, tryTransfer
   587     private static final int ASYNC = 1; // for offer, put, add
   585     private static final int ASYNC = 1; // for offer, put, add
   588     private static final int SYNC  = 2; // for transfer, take
   586     private static final int SYNC  = 2; // for transfer, take
   589     private static final int TIMED = 3; // for timed poll, tryTransfer
   587     private static final int TIMED = 3; // for timed poll, tryTransfer
   590 
       
   591     @SuppressWarnings("unchecked")
       
   592     static <E> E cast(Object item) {
       
   593         // assert item == null || item.getClass() != Node.class;
       
   594         return (E) item;
       
   595     }
       
   596 
   588 
   597     /**
   589     /**
   598      * Implements all queuing methods. See above for explanation.
   590      * Implements all queuing methods. See above for explanation.
   599      *
   591      *
   600      * @param e the item or null for take
   592      * @param e the item or null for take
   628                             if ((h = head)   == null ||
   620                             if ((h = head)   == null ||
   629                                 (q = h.next) == null || !q.isMatched())
   621                                 (q = h.next) == null || !q.isMatched())
   630                                 break;        // unless slack < 2
   622                                 break;        // unless slack < 2
   631                         }
   623                         }
   632                         LockSupport.unpark(p.waiter);
   624                         LockSupport.unpark(p.waiter);
   633                         return LinkedTransferQueue.<E>cast(item);
   625                         @SuppressWarnings("unchecked") E itemE = (E) item;
       
   626                         return itemE;
   634                     }
   627                     }
   635                 }
   628                 }
   636                 Node n = p.next;
   629                 Node n = p.next;
   637                 p = (p != n) ? n : (h = head); // Use head if p offlist
   630                 p = (p != n) ? n : (h = head); // Use head if p offlist
   638             }
   631             }
   706         for (;;) {
   699         for (;;) {
   707             Object item = s.item;
   700             Object item = s.item;
   708             if (item != e) {                  // matched
   701             if (item != e) {                  // matched
   709                 // assert item != s;
   702                 // assert item != s;
   710                 s.forgetContents();           // avoid garbage
   703                 s.forgetContents();           // avoid garbage
   711                 return LinkedTransferQueue.<E>cast(item);
   704                 @SuppressWarnings("unchecked") E itemE = (E) item;
   712             }
   705                 return itemE;
   713             if ((w.isInterrupted() || (timed && nanos <= 0)) &&
   706             }
   714                     s.casItem(e, s)) {        // cancel
   707             else if (w.isInterrupted() || (timed && nanos <= 0L)) {
   715                 unsplice(pred, s);
   708                 unsplice(pred, s);           // try to unlink and cancel
   716                 return e;
   709                 if (s.casItem(e, s))         // return normally if lost CAS
   717             }
   710                     return e;
   718 
   711             }
   719             if (spins < 0) {                  // establish spins at/near front
   712             else if (spins < 0) {            // establish spins at/near front
   720                 if ((spins = spinsFor(pred, s.isData)) > 0)
   713                 if ((spins = spinsFor(pred, s.isData)) > 0)
   721                     randomYields = ThreadLocalRandom.current();
   714                     randomYields = ThreadLocalRandom.current();
   722             }
   715             }
   723             else if (spins > 0) {             // spin
   716             else if (spins > 0) {             // spin
   724                 --spins;
   717                 --spins;
   766         Node next = p.next;
   759         Node next = p.next;
   767         return (p == next) ? head : next;
   760         return (p == next) ? head : next;
   768     }
   761     }
   769 
   762 
   770     /**
   763     /**
   771      * Returns the first unmatched node of the given mode, or null if
   764      * Returns the first unmatched data node, or null if none.
   772      * none.  Used by methods isEmpty, hasWaitingConsumer.
   765      * Callers must recheck if the returned node's item field is null
   773      */
   766      * or self-linked before using.
   774     private Node firstOfMode(boolean isData) {
       
   775         for (Node p = head; p != null; p = succ(p)) {
       
   776             if (!p.isMatched())
       
   777                 return (p.isData == isData) ? p : null;
       
   778         }
       
   779         return null;
       
   780     }
       
   781 
       
   782     /**
       
   783      * Version of firstOfMode used by Spliterator. Callers must
       
   784      * recheck if the returned node's item field is null or
       
   785      * self-linked before using.
       
   786      */
   767      */
   787     final Node firstDataNode() {
   768     final Node firstDataNode() {
   788         for (Node p = head; p != null;) {
   769         restartFromHead: for (;;) {
   789             Object item = p.item;
   770             for (Node p = head; p != null;) {
   790             if (p.isData) {
   771                 Object item = p.item;
   791                 if (item != null && item != p)
   772                 if (p.isData) {
   792                     return p;
   773                     if (item != null && item != p)
   793             }
   774                         return p;
   794             else if (item == null)
   775                 }
   795                 break;
   776                 else if (item == null)
   796             if (p == (p = p.next))
   777                     break;
   797                 p = head;
   778                 if (p == (p = p.next))
   798         }
   779                     continue restartFromHead;
   799         return null;
   780             }
   800     }
   781             return null;
   801 
   782         }
   802     /**
       
   803      * Returns the item in the first unmatched node with isData; or
       
   804      * null if none.  Used by peek.
       
   805      */
       
   806     private E firstDataItem() {
       
   807         for (Node p = head; p != null; p = succ(p)) {
       
   808             Object item = p.item;
       
   809             if (p.isData) {
       
   810                 if (item != null && item != p)
       
   811                     return LinkedTransferQueue.<E>cast(item);
       
   812             }
       
   813             else if (item == null)
       
   814                 return null;
       
   815         }
       
   816         return null;
       
   817     }
   783     }
   818 
   784 
   819     /**
   785     /**
   820      * Traverses and counts unmatched nodes of the given mode.
   786      * Traverses and counts unmatched nodes of the given mode.
   821      * Used by methods size and getWaitingConsumerCount.
   787      * Used by methods size and getWaitingConsumerCount.
   822      */
   788      */
   823     private int countOfMode(boolean data) {
   789     private int countOfMode(boolean data) {
   824         int count = 0;
   790         restartFromHead: for (;;) {
   825         for (Node p = head; p != null; ) {
   791             int count = 0;
   826             if (!p.isMatched()) {
   792             for (Node p = head; p != null;) {
   827                 if (p.isData != data)
   793                 if (!p.isMatched()) {
   828                     return 0;
   794                     if (p.isData != data)
   829                 if (++count == Integer.MAX_VALUE) // saturated
   795                         return 0;
       
   796                     if (++count == Integer.MAX_VALUE)
       
   797                         break;  // @see Collection.size()
       
   798                 }
       
   799                 if (p == (p = p.next))
       
   800                     continue restartFromHead;
       
   801             }
       
   802             return count;
       
   803         }
       
   804     }
       
   805 
       
   806     public String toString() {
       
   807         String[] a = null;
       
   808         restartFromHead: for (;;) {
       
   809             int charLength = 0;
       
   810             int size = 0;
       
   811             for (Node p = head; p != null;) {
       
   812                 Object item = p.item;
       
   813                 if (p.isData) {
       
   814                     if (item != null && item != p) {
       
   815                         if (a == null)
       
   816                             a = new String[4];
       
   817                         else if (size == a.length)
       
   818                             a = Arrays.copyOf(a, 2 * size);
       
   819                         String s = item.toString();
       
   820                         a[size++] = s;
       
   821                         charLength += s.length();
       
   822                     }
       
   823                 } else if (item == null)
   830                     break;
   824                     break;
   831             }
   825                 if (p == (p = p.next))
   832             Node n = p.next;
   826                     continue restartFromHead;
   833             if (n != p)
   827             }
   834                 p = n;
   828 
   835             else {
   829             if (size == 0)
   836                 count = 0;
   830                 return "[]";
   837                 p = head;
   831 
   838             }
   832             return Helpers.toString(a, size, charLength);
   839         }
   833         }
   840         return count;
   834     }
       
   835 
       
   836     private Object[] toArrayInternal(Object[] a) {
       
   837         Object[] x = a;
       
   838         restartFromHead: for (;;) {
       
   839             int size = 0;
       
   840             for (Node p = head; p != null;) {
       
   841                 Object item = p.item;
       
   842                 if (p.isData) {
       
   843                     if (item != null && item != p) {
       
   844                         if (x == null)
       
   845                             x = new Object[4];
       
   846                         else if (size == x.length)
       
   847                             x = Arrays.copyOf(x, 2 * (size + 4));
       
   848                         x[size++] = item;
       
   849                     }
       
   850                 } else if (item == null)
       
   851                     break;
       
   852                 if (p == (p = p.next))
       
   853                     continue restartFromHead;
       
   854             }
       
   855             if (x == null)
       
   856                 return new Object[0];
       
   857             else if (a != null && size <= a.length) {
       
   858                 if (a != x)
       
   859                     System.arraycopy(x, 0, a, 0, size);
       
   860                 if (size < a.length)
       
   861                     a[size] = null;
       
   862                 return a;
       
   863             }
       
   864             return (size == x.length) ? x : Arrays.copyOf(x, size);
       
   865         }
       
   866     }
       
   867 
       
   868     /**
       
   869      * Returns an array containing all of the elements in this queue, in
       
   870      * proper sequence.
       
   871      *
       
   872      * <p>The returned array will be "safe" in that no references to it are
       
   873      * maintained by this queue.  (In other words, this method must allocate
       
   874      * a new array).  The caller is thus free to modify the returned array.
       
   875      *
       
   876      * <p>This method acts as bridge between array-based and collection-based
       
   877      * APIs.
       
   878      *
       
   879      * @return an array containing all of the elements in this queue
       
   880      */
       
   881     public Object[] toArray() {
       
   882         return toArrayInternal(null);
       
   883     }
       
   884 
       
   885     /**
       
   886      * Returns an array containing all of the elements in this queue, in
       
   887      * proper sequence; the runtime type of the returned array is that of
       
   888      * the specified array.  If the queue fits in the specified array, it
       
   889      * is returned therein.  Otherwise, a new array is allocated with the
       
   890      * runtime type of the specified array and the size of this queue.
       
   891      *
       
   892      * <p>If this queue fits in the specified array with room to spare
       
   893      * (i.e., the array has more elements than this queue), the element in
       
   894      * the array immediately following the end of the queue is set to
       
   895      * {@code null}.
       
   896      *
       
   897      * <p>Like the {@link #toArray()} method, this method acts as bridge between
       
   898      * array-based and collection-based APIs.  Further, this method allows
       
   899      * precise control over the runtime type of the output array, and may,
       
   900      * under certain circumstances, be used to save allocation costs.
       
   901      *
       
   902      * <p>Suppose {@code x} is a queue known to contain only strings.
       
   903      * The following code can be used to dump the queue into a newly
       
   904      * allocated array of {@code String}:
       
   905      *
       
   906      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
       
   907      *
       
   908      * Note that {@code toArray(new Object[0])} is identical in function to
       
   909      * {@code toArray()}.
       
   910      *
       
   911      * @param a the array into which the elements of the queue are to
       
   912      *          be stored, if it is big enough; otherwise, a new array of the
       
   913      *          same runtime type is allocated for this purpose
       
   914      * @return an array containing all of the elements in this queue
       
   915      * @throws ArrayStoreException if the runtime type of the specified array
       
   916      *         is not a supertype of the runtime type of every element in
       
   917      *         this queue
       
   918      * @throws NullPointerException if the specified array is null
       
   919      */
       
   920     @SuppressWarnings("unchecked")
       
   921     public <T> T[] toArray(T[] a) {
       
   922         if (a == null) throw new NullPointerException();
       
   923         return (T[]) toArrayInternal(a);
   841     }
   924     }
   842 
   925 
   843     final class Itr implements Iterator<E> {
   926     final class Itr implements Iterator<E> {
   844         private Node nextNode;   // next node to return item for
   927         private Node nextNode;   // next node to return item for
   845         private E nextItem;      // the corresponding item
   928         private E nextItem;      // the corresponding item
   884                     continue;
   967                     continue;
   885                 }
   968                 }
   886                 Object item = s.item;
   969                 Object item = s.item;
   887                 if (s.isData) {
   970                 if (s.isData) {
   888                     if (item != null && item != s) {
   971                     if (item != null && item != s) {
   889                         nextItem = LinkedTransferQueue.<E>cast(item);
   972                         @SuppressWarnings("unchecked") E itemE = (E) item;
       
   973                         nextItem = itemE;
   890                         nextNode = s;
   974                         nextNode = s;
   891                         return;
   975                         return;
   892                     }
   976                     }
   893                 }
   977                 }
   894                 else if (item == null)
   978                 else if (item == null)
   932                 unsplice(lastPred, lastRet);
  1016                 unsplice(lastPred, lastRet);
   933         }
  1017         }
   934     }
  1018     }
   935 
  1019 
   936     /** A customized variant of Spliterators.IteratorSpliterator */
  1020     /** A customized variant of Spliterators.IteratorSpliterator */
   937     static final class LTQSpliterator<E> implements Spliterator<E> {
  1021     final class LTQSpliterator<E> implements Spliterator<E> {
   938         static final int MAX_BATCH = 1 << 25;  // max batch array size;
  1022         static final int MAX_BATCH = 1 << 25;  // max batch array size;
   939         final LinkedTransferQueue<E> queue;
  1023         Node current;       // current node; null until initialized
   940         Node current;    // current node; null until initialized
       
   941         int batch;          // batch size for splits
  1024         int batch;          // batch size for splits
   942         boolean exhausted;  // true when no more nodes
  1025         boolean exhausted;  // true when no more nodes
   943         LTQSpliterator(LinkedTransferQueue<E> queue) {
  1026         LTQSpliterator() {}
   944             this.queue = queue;
       
   945         }
       
   946 
  1027 
   947         public Spliterator<E> trySplit() {
  1028         public Spliterator<E> trySplit() {
   948             Node p;
  1029             Node p;
   949             final LinkedTransferQueue<E> q = this.queue;
       
   950             int b = batch;
  1030             int b = batch;
   951             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
  1031             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
   952             if (!exhausted &&
  1032             if (!exhausted &&
   953                 ((p = current) != null || (p = q.firstDataNode()) != null) &&
  1033                 ((p = current) != null || (p = firstDataNode()) != null) &&
   954                 p.next != null) {
  1034                 p.next != null) {
   955                 Object[] a = new Object[n];
  1035                 Object[] a = new Object[n];
   956                 int i = 0;
  1036                 int i = 0;
   957                 do {
  1037                 do {
   958                     Object e = p.item;
  1038                     Object e = p.item;
   959                     if (e != p && (a[i] = e) != null)
  1039                     if (e != p && (a[i] = e) != null)
   960                         ++i;
  1040                         ++i;
   961                     if (p == (p = p.next))
  1041                     if (p == (p = p.next))
   962                         p = q.firstDataNode();
  1042                         p = firstDataNode();
   963                 } while (p != null && i < n && p.isData);
  1043                 } while (p != null && i < n && p.isData);
   964                 if ((current = p) == null)
  1044                 if ((current = p) == null)
   965                     exhausted = true;
  1045                     exhausted = true;
   966                 if (i > 0) {
  1046                 if (i > 0) {
   967                     batch = i;
  1047                     batch = i;
   968                     return Spliterators.spliterator
  1048                     return Spliterators.spliterator
   969                         (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
  1049                         (a, 0, i, (Spliterator.ORDERED |
   970                          Spliterator.CONCURRENT);
  1050                                    Spliterator.NONNULL |
       
  1051                                    Spliterator.CONCURRENT));
   971                 }
  1052                 }
   972             }
  1053             }
   973             return null;
  1054             return null;
   974         }
  1055         }
   975 
  1056 
   976         @SuppressWarnings("unchecked")
  1057         @SuppressWarnings("unchecked")
   977         public void forEachRemaining(Consumer<? super E> action) {
  1058         public void forEachRemaining(Consumer<? super E> action) {
   978             Node p;
  1059             Node p;
   979             if (action == null) throw new NullPointerException();
  1060             if (action == null) throw new NullPointerException();
   980             final LinkedTransferQueue<E> q = this.queue;
       
   981             if (!exhausted &&
  1061             if (!exhausted &&
   982                 ((p = current) != null || (p = q.firstDataNode()) != null)) {
  1062                 ((p = current) != null || (p = firstDataNode()) != null)) {
   983                 exhausted = true;
  1063                 exhausted = true;
   984                 do {
  1064                 do {
   985                     Object e = p.item;
  1065                     Object e = p.item;
   986                     if (e != null && e != p)
  1066                     if (e != null && e != p)
   987                         action.accept((E)e);
  1067                         action.accept((E)e);
   988                     if (p == (p = p.next))
  1068                     if (p == (p = p.next))
   989                         p = q.firstDataNode();
  1069                         p = firstDataNode();
   990                 } while (p != null && p.isData);
  1070                 } while (p != null && p.isData);
   991             }
  1071             }
   992         }
  1072         }
   993 
  1073 
   994         @SuppressWarnings("unchecked")
  1074         @SuppressWarnings("unchecked")
   995         public boolean tryAdvance(Consumer<? super E> action) {
  1075         public boolean tryAdvance(Consumer<? super E> action) {
   996             Node p;
  1076             Node p;
   997             if (action == null) throw new NullPointerException();
  1077             if (action == null) throw new NullPointerException();
   998             final LinkedTransferQueue<E> q = this.queue;
       
   999             if (!exhausted &&
  1078             if (!exhausted &&
  1000                 ((p = current) != null || (p = q.firstDataNode()) != null)) {
  1079                 ((p = current) != null || (p = firstDataNode()) != null)) {
  1001                 Object e;
  1080                 Object e;
  1002                 do {
  1081                 do {
  1003                     if ((e = p.item) == p)
  1082                     if ((e = p.item) == p)
  1004                         e = null;
  1083                         e = null;
  1005                     if (p == (p = p.next))
  1084                     if (p == (p = p.next))
  1006                         p = q.firstDataNode();
  1085                         p = firstDataNode();
  1007                 } while (e == null && p != null && p.isData);
  1086                 } while (e == null && p != null && p.isData);
  1008                 if ((current = p) == null)
  1087                 if ((current = p) == null)
  1009                     exhausted = true;
  1088                     exhausted = true;
  1010                 if (e != null) {
  1089                 if (e != null) {
  1011                     action.accept((E)e);
  1090                     action.accept((E)e);
  1038      *
  1117      *
  1039      * @return a {@code Spliterator} over the elements in this queue
  1118      * @return a {@code Spliterator} over the elements in this queue
  1040      * @since 1.8
  1119      * @since 1.8
  1041      */
  1120      */
  1042     public Spliterator<E> spliterator() {
  1121     public Spliterator<E> spliterator() {
  1043         return new LTQSpliterator<E>(this);
  1122         return new LTQSpliterator<E>();
  1044     }
  1123     }
  1045 
  1124 
  1046     /* -------------- Removal methods -------------- */
  1125     /* -------------- Removal methods -------------- */
  1047 
  1126 
  1048     /**
  1127     /**
  1052      * @param pred a node that was at one time known to be the
  1131      * @param pred a node that was at one time known to be the
  1053      * predecessor of s, or null or s itself if s is/was at head
  1132      * predecessor of s, or null or s itself if s is/was at head
  1054      * @param s the node to be unspliced
  1133      * @param s the node to be unspliced
  1055      */
  1134      */
  1056     final void unsplice(Node pred, Node s) {
  1135     final void unsplice(Node pred, Node s) {
  1057         s.forgetContents(); // forget unneeded fields
  1136         s.waiter = null; // disable signals
  1058         /*
  1137         /*
  1059          * See above for rationale. Briefly: if pred still points to
  1138          * See above for rationale. Briefly: if pred still points to
  1060          * s, try to unlink s.  If s cannot be unlinked, because it is
  1139          * s, try to unlink s.  If s cannot be unlinked, because it is
  1061          * trailing node or pred might be unlinked, and neither pred
  1140          * trailing node or pred might be unlinked, and neither pred
  1062          * nor s are head or offlist, add to sweepVotes, and if enough
  1141          * nor s are head or offlist, add to sweepVotes, and if enough
  1330     public Iterator<E> iterator() {
  1409     public Iterator<E> iterator() {
  1331         return new Itr();
  1410         return new Itr();
  1332     }
  1411     }
  1333 
  1412 
  1334     public E peek() {
  1413     public E peek() {
  1335         return firstDataItem();
  1414         restartFromHead: for (;;) {
       
  1415             for (Node p = head; p != null;) {
       
  1416                 Object item = p.item;
       
  1417                 if (p.isData) {
       
  1418                     if (item != null && item != p) {
       
  1419                         @SuppressWarnings("unchecked") E e = (E) item;
       
  1420                         return e;
       
  1421                     }
       
  1422                 }
       
  1423                 else if (item == null)
       
  1424                     break;
       
  1425                 if (p == (p = p.next))
       
  1426                     continue restartFromHead;
       
  1427             }
       
  1428             return null;
       
  1429         }
  1336     }
  1430     }
  1337 
  1431 
  1338     /**
  1432     /**
  1339      * Returns {@code true} if this queue contains no elements.
  1433      * Returns {@code true} if this queue contains no elements.
  1340      *
  1434      *
  1341      * @return {@code true} if this queue contains no elements
  1435      * @return {@code true} if this queue contains no elements
  1342      */
  1436      */
  1343     public boolean isEmpty() {
  1437     public boolean isEmpty() {
  1344         for (Node p = head; p != null; p = succ(p)) {
  1438         return firstDataNode() == null;
  1345             if (!p.isMatched())
       
  1346                 return !p.isData;
       
  1347         }
       
  1348         return true;
       
  1349     }
  1439     }
  1350 
  1440 
  1351     public boolean hasWaitingConsumer() {
  1441     public boolean hasWaitingConsumer() {
  1352         return firstOfMode(false) != null;
  1442         restartFromHead: for (;;) {
       
  1443             for (Node p = head; p != null;) {
       
  1444                 Object item = p.item;
       
  1445                 if (p.isData) {
       
  1446                     if (item != null && item != p)
       
  1447                         break;
       
  1448                 }
       
  1449                 else if (item == null)
       
  1450                     return true;
       
  1451                 if (p == (p = p.next))
       
  1452                     continue restartFromHead;
       
  1453             }
       
  1454             return false;
       
  1455         }
  1353     }
  1456     }
  1354 
  1457 
  1355     /**
  1458     /**
  1356      * Returns the number of elements in this queue.  If this queue
  1459      * Returns the number of elements in this queue.  If this queue
  1357      * contains more than {@code Integer.MAX_VALUE} elements, returns
  1460      * contains more than {@code Integer.MAX_VALUE} elements, returns
  1394      *
  1497      *
  1395      * @param o object to be checked for containment in this queue
  1498      * @param o object to be checked for containment in this queue
  1396      * @return {@code true} if this queue contains the specified element
  1499      * @return {@code true} if this queue contains the specified element
  1397      */
  1500      */
  1398     public boolean contains(Object o) {
  1501     public boolean contains(Object o) {
  1399         if (o == null) return false;
  1502         if (o != null) {
  1400         for (Node p = head; p != null; p = succ(p)) {
  1503             for (Node p = head; p != null; p = succ(p)) {
  1401             Object item = p.item;
  1504                 Object item = p.item;
  1402             if (p.isData) {
  1505                 if (p.isData) {
  1403                 if (item != null && item != p && o.equals(item))
  1506                     if (item != null && item != p && o.equals(item))
  1404                     return true;
  1507                         return true;
  1405             }
  1508                 }
  1406             else if (item == null)
  1509                 else if (item == null)
  1407                 break;
  1510                     break;
       
  1511             }
  1408         }
  1512         }
  1409         return false;
  1513         return false;
  1410     }
  1514     }
  1411 
  1515 
  1412     /**
  1516     /**
  1458         }
  1562         }
  1459     }
  1563     }
  1460 
  1564 
  1461     // Unsafe mechanics
  1565     // Unsafe mechanics
  1462 
  1566 
  1463     private static final sun.misc.Unsafe UNSAFE;
  1567     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
  1464     private static final long headOffset;
  1568     private static final long HEAD;
  1465     private static final long tailOffset;
  1569     private static final long TAIL;
  1466     private static final long sweepVotesOffset;
  1570     private static final long SWEEPVOTES;
  1467     static {
  1571     static {
  1468         try {
  1572         try {
  1469             UNSAFE = sun.misc.Unsafe.getUnsafe();
  1573             HEAD = U.objectFieldOffset
  1470             Class<?> k = LinkedTransferQueue.class;
  1574                 (LinkedTransferQueue.class.getDeclaredField("head"));
  1471             headOffset = UNSAFE.objectFieldOffset
  1575             TAIL = U.objectFieldOffset
  1472                 (k.getDeclaredField("head"));
  1576                 (LinkedTransferQueue.class.getDeclaredField("tail"));
  1473             tailOffset = UNSAFE.objectFieldOffset
  1577             SWEEPVOTES = U.objectFieldOffset
  1474                 (k.getDeclaredField("tail"));
  1578                 (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
  1475             sweepVotesOffset = UNSAFE.objectFieldOffset
  1579         } catch (ReflectiveOperationException e) {
  1476                 (k.getDeclaredField("sweepVotes"));
       
  1477         } catch (Exception e) {
       
  1478             throw new Error(e);
  1580             throw new Error(e);
  1479         }
  1581         }
       
  1582 
       
  1583         // Reduce the risk of rare disastrous classloading in first call to
       
  1584         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
       
  1585         Class<?> ensureLoaded = LockSupport.class;
  1480     }
  1586     }
  1481 }
  1587 }