# HG changeset patch # User dfuchs # Date 1502984925 -3600 # Node ID 2684bef9803357c0435e31bff17f395efda77a9c # Parent 4586bc5d28d13d3147b993e6237eaf29a7073bbb 8177935: java/net/httpclient/http2/FixedThreadPoolTest.java fails frequently Summary: fixes a race condition in AsyncWriteQueue Reviewed-by: chegar diff -r 4586bc5d28d1 -r 2684bef98033 jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java Thu Aug 24 16:37:10 2017 +0200 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLDelegate.java Thu Aug 17 16:48:45 2017 +0100 @@ -196,7 +196,7 @@ * This same method is called to try and resume output after a blocking * handshaking operation has completed. */ - private void upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { + private boolean upperWrite(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { // currently delayCallback is not used. Use it when it's needed to execute handshake in another thread. try { ByteBuffer[] buffers = ByteBufferReference.toBuffers(refs); @@ -230,6 +230,9 @@ closeExceptionally(t); errorHandler.accept(t); } + // We always return true: either all the data was sent, or + // an exception happened and we have closed the queue. + return true; } // Connecting at this level means the initial handshake has completed. diff -r 4586bc5d28d1 -r 2684bef98033 jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Aug 24 16:37:10 2017 +0200 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Aug 17 16:48:45 2017 +0100 @@ -231,7 +231,7 @@ assert false; } - void asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { + boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) { try { ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs); while (Utils.remaining(bufs) > 0) { @@ -239,13 +239,14 @@ if (n == 0) { delayCallback.setDelayed(refs); client.registerEvent(new WriteEvent()); - return; + return false; } } ByteBufferReference.clear(refs); } catch (IOException e) { shutdown(); } + return true; } @Override diff -r 4586bc5d28d1 -r 2684bef98033 jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java Thu Aug 24 16:37:10 2017 +0200 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java Thu Aug 17 16:48:45 2017 +0100 @@ -27,17 +27,31 @@ import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; public class AsyncWriteQueue implements Closeable { + @FunctionalInterface + public static interface AsyncConsumer { + /** + * Takes an array of buffer reference and attempt to send the data + * downstream. If not all the data can be sent, then push back + * to the source queue by calling {@code source.setDelayed(buffers)} + * and return false. If all the data was successfully sent downstream + * then returns true. + * @param buffers An array of ButeBufferReference containing data + * to send downstream. + * @param source This AsyncWriteQueue. + * @return true if all the data could be sent downstream, false otherwise. + */ + boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); + } + private static final int IDLE = 0; // nobody is flushing from the queue private static final int FLUSHING = 1; // there is the only thread flushing from the queue private static final int REFLUSHING = 2; // while one thread was flushing from the queue @@ -51,7 +65,7 @@ private final AtomicInteger state = new AtomicInteger(IDLE); private final Deque queue = new ConcurrentLinkedDeque<>(); - private final BiConsumer consumeAction; + private final AsyncConsumer consumeAction; // Queue may be processed in two modes: // 1. if(!doFullDrain) - invoke callback on each chunk @@ -60,11 +74,11 @@ private ByteBufferReference[] delayedElement = null; - public AsyncWriteQueue(BiConsumer consumeAction) { + public AsyncWriteQueue(AsyncConsumer consumeAction) { this(consumeAction, true); } - public AsyncWriteQueue(BiConsumer consumeAction, boolean doFullDrain) { + public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { this.consumeAction = consumeAction; this.doFullDrain = doFullDrain; } @@ -156,8 +170,7 @@ } while(true) { while (element != null) { - consumeAction.accept(element, this); - if (state.get() == DELAYED) { + if (!consumeAction.trySend(element, this)) { return; } element = drain(); diff -r 4586bc5d28d1 -r 2684bef98033 jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java --- a/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Thu Aug 24 16:37:10 2017 +0200 +++ b/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Thu Aug 17 16:48:45 2017 +0100 @@ -23,8 +23,7 @@ /* * @test - * @bug 8087112 - * @key intermittent + * @bug 8087112 8177935 * @library /lib/testlibrary server * @build jdk.testlibrary.SimpleSSLContext * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common