jdk/src/java.base/share/classes/java/util/concurrent/LinkedBlockingDeque.java
changeset 42926 8b9cacdadb2d
parent 42319 0193886267c3
child 43521 60e247b8d9a4
equal deleted inserted replaced
42925:5ea771a26665 42926:8b9cacdadb2d
    37 
    37 
    38 import java.util.AbstractQueue;
    38 import java.util.AbstractQueue;
    39 import java.util.Collection;
    39 import java.util.Collection;
    40 import java.util.Iterator;
    40 import java.util.Iterator;
    41 import java.util.NoSuchElementException;
    41 import java.util.NoSuchElementException;
       
    42 import java.util.Objects;
    42 import java.util.Spliterator;
    43 import java.util.Spliterator;
    43 import java.util.Spliterators;
    44 import java.util.Spliterators;
    44 import java.util.concurrent.locks.Condition;
    45 import java.util.concurrent.locks.Condition;
    45 import java.util.concurrent.locks.ReentrantLock;
    46 import java.util.concurrent.locks.ReentrantLock;
    46 import java.util.function.Consumer;
    47 import java.util.function.Consumer;
   738      * @throws ClassCastException            {@inheritDoc}
   739      * @throws ClassCastException            {@inheritDoc}
   739      * @throws NullPointerException          {@inheritDoc}
   740      * @throws NullPointerException          {@inheritDoc}
   740      * @throws IllegalArgumentException      {@inheritDoc}
   741      * @throws IllegalArgumentException      {@inheritDoc}
   741      */
   742      */
   742     public int drainTo(Collection<? super E> c, int maxElements) {
   743     public int drainTo(Collection<? super E> c, int maxElements) {
   743         if (c == null)
   744         Objects.requireNonNull(c);
   744             throw new NullPointerException();
       
   745         if (c == this)
   745         if (c == this)
   746             throw new IllegalArgumentException();
   746             throw new IllegalArgumentException();
   747         if (maxElements <= 0)
   747         if (maxElements <= 0)
   748             return 0;
   748             return 0;
   749         final ReentrantLock lock = this.lock;
   749         final ReentrantLock lock = this.lock;
   984             lock.unlock();
   984             lock.unlock();
   985         }
   985         }
   986     }
   986     }
   987 
   987 
   988     /**
   988     /**
       
   989      * Used for any element traversal that is not entirely under lock.
       
   990      * Such traversals must handle both:
       
   991      * - dequeued nodes (p.next == p)
       
   992      * - (possibly multiple) interior removed nodes (p.item == null)
       
   993      */
       
   994     Node<E> succ(Node<E> p) {
       
   995         return (p == (p = p.next)) ? first : p;
       
   996     }
       
   997 
       
   998     /**
   989      * Returns an iterator over the elements in this deque in proper sequence.
   999      * Returns an iterator over the elements in this deque in proper sequence.
   990      * The elements will be returned in order from first (head) to last (tail).
  1000      * The elements will be returned in order from first (head) to last (tail).
   991      *
  1001      *
   992      * <p>The returned iterator is
  1002      * <p>The returned iterator is
   993      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
  1003      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
  1022         Node<E> next;
  1032         Node<E> next;
  1023 
  1033 
  1024         /**
  1034         /**
  1025          * nextItem holds on to item fields because once we claim that
  1035          * nextItem holds on to item fields because once we claim that
  1026          * an element exists in hasNext(), we must return item read
  1036          * an element exists in hasNext(), we must return item read
  1027          * under lock (in advance()) even if it was in the process of
  1037          * under lock even if it was in the process of being removed
  1028          * being removed when hasNext() was called.
  1038          * when hasNext() was called.
  1029          */
  1039          */
  1030         E nextItem;
  1040         E nextItem;
  1031 
  1041 
  1032         /**
  1042         /**
  1033          * Node returned by most recent call to next. Needed by remove.
  1043          * Node returned by most recent call to next. Needed by remove.
  1036         private Node<E> lastRet;
  1046         private Node<E> lastRet;
  1037 
  1047 
  1038         abstract Node<E> firstNode();
  1048         abstract Node<E> firstNode();
  1039         abstract Node<E> nextNode(Node<E> n);
  1049         abstract Node<E> nextNode(Node<E> n);
  1040 
  1050 
       
  1051         private Node<E> succ(Node<E> p) {
       
  1052             return (p == (p = nextNode(p))) ? firstNode() : p;
       
  1053         }
       
  1054 
  1041         AbstractItr() {
  1055         AbstractItr() {
  1042             // set to initial position
  1056             // set to initial position
  1043             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1057             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1044             lock.lock();
  1058             lock.lock();
  1045             try {
  1059             try {
  1046                 next = firstNode();
  1060                 if ((next = firstNode()) != null)
  1047                 nextItem = (next == null) ? null : next.item;
  1061                     nextItem = next.item;
  1048             } finally {
  1062             } finally {
  1049                 lock.unlock();
  1063                 lock.unlock();
  1050             }
  1064             }
  1051         }
  1065         }
  1052 
  1066 
  1053         /**
  1067         public boolean hasNext() {
  1054          * Returns the successor node of the given non-null, but
  1068             return next != null;
  1055          * possibly previously deleted, node.
  1069         }
  1056          */
  1070 
  1057         private Node<E> succ(Node<E> n) {
  1071         public E next() {
  1058             // Chains of deleted nodes ending in null or self-links
  1072             Node<E> p;
  1059             // are possible if multiple interior nodes are removed.
  1073             if ((p = next) == null)
  1060             for (;;) {
  1074                 throw new NoSuchElementException();
  1061                 Node<E> s = nextNode(n);
  1075             lastRet = p;
  1062                 if (s == null)
  1076             E x = nextItem;
  1063                     return null;
       
  1064                 else if (s.item != null)
       
  1065                     return s;
       
  1066                 else if (s == n)
       
  1067                     return firstNode();
       
  1068                 else
       
  1069                     n = s;
       
  1070             }
       
  1071         }
       
  1072 
       
  1073         /**
       
  1074          * Advances next.
       
  1075          */
       
  1076         void advance() {
       
  1077             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1077             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1078             lock.lock();
  1078             lock.lock();
  1079             try {
  1079             try {
  1080                 // assert next != null;
  1080                 E e = null;
  1081                 next = succ(next);
  1081                 for (p = nextNode(p); p != null && (e = p.item) == null; )
  1082                 nextItem = (next == null) ? null : next.item;
  1082                     p = succ(p);
       
  1083                 next = p;
       
  1084                 nextItem = e;
  1083             } finally {
  1085             } finally {
  1084                 lock.unlock();
  1086                 lock.unlock();
  1085             }
  1087             }
  1086         }
       
  1087 
       
  1088         public boolean hasNext() {
       
  1089             return next != null;
       
  1090         }
       
  1091 
       
  1092         public E next() {
       
  1093             if (next == null)
       
  1094                 throw new NoSuchElementException();
       
  1095             lastRet = next;
       
  1096             E x = nextItem;
       
  1097             advance();
       
  1098             return x;
  1088             return x;
       
  1089         }
       
  1090 
       
  1091         public void forEachRemaining(Consumer<? super E> action) {
       
  1092             // A variant of forEachFrom
       
  1093             Objects.requireNonNull(action);
       
  1094             Node<E> p;
       
  1095             if ((p = next) == null) return;
       
  1096             lastRet = p;
       
  1097             next = null;
       
  1098             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
       
  1099             final int batchSize = 32;
       
  1100             Object[] es = null;
       
  1101             int n, len = 1;
       
  1102             do {
       
  1103                 lock.lock();
       
  1104                 try {
       
  1105                     if (es == null) {
       
  1106                         p = nextNode(p);
       
  1107                         for (Node<E> q = p; q != null; q = succ(q))
       
  1108                             if (q.item != null && ++len == batchSize)
       
  1109                                 break;
       
  1110                         es = new Object[len];
       
  1111                         es[0] = nextItem;
       
  1112                         nextItem = null;
       
  1113                         n = 1;
       
  1114                     } else
       
  1115                         n = 0;
       
  1116                     for (; p != null && n < len; p = succ(p))
       
  1117                         if ((es[n] = p.item) != null) {
       
  1118                             lastRet = p;
       
  1119                             n++;
       
  1120                         }
       
  1121                 } finally {
       
  1122                     lock.unlock();
       
  1123                 }
       
  1124                 for (int i = 0; i < n; i++) {
       
  1125                     @SuppressWarnings("unchecked") E e = (E) es[i];
       
  1126                     action.accept(e);
       
  1127                 }
       
  1128             } while (n > 0 && p != null);
  1099         }
  1129         }
  1100 
  1130 
  1101         public void remove() {
  1131         public void remove() {
  1102             Node<E> n = lastRet;
  1132             Node<E> n = lastRet;
  1103             if (n == null)
  1133             if (n == null)
  1114         }
  1144         }
  1115     }
  1145     }
  1116 
  1146 
  1117     /** Forward iterator */
  1147     /** Forward iterator */
  1118     private class Itr extends AbstractItr {
  1148     private class Itr extends AbstractItr {
       
  1149         Itr() {}                        // prevent access constructor creation
  1119         Node<E> firstNode() { return first; }
  1150         Node<E> firstNode() { return first; }
  1120         Node<E> nextNode(Node<E> n) { return n.next; }
  1151         Node<E> nextNode(Node<E> n) { return n.next; }
  1121     }
  1152     }
  1122 
  1153 
  1123     /** Descending iterator */
  1154     /** Descending iterator */
  1124     private class DescendingItr extends AbstractItr {
  1155     private class DescendingItr extends AbstractItr {
       
  1156         DescendingItr() {}              // prevent access constructor creation
  1125         Node<E> firstNode() { return last; }
  1157         Node<E> firstNode() { return last; }
  1126         Node<E> nextNode(Node<E> n) { return n.prev; }
  1158         Node<E> nextNode(Node<E> n) { return n.prev; }
  1127     }
  1159     }
  1128 
  1160 
  1129     /** A customized variant of Spliterators.IteratorSpliterator */
  1161     /**
       
  1162      * A customized variant of Spliterators.IteratorSpliterator.
       
  1163      * Keep this class in sync with (very similar) LBQSpliterator.
       
  1164      */
  1130     private final class LBDSpliterator implements Spliterator<E> {
  1165     private final class LBDSpliterator implements Spliterator<E> {
  1131         static final int MAX_BATCH = 1 << 25;  // max batch array size;
  1166         static final int MAX_BATCH = 1 << 25;  // max batch array size;
  1132         Node<E> current;    // current node; null until initialized
  1167         Node<E> current;    // current node; null until initialized
  1133         int batch;          // batch size for splits
  1168         int batch;          // batch size for splits
  1134         boolean exhausted;  // true when no more nodes
  1169         boolean exhausted;  // true when no more nodes
  1135         long est;           // size estimate
  1170         long est = size();  // size estimate
  1136 
  1171 
  1137         LBDSpliterator() { est = size(); }
  1172         LBDSpliterator() {}
  1138 
  1173 
  1139         public long estimateSize() { return est; }
  1174         public long estimateSize() { return est; }
  1140 
  1175 
  1141         public Spliterator<E> trySplit() {
  1176         public Spliterator<E> trySplit() {
  1142             Node<E> h;
  1177             Node<E> h;
  1143             int b = batch;
  1178             int b = batch;
  1144             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
  1179             int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
  1145             if (!exhausted &&
  1180             if (!exhausted &&
  1146                 (((h = current) != null && h != h.next)
  1181                 ((h = current) != null || (h = first) != null)
  1147                  || (h = first) != null)
       
  1148                 && h.next != null) {
  1182                 && h.next != null) {
  1149                 Object[] a = new Object[n];
  1183                 Object[] a = new Object[n];
  1150                 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1184                 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1151                 int i = 0;
  1185                 int i = 0;
  1152                 Node<E> p = current;
  1186                 Node<E> p = current;
  1153                 lock.lock();
  1187                 lock.lock();
  1154                 try {
  1188                 try {
  1155                     if (((p != null && p != p.next) || (p = first) != null)
  1189                     if (p != null || (p = first) != null)
  1156                         && p.item != null)
  1190                         for (; p != null && i < n; p = succ(p))
  1157                         for (; p != null && i < n; p = p.next)
  1191                             if ((a[i] = p.item) != null)
  1158                             a[i++] = p.item;
  1192                                 i++;
  1159                 } finally {
  1193                 } finally {
  1160                     lock.unlock();
  1194                     lock.unlock();
  1161                 }
  1195                 }
  1162                 if ((current = p) == null) {
  1196                 if ((current = p) == null) {
  1163                     est = 0L;
  1197                     est = 0L;
  1174                 }
  1208                 }
  1175             }
  1209             }
  1176             return null;
  1210             return null;
  1177         }
  1211         }
  1178 
  1212 
  1179         public void forEachRemaining(Consumer<? super E> action) {
  1213         public boolean tryAdvance(Consumer<? super E> action) {
  1180             if (action == null) throw new NullPointerException();
  1214             Objects.requireNonNull(action);
  1181             if (exhausted)
  1215             if (!exhausted) {
  1182                 return;
       
  1183             exhausted = true;
       
  1184             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
       
  1185             Node<E> p = current;
       
  1186             current = null;
       
  1187             do {
       
  1188                 E e = null;
  1216                 E e = null;
       
  1217                 final ReentrantLock lock = LinkedBlockingDeque.this.lock;
  1189                 lock.lock();
  1218                 lock.lock();
  1190                 try {
  1219                 try {
  1191                     if ((p != null && p != p.next) || (p = first) != null) {
  1220                     Node<E> p;
  1192                         e = p.item;
  1221                     if ((p = current) != null || (p = first) != null)
  1193                         p = p.next;
  1222                         do {
  1194                     }
  1223                             e = p.item;
       
  1224                             p = succ(p);
       
  1225                         } while (e == null && p != null);
       
  1226                     exhausted = ((current = p) == null);
  1195                 } finally {
  1227                 } finally {
  1196                     lock.unlock();
  1228                     lock.unlock();
  1197                 }
  1229                 }
  1198                 if (e != null)
  1230                 if (e != null) {
  1199                     action.accept(e);
  1231                     action.accept(e);
  1200             } while (p != null);
  1232                     return true;
  1201         }
       
  1202 
       
  1203         public boolean tryAdvance(Consumer<? super E> action) {
       
  1204             if (action == null) throw new NullPointerException();
       
  1205             if (exhausted)
       
  1206                 return false;
       
  1207             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
       
  1208             Node<E> p = current;
       
  1209             E e = null;
       
  1210             lock.lock();
       
  1211             try {
       
  1212                 if ((p != null && p != p.next) || (p = first) != null) {
       
  1213                     e = p.item;
       
  1214                     p = p.next;
       
  1215                 }
  1233                 }
  1216             } finally {
  1234             }
  1217                 lock.unlock();
  1235             return false;
  1218             }
  1236         }
  1219             exhausted = ((current = p) == null);
  1237 
  1220             if (e == null)
  1238         public void forEachRemaining(Consumer<? super E> action) {
  1221                 return false;
  1239             Objects.requireNonNull(action);
  1222             action.accept(e);
  1240             if (!exhausted) {
  1223             return true;
  1241                 exhausted = true;
       
  1242                 Node<E> p = current;
       
  1243                 current = null;
       
  1244                 forEachFrom(action, p);
       
  1245             }
  1224         }
  1246         }
  1225 
  1247 
  1226         public int characteristics() {
  1248         public int characteristics() {
  1227             return (Spliterator.ORDERED |
  1249             return (Spliterator.ORDERED |
  1228                     Spliterator.NONNULL |
  1250                     Spliterator.NONNULL |
  1246      * @return a {@code Spliterator} over the elements in this deque
  1268      * @return a {@code Spliterator} over the elements in this deque
  1247      * @since 1.8
  1269      * @since 1.8
  1248      */
  1270      */
  1249     public Spliterator<E> spliterator() {
  1271     public Spliterator<E> spliterator() {
  1250         return new LBDSpliterator();
  1272         return new LBDSpliterator();
       
  1273     }
       
  1274 
       
  1275     /**
       
  1276      * @throws NullPointerException {@inheritDoc}
       
  1277      */
       
  1278     public void forEach(Consumer<? super E> action) {
       
  1279         Objects.requireNonNull(action);
       
  1280         forEachFrom(action, null);
       
  1281     }
       
  1282 
       
  1283     /**
       
  1284      * Runs action on each element found during a traversal starting at p.
       
  1285      * If p is null, traversal starts at head.
       
  1286      */
       
  1287     void forEachFrom(Consumer<? super E> action, Node<E> p) {
       
  1288         // Extract batches of elements while holding the lock; then
       
  1289         // run the action on the elements while not
       
  1290         final ReentrantLock lock = this.lock;
       
  1291         final int batchSize = 32;       // max number of elements per batch
       
  1292         Object[] es = null;             // container for batch of elements
       
  1293         int n, len = 0;
       
  1294         do {
       
  1295             lock.lock();
       
  1296             try {
       
  1297                 if (es == null) {
       
  1298                     if (p == null) p = first;
       
  1299                     for (Node<E> q = p; q != null; q = succ(q))
       
  1300                         if (q.item != null && ++len == batchSize)
       
  1301                             break;
       
  1302                     es = new Object[len];
       
  1303                 }
       
  1304                 for (n = 0; p != null && n < len; p = succ(p))
       
  1305                     if ((es[n] = p.item) != null)
       
  1306                         n++;
       
  1307             } finally {
       
  1308                 lock.unlock();
       
  1309             }
       
  1310             for (int i = 0; i < n; i++) {
       
  1311                 @SuppressWarnings("unchecked") E e = (E) es[i];
       
  1312                 action.accept(e);
       
  1313             }
       
  1314         } while (n > 0 && p != null);
  1251     }
  1315     }
  1252 
  1316 
  1253     /**
  1317     /**
  1254      * Saves this deque to a stream (that is, serializes it).
  1318      * Saves this deque to a stream (that is, serializes it).
  1255      *
  1319      *
  1288         count = 0;
  1352         count = 0;
  1289         first = null;
  1353         first = null;
  1290         last = null;
  1354         last = null;
  1291         // Read in all elements and place in queue
  1355         // Read in all elements and place in queue
  1292         for (;;) {
  1356         for (;;) {
  1293             @SuppressWarnings("unchecked")
  1357             @SuppressWarnings("unchecked") E item = (E)s.readObject();
  1294             E item = (E)s.readObject();
       
  1295             if (item == null)
  1358             if (item == null)
  1296                 break;
  1359                 break;
  1297             add(item);
  1360             add(item);
  1298         }
  1361         }
  1299     }
  1362     }