8177935: java/net/httpclient/http2/FixedThreadPoolTest.java fails frequently
Summary: fixes a race condition in AsyncWriteQueue
Reviewed-by: chegar
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java Thu Aug 24 16:37:10 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java Thu Aug 17 16:48:45 2017 +0100
@@ -196,7 +196,7 @@
* This same method is called to try and resume output after a blocking
* handshaking operation has completed.
*/
- private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
+ private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
// currently delayCallback is not used. Use it when it's needed to execute handshake in another thread.
try {
ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs);
@@ -230,6 +230,9 @@
closeExceptionally(t);
errorHandler.accept(t);
}
+ // We always return true: either all the data was sent, or
+ // an exception happened and we have closed the queue.
+ return true;
}
// Connecting at this level means the initial handshake has completed.
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Aug 24 16:37:10 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Aug 17 16:48:45 2017 +0100
@@ -231,7 +231,7 @@
assert false;
}
- void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
+ boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
try {
ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
while (Utils.remaining(bufs) > 0) {
@@ -239,13 +239,14 @@
if (n == 0) {
delayCallback.setDelayed(refs);
client.registerEvent(new WriteEvent());
- return;
+ return false;
}
}
ByteBufferReference.clear(refs);
} catch (IOException e) {
shutdown();
}
+ return true;
}
@Override
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java Thu Aug 24 16:37:10 2017 +0200
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java Thu Aug 17 16:48:45 2017 +0100
@@ -27,17 +27,31 @@
import java.io.Closeable;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
public class AsyncWriteQueue implements Closeable {
+ @FunctionalInterface
+ public static interface AsyncConsumer {
+ /**
+ * Takes an array of buffer reference and attempt to send the data
+ * downstream. If not all the data can be sent, then push back
+ * to the source queue by calling {@code source.setDelayed(buffers)}
+ * and return false. If all the data was successfully sent downstream
+ * then returns true.
+ * @param buffers An array of ButeBufferReference containing data
+ * to send downstream.
+ * @param source This AsyncWriteQueue.
+ * @return true if all the data could be sent downstream, false otherwise.
+ */
+ boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
+ }
+
private static final int IDLE = 0; // nobody is flushing from the queue
private static final int FLUSHING = 1; // there is the only thread flushing from the queue
private static final int REFLUSHING = 2; // while one thread was flushing from the queue
@@ -51,7 +65,7 @@
private final AtomicInteger state = new AtomicInteger(IDLE);
private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
- private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
+ private final AsyncConsumer consumeAction;
// Queue may be processed in two modes:
// 1. if(!doFullDrain) - invoke callback on each chunk
@@ -60,11 +74,11 @@
private ByteBufferReference[] delayedElement = null;
- public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
+ public AsyncWriteQueue(AsyncConsumer consumeAction) {
this(consumeAction, true);
}
- public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
+ public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
this.consumeAction = consumeAction;
this.doFullDrain = doFullDrain;
}
@@ -156,8 +170,7 @@
}
while(true) {
while (element != null) {
- consumeAction.accept(element, this);
- if (state.get() == DELAYED) {
+ if (!consumeAction.trySend(element, this)) {
return;
}
element = drain();
--- a/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Thu Aug 24 16:37:10 2017 +0200
+++ b/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Thu Aug 17 16:48:45 2017 +0100
@@ -23,8 +23,7 @@
/*
* @test
- * @bug 8087112
- * @key intermittent
+ * @bug 8087112 8177935
* @library /lib/testlibrary server
* @build jdk.testlibrary.SimpleSSLContext
* @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common