--- a/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java Tue Oct 13 16:35:22 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/util/concurrent/LinkedTransferQueue.java Tue Oct 13 16:45:35 2015 -0700
@@ -36,14 +36,14 @@
package java.util.concurrent;
import java.util.AbstractQueue;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
import java.util.Spliterator;
import java.util.Spliterators;
+import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
/**
@@ -83,7 +83,7 @@
*
* @since 1.7
* @author Doug Lea
- * @param <E> the type of elements held in this collection
+ * @param <E> the type of elements held in this queue
*/
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable {
@@ -108,7 +108,7 @@
*
* A FIFO dual queue may be implemented using a variation of the
* Michael & Scott (M&S) lock-free queue algorithm
- * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
+ * (http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf).
* It maintains two pointer fields, "head", pointing to a
* (matched) node that in turn points to the first actual
* (unmatched) queue node (or null if empty); and "tail" that
@@ -215,7 +215,7 @@
* of costly-to-reclaim garbage caused by the sequential "next"
* links of nodes starting at old forgotten head nodes: As first
* described in detail by Boehm
- * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
+ * (http://portal.acm.org/citation.cfm?doid=503272.503282), if a GC
* delays noticing that any arbitrarily old node has become
* garbage, all newer dead nodes will also be unreclaimed.
* (Similar issues arise in non-GC environments.) To cope with
@@ -456,12 +456,12 @@
// CAS methods for fields
final boolean casNext(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
+ return U.compareAndSwapObject(this, NEXT, cmp, val);
}
final boolean casItem(Object cmp, Object val) {
// assert cmp == null || cmp.getClass() != Node.class;
- return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
+ return U.compareAndSwapObject(this, ITEM, cmp, val);
}
/**
@@ -469,7 +469,7 @@
* only be seen after publication via casNext.
*/
Node(Object item, boolean isData) {
- UNSAFE.putObject(this, itemOffset, item); // relaxed write
+ U.putObject(this, ITEM, item); // relaxed write
this.isData = isData;
}
@@ -478,7 +478,7 @@
* only after CASing head field, so uses relaxed write.
*/
final void forgetNext() {
- UNSAFE.putObject(this, nextOffset, this);
+ U.putObject(this, NEXT, this);
}
/**
@@ -491,8 +491,8 @@
* else we don't care).
*/
final void forgetContents() {
- UNSAFE.putObject(this, itemOffset, this);
- UNSAFE.putObject(this, waiterOffset, null);
+ U.putObject(this, ITEM, this);
+ U.putObject(this, WAITER, null);
}
/**
@@ -538,21 +538,19 @@
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long itemOffset;
- private static final long nextOffset;
- private static final long waiterOffset;
+ private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
+ private static final long ITEM;
+ private static final long NEXT;
+ private static final long WAITER;
static {
try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> k = Node.class;
- itemOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("item"));
- nextOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("next"));
- waiterOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("waiter"));
- } catch (Exception e) {
+ ITEM = U.objectFieldOffset
+ (Node.class.getDeclaredField("item"));
+ NEXT = U.objectFieldOffset
+ (Node.class.getDeclaredField("next"));
+ WAITER = U.objectFieldOffset
+ (Node.class.getDeclaredField("waiter"));
+ } catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
@@ -569,15 +567,15 @@
// CAS methods for fields
private boolean casTail(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
+ return U.compareAndSwapObject(this, TAIL, cmp, val);
}
private boolean casHead(Node cmp, Node val) {
- return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
+ return U.compareAndSwapObject(this, HEAD, cmp, val);
}
private boolean casSweepVotes(int cmp, int val) {
- return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
+ return U.compareAndSwapInt(this, SWEEPVOTES, cmp, val);
}
/*
@@ -588,12 +586,6 @@
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
- @SuppressWarnings("unchecked")
- static <E> E cast(Object item) {
- // assert item == null || item.getClass() != Node.class;
- return (E) item;
- }
-
/**
* Implements all queuing methods. See above for explanation.
*
@@ -630,7 +622,8 @@
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
- return LinkedTransferQueue.<E>cast(item);
+ @SuppressWarnings("unchecked") E itemE = (E) item;
+ return itemE;
}
}
Node n = p.next;
@@ -708,15 +701,15 @@
if (item != e) { // matched
// assert item != s;
s.forgetContents(); // avoid garbage
- return LinkedTransferQueue.<E>cast(item);
+ @SuppressWarnings("unchecked") E itemE = (E) item;
+ return itemE;
}
- if ((w.isInterrupted() || (timed && nanos <= 0)) &&
- s.casItem(e, s)) { // cancel
- unsplice(pred, s);
- return e;
+ else if (w.isInterrupted() || (timed && nanos <= 0L)) {
+ unsplice(pred, s); // try to unlink and cancel
+ if (s.casItem(e, s)) // return normally if lost CAS
+ return e;
}
-
- if (spins < 0) { // establish spins at/near front
+ else if (spins < 0) { // establish spins at/near front
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
@@ -768,52 +761,25 @@
}
/**
- * Returns the first unmatched node of the given mode, or null if
- * none. Used by methods isEmpty, hasWaitingConsumer.
- */
- private Node firstOfMode(boolean isData) {
- for (Node p = head; p != null; p = succ(p)) {
- if (!p.isMatched())
- return (p.isData == isData) ? p : null;
- }
- return null;
- }
-
- /**
- * Version of firstOfMode used by Spliterator. Callers must
- * recheck if the returned node's item field is null or
- * self-linked before using.
+ * Returns the first unmatched data node, or null if none.
+ * Callers must recheck if the returned node's item field is null
+ * or self-linked before using.
*/
final Node firstDataNode() {
- for (Node p = head; p != null;) {
- Object item = p.item;
- if (p.isData) {
- if (item != null && item != p)
- return p;
+ restartFromHead: for (;;) {
+ for (Node p = head; p != null;) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p)
+ return p;
+ }
+ else if (item == null)
+ break;
+ if (p == (p = p.next))
+ continue restartFromHead;
}
- else if (item == null)
- break;
- if (p == (p = p.next))
- p = head;
+ return null;
}
- return null;
- }
-
- /**
- * Returns the item in the first unmatched node with isData; or
- * null if none. Used by peek.
- */
- private E firstDataItem() {
- for (Node p = head; p != null; p = succ(p)) {
- Object item = p.item;
- if (p.isData) {
- if (item != null && item != p)
- return LinkedTransferQueue.<E>cast(item);
- }
- else if (item == null)
- return null;
- }
- return null;
}
/**
@@ -821,23 +787,140 @@
* Used by methods size and getWaitingConsumerCount.
*/
private int countOfMode(boolean data) {
- int count = 0;
- for (Node p = head; p != null; ) {
- if (!p.isMatched()) {
- if (p.isData != data)
- return 0;
- if (++count == Integer.MAX_VALUE) // saturated
+ restartFromHead: for (;;) {
+ int count = 0;
+ for (Node p = head; p != null;) {
+ if (!p.isMatched()) {
+ if (p.isData != data)
+ return 0;
+ if (++count == Integer.MAX_VALUE)
+ break; // @see Collection.size()
+ }
+ if (p == (p = p.next))
+ continue restartFromHead;
+ }
+ return count;
+ }
+ }
+
+ public String toString() {
+ String[] a = null;
+ restartFromHead: for (;;) {
+ int charLength = 0;
+ int size = 0;
+ for (Node p = head; p != null;) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p) {
+ if (a == null)
+ a = new String[4];
+ else if (size == a.length)
+ a = Arrays.copyOf(a, 2 * size);
+ String s = item.toString();
+ a[size++] = s;
+ charLength += s.length();
+ }
+ } else if (item == null)
break;
+ if (p == (p = p.next))
+ continue restartFromHead;
+ }
+
+ if (size == 0)
+ return "[]";
+
+ return Helpers.toString(a, size, charLength);
+ }
+ }
+
+ private Object[] toArrayInternal(Object[] a) {
+ Object[] x = a;
+ restartFromHead: for (;;) {
+ int size = 0;
+ for (Node p = head; p != null;) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p) {
+ if (x == null)
+ x = new Object[4];
+ else if (size == x.length)
+ x = Arrays.copyOf(x, 2 * (size + 4));
+ x[size++] = item;
+ }
+ } else if (item == null)
+ break;
+ if (p == (p = p.next))
+ continue restartFromHead;
}
- Node n = p.next;
- if (n != p)
- p = n;
- else {
- count = 0;
- p = head;
+ if (x == null)
+ return new Object[0];
+ else if (a != null && size <= a.length) {
+ if (a != x)
+ System.arraycopy(x, 0, a, 0, size);
+ if (size < a.length)
+ a[size] = null;
+ return a;
}
+ return (size == x.length) ? x : Arrays.copyOf(x, size);
}
- return count;
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence.
+ *
+ * <p>The returned array will be "safe" in that no references to it are
+ * maintained by this queue. (In other words, this method must allocate
+ * a new array). The caller is thus free to modify the returned array.
+ *
+ * <p>This method acts as bridge between array-based and collection-based
+ * APIs.
+ *
+ * @return an array containing all of the elements in this queue
+ */
+ public Object[] toArray() {
+ return toArrayInternal(null);
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence; the runtime type of the returned array is that of
+ * the specified array. If the queue fits in the specified array, it
+ * is returned therein. Otherwise, a new array is allocated with the
+ * runtime type of the specified array and the size of this queue.
+ *
+ * <p>If this queue fits in the specified array with room to spare
+ * (i.e., the array has more elements than this queue), the element in
+ * the array immediately following the end of the queue is set to
+ * {@code null}.
+ *
+ * <p>Like the {@link #toArray()} method, this method acts as bridge between
+ * array-based and collection-based APIs. Further, this method allows
+ * precise control over the runtime type of the output array, and may,
+ * under certain circumstances, be used to save allocation costs.
+ *
+ * <p>Suppose {@code x} is a queue known to contain only strings.
+ * The following code can be used to dump the queue into a newly
+ * allocated array of {@code String}:
+ *
+ * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
+ *
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
+ *
+ * @param a the array into which the elements of the queue are to
+ * be stored, if it is big enough; otherwise, a new array of the
+ * same runtime type is allocated for this purpose
+ * @return an array containing all of the elements in this queue
+ * @throws ArrayStoreException if the runtime type of the specified array
+ * is not a supertype of the runtime type of every element in
+ * this queue
+ * @throws NullPointerException if the specified array is null
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T[] toArray(T[] a) {
+ if (a == null) throw new NullPointerException();
+ return (T[]) toArrayInternal(a);
}
final class Itr implements Iterator<E> {
@@ -886,7 +969,8 @@
Object item = s.item;
if (s.isData) {
if (item != null && item != s) {
- nextItem = LinkedTransferQueue.<E>cast(item);
+ @SuppressWarnings("unchecked") E itemE = (E) item;
+ nextItem = itemE;
nextNode = s;
return;
}
@@ -934,23 +1018,19 @@
}
/** A customized variant of Spliterators.IteratorSpliterator */
- static final class LTQSpliterator<E> implements Spliterator<E> {
+ final class LTQSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
- final LinkedTransferQueue<E> queue;
- Node current; // current node; null until initialized
+ Node current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
- LTQSpliterator(LinkedTransferQueue<E> queue) {
- this.queue = queue;
- }
+ LTQSpliterator() {}
public Spliterator<E> trySplit() {
Node p;
- final LinkedTransferQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
- ((p = current) != null || (p = q.firstDataNode()) != null) &&
+ ((p = current) != null || (p = firstDataNode()) != null) &&
p.next != null) {
Object[] a = new Object[n];
int i = 0;
@@ -959,15 +1039,16 @@
if (e != p && (a[i] = e) != null)
++i;
if (p == (p = p.next))
- p = q.firstDataNode();
+ p = firstDataNode();
} while (p != null && i < n && p.isData);
if ((current = p) == null)
exhausted = true;
if (i > 0) {
batch = i;
return Spliterators.spliterator
- (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
- Spliterator.CONCURRENT);
+ (a, 0, i, (Spliterator.ORDERED |
+ Spliterator.NONNULL |
+ Spliterator.CONCURRENT));
}
}
return null;
@@ -977,16 +1058,15 @@
public void forEachRemaining(Consumer<? super E> action) {
Node p;
if (action == null) throw new NullPointerException();
- final LinkedTransferQueue<E> q = this.queue;
if (!exhausted &&
- ((p = current) != null || (p = q.firstDataNode()) != null)) {
+ ((p = current) != null || (p = firstDataNode()) != null)) {
exhausted = true;
do {
Object e = p.item;
if (e != null && e != p)
action.accept((E)e);
if (p == (p = p.next))
- p = q.firstDataNode();
+ p = firstDataNode();
} while (p != null && p.isData);
}
}
@@ -995,15 +1075,14 @@
public boolean tryAdvance(Consumer<? super E> action) {
Node p;
if (action == null) throw new NullPointerException();
- final LinkedTransferQueue<E> q = this.queue;
if (!exhausted &&
- ((p = current) != null || (p = q.firstDataNode()) != null)) {
+ ((p = current) != null || (p = firstDataNode()) != null)) {
Object e;
do {
if ((e = p.item) == p)
e = null;
if (p == (p = p.next))
- p = q.firstDataNode();
+ p = firstDataNode();
} while (e == null && p != null && p.isData);
if ((current = p) == null)
exhausted = true;
@@ -1040,7 +1119,7 @@
* @since 1.8
*/
public Spliterator<E> spliterator() {
- return new LTQSpliterator<E>(this);
+ return new LTQSpliterator<E>();
}
/* -------------- Removal methods -------------- */
@@ -1054,7 +1133,7 @@
* @param s the node to be unspliced
*/
final void unsplice(Node pred, Node s) {
- s.forgetContents(); // forget unneeded fields
+ s.waiter = null; // disable signals
/*
* See above for rationale. Briefly: if pred still points to
* s, try to unlink s. If s cannot be unlinked, because it is
@@ -1332,7 +1411,22 @@
}
public E peek() {
- return firstDataItem();
+ restartFromHead: for (;;) {
+ for (Node p = head; p != null;) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p) {
+ @SuppressWarnings("unchecked") E e = (E) item;
+ return e;
+ }
+ }
+ else if (item == null)
+ break;
+ if (p == (p = p.next))
+ continue restartFromHead;
+ }
+ return null;
+ }
}
/**
@@ -1341,15 +1435,24 @@
* @return {@code true} if this queue contains no elements
*/
public boolean isEmpty() {
- for (Node p = head; p != null; p = succ(p)) {
- if (!p.isMatched())
- return !p.isData;
- }
- return true;
+ return firstDataNode() == null;
}
public boolean hasWaitingConsumer() {
- return firstOfMode(false) != null;
+ restartFromHead: for (;;) {
+ for (Node p = head; p != null;) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p)
+ break;
+ }
+ else if (item == null)
+ return true;
+ if (p == (p = p.next))
+ continue restartFromHead;
+ }
+ return false;
+ }
}
/**
@@ -1396,15 +1499,16 @@
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
- if (o == null) return false;
- for (Node p = head; p != null; p = succ(p)) {
- Object item = p.item;
- if (p.isData) {
- if (item != null && item != p && o.equals(item))
- return true;
+ if (o != null) {
+ for (Node p = head; p != null; p = succ(p)) {
+ Object item = p.item;
+ if (p.isData) {
+ if (item != null && item != p && o.equals(item))
+ return true;
+ }
+ else if (item == null)
+ break;
}
- else if (item == null)
- break;
}
return false;
}
@@ -1460,22 +1564,24 @@
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long headOffset;
- private static final long tailOffset;
- private static final long sweepVotesOffset;
+ private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
+ private static final long HEAD;
+ private static final long TAIL;
+ private static final long SWEEPVOTES;
static {
try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> k = LinkedTransferQueue.class;
- headOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("head"));
- tailOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("tail"));
- sweepVotesOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("sweepVotes"));
- } catch (Exception e) {
+ HEAD = U.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("head"));
+ TAIL = U.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("tail"));
+ SWEEPVOTES = U.objectFieldOffset
+ (LinkedTransferQueue.class.getDeclaredField("sweepVotes"));
+ } catch (ReflectiveOperationException e) {
throw new Error(e);
}
+
+ // Reduce the risk of rare disastrous classloading in first call to
+ // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
+ Class<?> ensureLoaded = LockSupport.class;
}
}