jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingQueue.java
changeset 42926 8b9cacdadb2d
parent 32991 b27c76b82713
child 43521 60e247b8d9a4
equal deleted inserted replaced
42925:5ea771a26665 42926:8b9cacdadb2d
    37 
    37 
    38 import java.util.AbstractQueue;
    38 import java.util.AbstractQueue;
    39 import java.util.Collection;
    39 import java.util.Collection;
    40 import java.util.Iterator;
    40 import java.util.Iterator;
    41 import java.util.NoSuchElementException;
    41 import java.util.NoSuchElementException;
       
    42 import java.util.Objects;
    42 import java.util.Spliterator;
    43 import java.util.Spliterator;
    43 import java.util.Spliterators;
    44 import java.util.Spliterators;
    44 import java.util.concurrent.atomic.AtomicInteger;
    45 import java.util.concurrent.atomic.AtomicInteger;
    45 import java.util.concurrent.locks.Condition;
    46 import java.util.concurrent.locks.Condition;
    46 import java.util.concurrent.locks.ReentrantLock;
    47 import java.util.concurrent.locks.ReentrantLock;
   232     void fullyUnlock() {
   233     void fullyUnlock() {
   233         takeLock.unlock();
   234         takeLock.unlock();
   234         putLock.unlock();
   235         putLock.unlock();
   235     }
   236     }
   236 
   237 
   237 //     /**
       
   238 //      * Tells whether both locks are held by current thread.
       
   239 //      */
       
   240 //     boolean isFullyLocked() {
       
   241 //         return (putLock.isHeldByCurrentThread() &&
       
   242 //                 takeLock.isHeldByCurrentThread());
       
   243 //     }
       
   244 
       
   245     /**
   238     /**
   246      * Creates a {@code LinkedBlockingQueue} with a capacity of
   239      * Creates a {@code LinkedBlockingQueue} with a capacity of
   247      * {@link Integer#MAX_VALUE}.
   240      * {@link Integer#MAX_VALUE}.
   248      */
   241      */
   249     public LinkedBlockingQueue() {
   242     public LinkedBlockingQueue() {
   515 
   508 
   516     /**
   509     /**
   517      * Unlinks interior Node p with predecessor trail.
   510      * Unlinks interior Node p with predecessor trail.
   518      */
   511      */
   519     void unlink(Node<E> p, Node<E> trail) {
   512     void unlink(Node<E> p, Node<E> trail) {
   520         // assert isFullyLocked();
   513         // assert putLock.isHeldByCurrentThread();
       
   514         // assert takeLock.isHeldByCurrentThread();
   521         // p.next is not changed, to allow iterators that are
   515         // p.next is not changed, to allow iterators that are
   522         // traversing p to maintain their weak-consistency guarantee.
   516         // traversing p to maintain their weak-consistency guarantee.
   523         p.item = null;
   517         p.item = null;
   524         trail.next = p.next;
   518         trail.next = p.next;
   525         if (last == p)
   519         if (last == p)
   699      * @throws ClassCastException            {@inheritDoc}
   693      * @throws ClassCastException            {@inheritDoc}
   700      * @throws NullPointerException          {@inheritDoc}
   694      * @throws NullPointerException          {@inheritDoc}
   701      * @throws IllegalArgumentException      {@inheritDoc}
   695      * @throws IllegalArgumentException      {@inheritDoc}
   702      */
   696      */
   703     public int drainTo(Collection<? super E> c, int maxElements) {
   697     public int drainTo(Collection<? super E> c, int maxElements) {
   704         if (c == null)
   698         Objects.requireNonNull(c);
   705             throw new NullPointerException();
       
   706         if (c == this)
   699         if (c == this)
   707             throw new IllegalArgumentException();
   700             throw new IllegalArgumentException();
   708         if (maxElements <= 0)
   701         if (maxElements <= 0)
   709             return 0;
   702             return 0;
   710         boolean signalNotFull = false;
   703         boolean signalNotFull = false;
   739                 signalNotFull();
   732                 signalNotFull();
   740         }
   733         }
   741     }
   734     }
   742 
   735 
   743     /**
   736     /**
       
   737      * Used for any element traversal that is not entirely under lock.
       
   738      * Such traversals must handle both:
       
   739      * - dequeued nodes (p.next == p)
       
   740      * - (possibly multiple) interior removed nodes (p.item == null)
       
   741      */
       
   742     Node<E> succ(Node<E> p) {
       
   743         return (p == (p = p.next)) ? head.next : p;
       
   744     }
       
   745 
       
   746     /**
   744      * Returns an iterator over the elements in this queue in proper sequence.
   747      * Returns an iterator over the elements in this queue in proper sequence.
   745      * The elements will be returned in order from first (head) to last (tail).
   748      * The elements will be returned in order from first (head) to last (tail).
   746      *
   749      *
   747      * <p>The returned iterator is
   750      * <p>The returned iterator is
   748      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   751      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
   758          * Basic weakly-consistent iterator.  At all times hold the next
   761          * Basic weakly-consistent iterator.  At all times hold the next
   759          * item to hand out so that if hasNext() reports true, we will
   762          * item to hand out so that if hasNext() reports true, we will
   760          * still have it to return even if lost race with a take etc.
   763          * still have it to return even if lost race with a take etc.
   761          */
   764          */
   762 
   765 
   763         private Node<E> current;
   766         private Node<E> next;
       
   767         private E nextItem;
   764         private Node<E> lastRet;
   768         private Node<E> lastRet;
   765         private E currentElement;
       
   766 
   769 
   767         Itr() {
   770         Itr() {
   768             fullyLock();
   771             fullyLock();
   769             try {
   772             try {
   770                 current = head.next;
   773                 if ((next = head.next) != null)
   771                 if (current != null)
   774                     nextItem = next.item;
   772                     currentElement = current.item;
       
   773             } finally {
   775             } finally {
   774                 fullyUnlock();
   776                 fullyUnlock();
   775             }
   777             }
   776         }
   778         }
   777 
   779 
   778         public boolean hasNext() {
   780         public boolean hasNext() {
   779             return current != null;
   781             return next != null;
   780         }
   782         }
   781 
   783 
   782         public E next() {
   784         public E next() {
       
   785             Node<E> p;
       
   786             if ((p = next) == null)
       
   787                 throw new NoSuchElementException();
       
   788             lastRet = p;
       
   789             E x = nextItem;
   783             fullyLock();
   790             fullyLock();
   784             try {
   791             try {
   785                 if (current == null)
   792                 E e = null;
   786                     throw new NoSuchElementException();
   793                 for (p = p.next; p != null && (e = p.item) == null; )
   787                 lastRet = current;
   794                     p = succ(p);
   788                 E item = null;
   795                 next = p;
   789                 // Unlike other traversal methods, iterators must handle both:
   796                 nextItem = e;
   790                 // - dequeued nodes (p.next == p)
       
   791                 // - (possibly multiple) interior removed nodes (p.item == null)
       
   792                 for (Node<E> p = current, q;; p = q) {
       
   793                     if ((q = p.next) == p)
       
   794                         q = head.next;
       
   795                     if (q == null || (item = q.item) != null) {
       
   796                         current = q;
       
   797                         E x = currentElement;
       
   798                         currentElement = item;
       
   799                         return x;
       
   800                     }
       
   801                 }
       
   802             } finally {
   797             } finally {
   803                 fullyUnlock();
   798                 fullyUnlock();
   804             }
   799             }
       
   800             return x;
       
   801         }
       
   802 
       
   803         public void forEachRemaining(Consumer<? super E> action) {
       
   804             // A variant of forEachFrom
       
   805             Objects.requireNonNull(action);
       
   806             Node<E> p;
       
   807             if ((p = next) == null) return;
       
   808             lastRet = p;
       
   809             next = null;
       
   810             final int batchSize = 32;
       
   811             Object[] es = null;
       
   812             int n, len = 1;
       
   813             do {
       
   814                 fullyLock();
       
   815                 try {
       
   816                     if (es == null) {
       
   817                         p = p.next;
       
   818                         for (Node<E> q = p; q != null; q = succ(q))
       
   819                             if (q.item != null && ++len == batchSize)
       
   820                                 break;
       
   821                         es = new Object[len];
       
   822                         es[0] = nextItem;
       
   823                         nextItem = null;
       
   824                         n = 1;
       
   825                     } else
       
   826                         n = 0;
       
   827                     for (; p != null && n < len; p = succ(p))
       
   828                         if ((es[n] = p.item) != null) {
       
   829                             lastRet = p;
       
   830                             n++;
       
   831                         }
       
   832                 } finally {
       
   833                     fullyUnlock();
       
   834                 }
       
   835                 for (int i = 0; i < n; i++) {
       
   836                     @SuppressWarnings("unchecked") E e = (E) es[i];
       
   837                     action.accept(e);
       
   838                 }
       
   839             } while (n > 0 && p != null);
   805         }
   840         }
   806 
   841 
   807         public void remove() {
   842         public void remove() {
   808             if (lastRet == null)
   843             if (lastRet == null)
   809                 throw new IllegalStateException();
   844                 throw new IllegalStateException();
   823                 fullyUnlock();
   858                 fullyUnlock();
   824             }
   859             }
   825         }
   860         }
   826     }
   861     }
   827 
   862 
   828     /** A customized variant of Spliterators.IteratorSpliterator */
   863     /**
   829     static final class LBQSpliterator<E> implements Spliterator<E> {
   864      * A customized variant of Spliterators.IteratorSpliterator.
       
   865      * Keep this class in sync with (very similar) LBDSpliterator.
       
   866      */
       
   867     private final class LBQSpliterator implements Spliterator<E> {
   830         static final int MAX_BATCH = 1 << 25;  // max batch array size;
   868         static final int MAX_BATCH = 1 << 25;  // max batch array size;
   831         final LinkedBlockingQueue<E> queue;
       
   832         Node<E> current;    // current node; null until initialized
   869         Node<E> current;    // current node; null until initialized
   833         int batch;          // batch size for splits
   870         int batch;          // batch size for splits
   834         boolean exhausted;  // true when no more nodes
   871         boolean exhausted;  // true when no more nodes
   835         long est;           // size estimate
   872         long est = size();  // size estimate
   836         LBQSpliterator(LinkedBlockingQueue<E> queue) {
   873 
   837             this.queue = queue;
   874         LBQSpliterator() {}
   838             this.est = queue.size();
       
   839         }
       
   840 
   875 
   841         public long estimateSize() { return est; }
   876         public long estimateSize() { return est; }
   842 
   877 
   843         public Spliterator<E> trySplit() {
   878         public Spliterator<E> trySplit() {
   844             Node<E> h;
   879             Node<E> h;
   845             final LinkedBlockingQueue<E> q = this.queue;
       
   846             int b = batch;
   880             int b = batch;
   847             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
   881             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
   848             if (!exhausted &&
   882             if (!exhausted &&
   849                 ((h = current) != null || (h = q.head.next) != null) &&
   883                 ((h = current) != null || (h = head.next) != null)
   850                 h.next != null) {
   884                 && h.next != null) {
   851                 Object[] a = new Object[n];
   885                 Object[] a = new Object[n];
   852                 int i = 0;
   886                 int i = 0;
   853                 Node<E> p = current;
   887                 Node<E> p = current;
   854                 q.fullyLock();
   888                 fullyLock();
   855                 try {
   889                 try {
   856                     if (p != null || (p = q.head.next) != null) {
   890                     if (p != null || (p = head.next) != null)
   857                         do {
   891                         for (; p != null && i < n; p = succ(p))
   858                             if ((a[i] = p.item) != null)
   892                             if ((a[i] = p.item) != null)
   859                                 ++i;
   893                                 i++;
   860                         } while ((p = p.next) != null && i < n);
       
   861                     }
       
   862                 } finally {
   894                 } finally {
   863                     q.fullyUnlock();
   895                     fullyUnlock();
   864                 }
   896                 }
   865                 if ((current = p) == null) {
   897                 if ((current = p) == null) {
   866                     est = 0L;
   898                     est = 0L;
   867                     exhausted = true;
   899                     exhausted = true;
   868                 }
   900                 }
   877                 }
   909                 }
   878             }
   910             }
   879             return null;
   911             return null;
   880         }
   912         }
   881 
   913 
       
   914         public boolean tryAdvance(Consumer<? super E> action) {
       
   915             Objects.requireNonNull(action);
       
   916             if (!exhausted) {
       
   917                 E e = null;
       
   918                 fullyLock();
       
   919                 try {
       
   920                     Node<E> p;
       
   921                     if ((p = current) != null || (p = head.next) != null)
       
   922                         do {
       
   923                             e = p.item;
       
   924                             p = succ(p);
       
   925                         } while (e == null && p != null);
       
   926                     exhausted = ((current = p) == null);
       
   927                 } finally {
       
   928                     fullyUnlock();
       
   929                 }
       
   930                 if (e != null) {
       
   931                     action.accept(e);
       
   932                     return true;
       
   933                 }
       
   934             }
       
   935             return false;
       
   936         }
       
   937 
   882         public void forEachRemaining(Consumer<? super E> action) {
   938         public void forEachRemaining(Consumer<? super E> action) {
   883             if (action == null) throw new NullPointerException();
   939             Objects.requireNonNull(action);
   884             final LinkedBlockingQueue<E> q = this.queue;
       
   885             if (!exhausted) {
   940             if (!exhausted) {
   886                 exhausted = true;
   941                 exhausted = true;
   887                 Node<E> p = current;
   942                 Node<E> p = current;
   888                 do {
   943                 current = null;
   889                     E e = null;
   944                 forEachFrom(action, p);
   890                     q.fullyLock();
   945             }
   891                     try {
       
   892                         if (p == null)
       
   893                             p = q.head.next;
       
   894                         while (p != null) {
       
   895                             e = p.item;
       
   896                             p = p.next;
       
   897                             if (e != null)
       
   898                                 break;
       
   899                         }
       
   900                     } finally {
       
   901                         q.fullyUnlock();
       
   902                     }
       
   903                     if (e != null)
       
   904                         action.accept(e);
       
   905                 } while (p != null);
       
   906             }
       
   907         }
       
   908 
       
   909         public boolean tryAdvance(Consumer<? super E> action) {
       
   910             if (action == null) throw new NullPointerException();
       
   911             final LinkedBlockingQueue<E> q = this.queue;
       
   912             if (!exhausted) {
       
   913                 E e = null;
       
   914                 q.fullyLock();
       
   915                 try {
       
   916                     if (current == null)
       
   917                         current = q.head.next;
       
   918                     while (current != null) {
       
   919                         e = current.item;
       
   920                         current = current.next;
       
   921                         if (e != null)
       
   922                             break;
       
   923                     }
       
   924                 } finally {
       
   925                     q.fullyUnlock();
       
   926                 }
       
   927                 if (current == null)
       
   928                     exhausted = true;
       
   929                 if (e != null) {
       
   930                     action.accept(e);
       
   931                     return true;
       
   932                 }
       
   933             }
       
   934             return false;
       
   935         }
   946         }
   936 
   947 
   937         public int characteristics() {
   948         public int characteristics() {
   938             return Spliterator.ORDERED | Spliterator.NONNULL |
   949             return (Spliterator.ORDERED |
   939                 Spliterator.CONCURRENT;
   950                     Spliterator.NONNULL |
       
   951                     Spliterator.CONCURRENT);
   940         }
   952         }
   941     }
   953     }
   942 
   954 
   943     /**
   955     /**
   944      * Returns a {@link Spliterator} over the elements in this queue.
   956      * Returns a {@link Spliterator} over the elements in this queue.
   955      *
   967      *
   956      * @return a {@code Spliterator} over the elements in this queue
   968      * @return a {@code Spliterator} over the elements in this queue
   957      * @since 1.8
   969      * @since 1.8
   958      */
   970      */
   959     public Spliterator<E> spliterator() {
   971     public Spliterator<E> spliterator() {
   960         return new LBQSpliterator<E>(this);
   972         return new LBQSpliterator();
       
   973     }
       
   974 
       
   975     /**
       
   976      * @throws NullPointerException {@inheritDoc}
       
   977      */
       
   978     public void forEach(Consumer<? super E> action) {
       
   979         Objects.requireNonNull(action);
       
   980         forEachFrom(action, null);
       
   981     }
       
   982 
       
   983     /**
       
   984      * Runs action on each element found during a traversal starting at p.
       
   985      * If p is null, traversal starts at head.
       
   986      */
       
   987     void forEachFrom(Consumer<? super E> action, Node<E> p) {
       
   988         // Extract batches of elements while holding the lock; then
       
   989         // run the action on the elements while not
       
   990         final int batchSize = 32;       // max number of elements per batch
       
   991         Object[] es = null;             // container for batch of elements
       
   992         int n, len = 0;
       
   993         do {
       
   994             fullyLock();
       
   995             try {
       
   996                 if (es == null) {
       
   997                     if (p == null) p = head.next;
       
   998                     for (Node<E> q = p; q != null; q = succ(q))
       
   999                         if (q.item != null && ++len == batchSize)
       
  1000                             break;
       
  1001                     es = new Object[len];
       
  1002                 }
       
  1003                 for (n = 0; p != null && n < len; p = succ(p))
       
  1004                     if ((es[n] = p.item) != null)
       
  1005                         n++;
       
  1006             } finally {
       
  1007                 fullyUnlock();
       
  1008             }
       
  1009             for (int i = 0; i < n; i++) {
       
  1010                 @SuppressWarnings("unchecked") E e = (E) es[i];
       
  1011                 action.accept(e);
       
  1012             }
       
  1013         } while (n > 0 && p != null);
   961     }
  1014     }
   962 
  1015 
   963     /**
  1016     /**
   964      * Saves this queue to a stream (that is, serializes it).
  1017      * Saves this queue to a stream (that is, serializes it).
   965      *
  1018      *