8175274: Fix httpclient asynchronous usage
authorskuksenko
Tue, 21 Feb 2017 11:08:34 +0000
changeset 43984 f33383dcb1fb
parent 43983 d35bbdbe9d36
child 43985 348c43a85dcf
8175274: Fix httpclient asynchronous usage Reviewed-by: dfuchs, michaelm
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java
jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java
jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Tue Feb 21 11:08:34 2017 +0000
@@ -27,7 +27,6 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Proxy;
 import java.net.ProxySelector;
 import java.net.SocketPermission;
 import java.net.URI;
@@ -71,7 +70,6 @@
     final Executor parentExecutor;
     final HttpRequest.BodyProcessor requestProcessor;
     boolean upgrading; // to HTTP/2
-    volatile Executor responseExecutor;
     final PushGroup<?,T> pushGroup;
 
     // buffer for receiving response headers
@@ -139,7 +137,7 @@
     }
 
     public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
-        return exchImpl.readBodyAsync(handler, true, responseExecutor);
+        return exchImpl.readBodyAsync(handler, true, parentExecutor);
     }
 
     public void cancel() {
@@ -224,7 +222,8 @@
 
             return checkForUpgrade(resp, exchImpl);
         } else {
-            exchImpl.sendRequest();
+            exchImpl.sendHeadersOnly();
+            exchImpl.sendBody();
             Response resp = exchImpl.getResponse();
             HttpResponseImpl.logResponse(resp);
             return checkForUpgrade(resp, exchImpl);
@@ -235,8 +234,6 @@
     // will be a non null responseAsync if expect continue returns an error
 
     public CompletableFuture<Response> responseAsync() {
-        // take one thread from supplied executor to handle response headers and body
-        responseExecutor = Utils.singleThreadExecutor(parentExecutor);
         return responseAsyncImpl(null);
     }
 
@@ -267,20 +264,18 @@
             Log.logTrace("Sending Expect: 100-Continue");
             return exchImpl
                     .sendHeadersAsync()
-                    .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
+                    .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
                     .thenCompose((Response r1) -> {
                         HttpResponseImpl.logResponse(r1);
                         int rcode = r1.statusCode();
                         if (rcode == 100) {
                             Log.logTrace("Received 100-Continue: sending body");
-                            return exchImpl.sendBodyAsync(parentExecutor)
-                                .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
-                                .thenCompose((Response r2) -> {
-                                    return checkForUpgradeAsync(r2, exchImpl);
-                                }).thenApply((Response r) -> {
-                                    HttpResponseImpl.logResponse(r);
-                                    return r;
-                                });
+                            CompletableFuture<Response> cf =
+                                    exchImpl.sendBodyAsync()
+                                            .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
+                            cf = wrapForUpgrade(cf);
+                            cf = wrapForLog(cf);
+                            return cf;
                         } else {
                             Log.logTrace("Expectation failed: Received {0}",
                                          rcode);
@@ -289,26 +284,38 @@
                                         "Unable to handle 101 while waiting for 100");
                                 return MinimalFuture.failedFuture(failed);
                             }
-                            return exchImpl.readBodyAsync(this::ignoreBody, false, responseExecutor)
-                                  .thenApply((v) -> {
-                                      return r1;
-                                  });
+                            return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
+                                  .thenApply(v ->  r1);
                         }
                     });
         } else {
-            return exchImpl
-                .sendRequestAsync(parentExecutor)
-                .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
-                .thenCompose((Response r1) -> {
-                    return checkForUpgradeAsync(r1, exchImpl);
-                })
-                .thenApply((Response response) -> {
-                    HttpResponseImpl.logResponse(response);
-                    return response;
-                });
+            CompletableFuture<Response> cf = exchImpl
+                    .sendHeadersAsync()
+                    .thenCompose(ExchangeImpl::sendBodyAsync)
+                    .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
+            cf = wrapForUpgrade(cf);
+            cf = wrapForLog(cf);
+            return cf;
         }
     }
 
+    private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
+        if (upgrading) {
+            return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
+        }
+        return cf;
+    }
+
+    private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
+        if (Log.requests()) {
+            return cf.thenApply(response -> {
+                HttpResponseImpl.logResponse(response);
+                return response;
+            });
+        }
+        return cf;
+    }
+
     HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
         return HttpResponse.BodyProcessor.discard((T)null);
     }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Tue Feb 21 11:08:34 2017 +0000
@@ -102,16 +102,12 @@
 
     // Blocking impl but in async style
 
-    CompletableFuture<Void> sendHeadersAsync() {
-        CompletableFuture<Void> cf = new MinimalFuture<>();
-        try {
+    CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+        // this is blocking. cf will already be completed.
+        return MinimalFuture.supply(() -> {
             sendHeadersOnly();
-            cf.complete(null);
-        } catch (Throwable t) {
-            cf.completeExceptionally(t);
-        }
-        // this is blocking. cf will already be completed.
-        return cf;
+            return this;
+        });
     }
 
     /**
@@ -156,40 +152,14 @@
 
     // Async version of sendBody(). This only used when body sent separately
     // to headers (100 continue)
-    CompletableFuture<Void> sendBodyAsync(Executor executor) {
-        CompletableFuture<Void> cf = new MinimalFuture<>();
-        executor.execute(() -> {
-            try {
-                sendBody();
-                cf.complete(null);
-            } catch (Throwable t) {
-                cf.completeExceptionally(t);
-            }
+    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+        return MinimalFuture.supply(() -> {
+            sendBody();
+            return this;
         });
-        return cf;
     }
 
     /**
-     * Sends the entire request (headers and body) blocking.
-     */
-    void sendRequest() throws IOException, InterruptedException {
-        sendHeadersOnly();
-        sendBody();
-    }
-
-    CompletableFuture<Void> sendRequestAsync(Executor executor) {
-        CompletableFuture<Void> cf = new MinimalFuture<>();
-        executor.execute(() -> {
-            try {
-                sendRequest();
-                cf.complete(null);
-            } catch (Throwable t) {
-                cf.completeExceptionally(t);
-            }
-        });
-        return cf;
-    }
-    /**
      * Cancels a request.  Not currently exposed through API.
      */
     abstract void cancel();
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Tue Feb 21 11:08:34 2017 +0000
@@ -194,17 +194,11 @@
     }
 
     CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
-        CompletableFuture<Response> cf = new MinimalFuture<>();
-        executor.execute(() -> {
-            try {
-                response = new Http1Response<>(connection, Http1Exchange.this);
-                response.readHeaders();
-                cf.complete(response.response());
-            } catch (Throwable e) {
-                cf.completeExceptionally(e);
-            }
-        });
-        return cf;
+        return MinimalFuture.supply( () -> {
+            response = new Http1Response<>(connection, Http1Exchange.this);
+            response.readHeaders();
+            return response.response();
+        }, executor);
     }
 
     @Override
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Tue Feb 21 11:08:34 2017 +0000
@@ -241,14 +241,7 @@
                                                           Http2ClientImpl client2,
                                                           Exchange<?> exchange,
                                                           ByteBuffer initial) {
-        CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
-        try {
-            Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
-            cf.complete(c);
-        } catch (IOException | InterruptedException e) {
-            cf.completeExceptionally(e);
-        }
-        return cf;
+        return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
     }
 
     /**
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java	Tue Feb 21 11:08:34 2017 +0000
@@ -235,7 +235,7 @@
     sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
     {
         MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
-        return mex.responseAsync(null)
+        return mex.responseAsync()
                   .thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
     }
 
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Tue Feb 21 11:08:34 2017 +0000
@@ -35,6 +35,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiFunction;
 import java.util.concurrent.Executor;
+import java.util.function.UnaryOperator;
+
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import jdk.incubator.http.internal.common.Pair;
@@ -150,8 +152,7 @@
                 Exchange<T> currExchange = getExchange();
                 requestFilters(r);
                 Response response = currExchange.response();
-                Pair<Response, HttpRequestImpl> filterResult = responseFilters(response);
-                HttpRequestImpl newreq = filterResult.second;
+                HttpRequestImpl newreq = responseFilters(response);
                 if (newreq == null) {
                     if (attempts > 1) {
                         Log.logError("Succeeded on attempt: " + attempts);
@@ -213,23 +214,7 @@
         Log.logTrace("All filters applied");
     }
 
-    // Filters are assumed to be non-blocking so the async
-    // versions of these methods just call the blocking ones
-
-    private CompletableFuture<Void> requestFiltersAsync(HttpRequestImpl r) {
-        CompletableFuture<Void> cf = new MinimalFuture<>();
-        try {
-            requestFilters(r);
-            cf.complete(null);
-        } catch(Throwable e) {
-            cf.completeExceptionally(e);
-        }
-        return cf;
-    }
-
-
-    private Pair<Response,HttpRequestImpl>
-    responseFilters(Response response) throws IOException
+    private HttpRequestImpl responseFilters(Response response) throws IOException
     {
         Log.logTrace("Applying response filters");
         for (HeaderFilter filter : filters) {
@@ -237,24 +222,11 @@
             HttpRequestImpl newreq = filter.response(response);
             if (newreq != null) {
                 Log.logTrace("New request: stopping filters");
-                return pair(null, newreq);
+                return newreq;
             }
         }
         Log.logTrace("All filters applied");
-        return pair(response, null);
-    }
-
-    private CompletableFuture<Pair<Response,HttpRequestImpl>>
-    responseFiltersAsync(Response response)
-    {
-        CompletableFuture<Pair<Response,HttpRequestImpl>> cf = new MinimalFuture<>();
-        try {
-            Pair<Response,HttpRequestImpl> n = responseFilters(response); // assumed to be fast
-            cf.complete(n);
-        } catch (Throwable e) {
-            cf.completeExceptionally(e);
-        }
-        return cf;
+        return null;
     }
 
     public void cancel() {
@@ -267,24 +239,27 @@
         getExchange().cancel(cause);
     }
 
-    public CompletableFuture<HttpResponseImpl<T>> responseAsync(Void v) {
-        return responseAsync1(null)
+    public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
+        CompletableFuture<Void> start = new MinimalFuture<>();
+        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+        start.completeAsync( () -> null, executor); // trigger execution
+        return cf;
+    }
+
+    private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
+        return start.thenCompose( v -> responseAsyncImpl())
             .thenCompose((Response r) -> {
                 Exchange<T> exch = getExchange();
                 return exch.readBodyAsync(responseHandler)
-                        .thenApply((T body) -> {
-                            Pair<Response,T> result = new Pair<>(r, body);
-                            return result;
-                        });
-            })
-            .thenApply((Pair<Response,T> result) -> {
-                return new HttpResponseImpl<>(userRequest, result.first, result.second, getExchange());
+                        .thenApply((T body) ->  new HttpResponseImpl<>(userRequest, r, body, exch));
             });
     }
 
     CompletableFuture<U> multiResponseAsync() {
-        CompletableFuture<HttpResponse<T>> mainResponse = responseAsync(null)
-                  .thenApply((HttpResponseImpl<T> b) -> {
+        CompletableFuture<Void> start = new MinimalFuture<>();
+        CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+        CompletableFuture<HttpResponse<T>> mainResponse =
+                cf.thenApply((HttpResponseImpl<T> b) -> {
                       multiResponseHandler.onResponse(b);
                       return (HttpResponse<T>)b;
                    });
@@ -295,10 +270,12 @@
             // All push promises received by now.
             pushGroup.noMorePushes(true);
         });
-        return multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
+        CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
+        start.completeAsync( () -> null, executor); // trigger execution
+        return res;
     }
 
-    private CompletableFuture<Response> responseAsync1(Void v) {
+    private CompletableFuture<Response> responseAsyncImpl() {
         CompletableFuture<Response> cf;
         if (++attempts > max_attempts) {
             cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
@@ -307,48 +284,51 @@
                 timedEvent = new TimedEvent(currentreq.duration());
                 client.registerTimer(timedEvent);
             }
+            try {
+                // 1. Apply request filters
+                requestFilters(currentreq);
+            } catch (IOException e) {
+                return MinimalFuture.failedFuture(e);
+            }
             Exchange<T> exch = getExchange();
-            // 1. Apply request filters
-            cf = requestFiltersAsync(currentreq)
-                // 2. get response
-                .thenCompose((v1) -> {
-                    return exch.responseAsync();
-                })
-                // 3. Apply response filters
-                .thenCompose(this::responseFiltersAsync)
-                // 4. Check filter result and repeat or continue
-                .thenCompose((Pair<Response,HttpRequestImpl> pair) -> {
-                    Response resp = pair.first;
-                    if (resp != null) {
+            // 2. get response
+            cf = exch.responseAsync()
+                .thenCompose((Response response) -> {
+                    HttpRequestImpl newrequest = null;
+                    try {
+                        // 3. Apply response filters
+                        newrequest = responseFilters(response);
+                    } catch (IOException e) {
+                        return MinimalFuture.failedFuture(e);
+                    }
+                    // 4. Check filter result and repeat or continue
+                    if (newrequest == null) {
                         if (attempts > 1) {
                             Log.logError("Succeeded on attempt: " + attempts);
                         }
-                        return MinimalFuture.completedFuture(resp);
+                        return MinimalFuture.completedFuture(response);
                     } else {
-                        currentreq = pair.second;
-                        Exchange<T> previous = exch;
+                        currentreq = newrequest;
                         setExchange(new Exchange<>(currentreq, this, acc));
                         //reads body off previous, and then waits for next response
-                        return responseAsync1(null);
+                        return responseAsyncImpl();
                     }
                 })
-            // 5. Convert result to Pair
-            .handle((BiFunction<Response, Throwable, Pair<Response, Throwable>>) Pair::new)
-            // 6. Handle errors and cancel any timer set
-            .thenCompose((Pair<Response,Throwable> obj) -> {
-                Response response = obj.first;
+            // 5. Handle errors and cancel any timer set
+            .handle((response, ex) -> {
                 if (response != null) {
                     return MinimalFuture.completedFuture(response);
                 }
                 // all exceptions thrown are handled here
-                CompletableFuture<Response> error = getExceptionalCF(obj.second);
+                CompletableFuture<Response> error = getExceptionalCF(ex);
                 if (error == null) {
                     cancelTimer();
-                    return responseAsync1(null);
+                    return responseAsyncImpl();
                 } else {
                     return error;
                 }
-            });
+            })
+            .thenCompose(UnaryOperator.identity());
         }
         return cf;
     }
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Tue Feb 21 11:08:34 2017 +0000
@@ -50,7 +50,7 @@
             .thenCompose((Void v) -> {
                 HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
                 MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
-                return mconnectExchange.responseAsync(null)
+                return mconnectExchange.responseAsync()
                     .thenCompose((HttpResponseImpl<Void> resp) -> {
                         CompletableFuture<Void> cf = new MinimalFuture<>();
                         if (resp.statusCode() != 200) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java	Tue Feb 21 11:08:34 2017 +0000
@@ -50,18 +50,11 @@
     @Override
     public CompletableFuture<Void> connectAsync() {
         return delegate.connectAsync()
-                .thenCompose((Void v) -> {
-                    CompletableFuture<Void> cf = new MinimalFuture<>();
-                    try {
-                        this.sslDelegate = new SSLDelegate(delegate.channel(),
-                                                           client,
-                                                           alpn);
-                        cf.complete(null);
-                    } catch (IOException e) {
-                        cf.completeExceptionally(e);
-                    }
-                    return cf;
-                });
+                .thenCompose((Void v) ->
+                                MinimalFuture.supply( () -> {
+                                    this.sslDelegate = new SSLDelegate(delegate.channel(), client, alpn);
+                                    return null;
+                                }));
     }
 
     @Override
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Feb 21 11:08:34 2017 +0000
@@ -39,6 +39,8 @@
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -96,7 +98,7 @@
  */
 class Stream<T> extends ExchangeImpl<T> {
 
-    final Queue<Http2Frame> inputQ;
+    final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
 
     /**
      * This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -169,7 +171,7 @@
     {
         CompletableFuture<T> cf = readBodyAsync(handler,
                                                 returnToCache,
-                                                this::executeInline);
+                                                null);
         try {
             return cf.join();
         } catch (CompletionException e) {
@@ -177,10 +179,6 @@
         }
     }
 
-    void executeInline(Runnable r) {
-        r.run();
-    }
-
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
@@ -189,60 +187,69 @@
         return sb.toString();
     }
 
+    private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
+        if (frame instanceof ResetFrame) {
+            handleReset((ResetFrame) frame);
+            return true;
+        } else if (!(frame instanceof DataFrame)) {
+            assert false;
+            return true;
+        }
+        DataFrame df = (DataFrame) frame;
+        // RFC 7540 6.1:
+        // The entire DATA frame payload is included in flow control,
+        // including the Pad Length and Padding fields if present
+        int len = df.payloadLength();
+        ByteBufferReference[] buffers = df.getData();
+        for (ByteBufferReference b : buffers) {
+            ByteBuffer buf = b.get();
+            if (buf.hasRemaining()) {
+                publisher.acceptData(Optional.of(buf));
+            }
+        }
+        connection.windowUpdater.update(len);
+        if (df.getFlag(DataFrame.END_STREAM)) {
+            setEndStreamReceived();
+            publisher.acceptData(Optional.empty());
+            return false;
+        }
+        // Don't send window update on a stream which is
+        // closed or half closed.
+        windowUpdater.update(len);
+        return true;
+    }
+
     // pushes entire response body into response processor
     // blocking when required by local or remote flow control
     CompletableFuture<T> receiveData(Executor executor) {
         CompletableFuture<T> cf = responseProcessor
                 .getBody()
                 .toCompletableFuture();
-
-        executor.execute(() -> {
-            Http2Frame frame;
-            DataFrame df = null;
-            try {
-                if (!endStreamReceived()) {
-                    do {
-                        frame = inputQ.take();
-                        if (frame instanceof ResetFrame) {
-                            handleReset((ResetFrame)frame);
-                            continue;
-                        } else if (!(frame instanceof DataFrame)) {
-                            assert false;
-                            continue;
-                        }
-                        df = (DataFrame) frame;
-                        // RFC 7540 6.1:
-                        // The entire DATA frame payload is included in flow control,
-                        // including the Pad Length and Padding fields if present
-                        int len = df.payloadLength();
-                        ByteBufferReference[] buffers = df.getData();
-                        for (ByteBufferReference b : buffers) {
-                            publisher.acceptData(Optional.of(b.get()));
-                        }
-                        connection.windowUpdater.update(len);
-                        if (df.getFlag(DataFrame.END_STREAM)) {
-                            break;
-                        }
-                        // Don't send window update on a stream which is
-                        // closed or half closed.
-                        windowUpdater.update(len);
-                    } while (true);
-                    setEndStreamReceived();
-                }
-                publisher.acceptData(Optional.empty());
-            } catch (Throwable e) {
-                Log.logTrace("receiveData: {0}", e.toString());
-                e.printStackTrace();
-                cf.completeExceptionally(e);
-                publisher.acceptError(e);
-            }
-        });
+        Consumer<Throwable> onError = e -> {
+            Log.logTrace("receiveData: {0}", e.toString());
+            e.printStackTrace();
+            cf.completeExceptionally(e);
+            publisher.acceptError(e);
+        };
+        if (executor == null) {
+            inputQ.blockingReceive(this::receiveDataFrame, onError);
+        } else {
+            inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+        }
         return cf;
     }
 
     @Override
-    void sendBody() throws IOException, InterruptedException {
-        sendBodyImpl();
+    void sendBody() throws IOException {
+        try {
+            sendBodyImpl().join();
+        } catch (CompletionException e) {
+            throw Utils.getIOException(e);
+        }
+    }
+
+    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+        return sendBodyImpl().thenApply( v -> this);
     }
 
     @SuppressWarnings("unchecked")
@@ -268,7 +275,6 @@
         };
         this.requestPseudoHeaders = new HttpHeadersImpl();
         // NEW
-        this.inputQ = new Queue<>();
         this.publisher = new BlockingPushPublisher<>();
         this.windowUpdater = new StreamWindowUpdateSender(connection);
     }
@@ -673,6 +679,10 @@
                 response_cfs.add(cf);
             }
         }
+        if (executor != null && !cf.isDone()) {
+            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
+            cf = cf.thenApplyAsync(r -> r, executor);
+        }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
         PushGroup<?,?> pg = exchange.getPushGroup();
         if (pg != null) {
@@ -743,20 +753,12 @@
         }
     }
 
-    private void waitForCompletion() throws IOException {
-        try {
-            requestBodyCF.join();
-        } catch (CompletionException e) {
-            throw Utils.getIOException(e);
-        }
-    }
-
-    void sendBodyImpl() throws IOException, InterruptedException {
+    CompletableFuture<Void> sendBodyImpl() {
         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
         subscriber.setClient(client);
         requestProcessor.subscribe(subscriber);
-        waitForCompletion();
-        requestSent();
+        requestBodyCF.whenComplete((v,t) -> requestSent());
+        return requestBodyCF;
     }
 
     @Override
@@ -846,30 +848,24 @@
         // error record it in the PushGroup. The error method is called
         // with a null value when no error occurred (is a no-op)
         @Override
-        CompletableFuture<Void> sendBodyAsync(Executor executor) {
-            return super.sendBodyAsync(executor)
-                        .whenComplete((Void v, Throwable t) -> {
-                            pushGroup.pushError(t);
-                        });
+        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+            return super.sendBodyAsync()
+                        .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
         }
 
         @Override
-        CompletableFuture<Void> sendHeadersAsync() {
+        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
             return super.sendHeadersAsync()
-                        .whenComplete((Void v, Throwable t) -> {
-                            pushGroup.pushError(t);
-                         });
-        }
-
-        @Override
-        CompletableFuture<Void> sendRequestAsync(Executor executor) {
-            return super.sendRequestAsync(executor)
-                        .whenComplete((v, t) -> pushGroup.pushError(t));
+                        .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
         }
 
         @Override
         CompletableFuture<Response> getResponseAsync(Executor executor) {
-            return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+            CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+            if(executor!=null && !cf.isDone()) {
+                cf  = cf.thenApplyAsync( r -> r, executor);
+            }
+            return cf;
         }
 
         @Override
@@ -887,7 +883,8 @@
             HttpResponseImpl.logResponse(r);
             pushCF.complete(r); // not strictly required for push API
             // start reading the body using the obtained BodyProcessor
-            readBodyAsync(getPushHandler(), false, getExchange().executor())
+            CompletableFuture<Void> start = new MinimalFuture<>();
+            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                 .whenComplete((T body, Throwable t) -> {
                     if (t != null) {
                         responseCF.completeExceptionally(t);
@@ -896,6 +893,7 @@
                         responseCF.complete(response);
                     }
                 });
+            start.completeAsync(() -> null, getExchange().executor());
         }
 
         @Override
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java	Tue Feb 21 11:08:34 2017 +0000
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.incubator.http.internal.common;
+
+import jdk.incubator.http.internal.frame.DataFrame;
+import jdk.incubator.http.internal.frame.Http2Frame;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way
+ * or allows to consume it asynchronously. In the latter case put operation from the producer thread
+ * executes consume operation in the given executor.
+ */
+public class AsyncDataReadQueue implements Closeable {
+
+    @FunctionalInterface
+    public interface DataConsumer {
+        /**
+         *
+         * @param t - frame
+         * @return true if consuming should be continued. false when END_STREAM was received.
+         * @throws Throwable
+         */
+        boolean accept(Http2Frame t) throws Throwable;
+    }
+
+    private static final int BLOCKING = 0;
+    private static final int FLUSHING = 1;
+    private static final int REFLUSHING = 2;
+    private static final int ASYNC  = 3;
+    private static final int CLOSED = 4;
+
+
+    private final AtomicInteger state = new AtomicInteger(BLOCKING);
+    private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>();
+    private Executor executor;
+    private DataConsumer onData;
+    private Consumer<Throwable> onError;
+
+    public AsyncDataReadQueue() {
+    }
+
+    public boolean tryPut(Http2Frame f) {
+        if(state.get() == CLOSED) {
+            return false;
+        } else {
+            queue.offer(f);
+            flushAsync(false);
+            return true;
+        }
+    }
+
+    public void put(Http2Frame f) throws IOException {
+        if(!tryPut(f))
+            throw new IOException("stream closed");
+    }
+
+    public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) {
+        if (state.get() == CLOSED) {
+            onError.accept(new IOException("stream closed"));
+            return;
+        }
+        assert state.get() == BLOCKING;
+        try {
+            while (onData.accept(queue.take()));
+            assert state.get() == CLOSED;
+        } catch (Throwable e) {
+            onError.accept(e);
+        }
+    }
+
+    public void asyncReceive(Executor executor, DataConsumer onData,
+                             Consumer<Throwable> onError) {
+        if (state.get() == CLOSED) {
+            onError.accept(new IOException("stream closed"));
+            return;
+        }
+
+        assert state.get() == BLOCKING;
+
+        // Validates that fields not already set.
+        if (!checkCanSet("executor", this.executor, onError)
+            || !checkCanSet("onData", this.onData, onError)
+            || !checkCanSet("onError", this.onError, onError)) {
+            return;
+        }
+
+        this.executor = executor;
+        this.onData = onData;
+        this.onError = onError;
+
+        // This will report an error if asyncReceive is called twice,
+        // because we won't be in BLOCKING state if that happens
+        if (!this.state.compareAndSet(BLOCKING, ASYNC)) {
+            onError.accept(new IOException(
+                  new IllegalStateException("State: "+this.state.get())));
+            return;
+        }
+
+        flushAsync(true);
+    }
+
+    private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) {
+        if (oldval != null) {
+            onError.accept(new IOException(
+                     new IllegalArgumentException(name)));
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void close() {
+        int prevState = state.getAndSet(CLOSED);
+        if(prevState == BLOCKING) {
+            // wake up blocked take()
+            queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0]));
+        }
+    }
+
+    private void flushAsync(boolean alreadyInExecutor) {
+        while(true) {
+            switch (state.get()) {
+                case BLOCKING:
+                case CLOSED:
+                case REFLUSHING:
+                    return;
+                case ASYNC:
+                    if(state.compareAndSet(ASYNC, FLUSHING)) {
+                        if(alreadyInExecutor) {
+                            flushLoop();
+                        } else {
+                            executor.execute(this::flushLoop);
+                        }
+                        return;
+                    }
+                    break;
+                case FLUSHING:
+                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
+                        return;
+                    }
+                    break;
+            }
+        }
+    }
+
+    private void flushLoop() {
+        try {
+            while(true) {
+                Http2Frame frame = queue.poll();
+                while (frame != null) {
+                    if(!onData.accept(frame)) {
+                        assert state.get() == CLOSED;
+                        return; // closed
+                    }
+                    frame = queue.poll();
+                }
+                switch (state.get()) {
+                    case BLOCKING:
+                        assert false;
+                        break;
+                    case ASYNC:
+                        throw new RuntimeException("Shouldn't happen");
+                    case FLUSHING:
+                        if(state.compareAndSet(FLUSHING, ASYNC)) {
+                            return;
+                        }
+                        break;
+                    case REFLUSHING:
+                        // We need to check if new elements were put after last
+                        // poll() and do graceful exit
+                        state.compareAndSet(REFLUSHING, FLUSHING);
+                        break;
+                    case CLOSED:
+                        return;
+                }
+            }
+        } catch (Throwable e) {
+            onError.accept(e);
+            close();
+        }
+    }
+}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java	Tue Feb 21 11:08:34 2017 +0000
@@ -26,6 +26,7 @@
 package jdk.incubator.http.internal.common;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
@@ -40,6 +41,11 @@
  */
 public final class MinimalFuture<T> extends CompletableFuture<T> {
 
+    @FunctionalInterface
+    public interface ExceptionalSupplier<U> {
+        U get() throws Throwable;
+    }
+
     final static AtomicLong TOKENS = new AtomicLong();
     final long id;
 
@@ -56,6 +62,29 @@
         return f;
     }
 
+    public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier) {
+        CompletableFuture<U> cf = new MinimalFuture<>();
+        try {
+            U value = supplier.get();
+            cf.complete(value);
+        } catch (Throwable t) {
+            cf.completeExceptionally(t);
+        }
+        return cf;
+    }
+
+    public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier, Executor executor) {
+        CompletableFuture<U> cf = new MinimalFuture<>();
+        cf.completeAsync( () -> {
+            try {
+                return supplier.get();
+            } catch (Throwable ex) {
+                throw new CompletionException(ex);
+            }
+        }, executor);
+        return cf;
+    }
+
     public MinimalFuture() {
         super();
         this.id = TOKENS.incrementAndGet();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java	Tue Feb 21 11:08:34 2017 +0000
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8087112
+ * @library /lib/testlibrary server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors FixedThreadPoolTest
+ */
+
+import java.net.*;
+import jdk.incubator.http.*;
+import static jdk.incubator.http.HttpClient.Version.HTTP_2;
+import javax.net.ssl.*;
+import java.nio.file.*;
+import java.util.concurrent.*;
+import jdk.testlibrary.SimpleSSLContext;
+import static jdk.incubator.http.HttpRequest.BodyProcessor.fromFile;
+import static jdk.incubator.http.HttpRequest.BodyProcessor.fromString;
+import static jdk.incubator.http.HttpResponse.BodyHandler.asFile;
+import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
+
+import org.testng.annotations.Test;
+
+@Test
+public class FixedThreadPoolTest {
+    static int httpPort, httpsPort;
+    static Http2TestServer httpServer, httpsServer;
+    static HttpClient client = null;
+    static ExecutorService exec;
+    static SSLContext sslContext;
+
+    static String httpURIString, httpsURIString;
+
+    static void initialize() throws Exception {
+        try {
+            SimpleSSLContext sslct = new SimpleSSLContext();
+            sslContext = sslct.get();
+            client = getClient();
+            httpServer = new Http2TestServer(false, 0, exec, sslContext);
+            httpServer.addHandler(new EchoHandler(), "/");
+            httpPort = httpServer.getAddress().getPort();
+
+            httpsServer = new Http2TestServer(true, 0, exec, sslContext);
+            httpsServer.addHandler(new EchoHandler(), "/");
+
+            httpsPort = httpsServer.getAddress().getPort();
+            httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
+            httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/";
+
+            httpServer.start();
+            httpsServer.start();
+        } catch (Throwable e) {
+            System.err.println("Throwing now");
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    @Test(timeOut=3000000)
+    public static void test() throws Exception {
+        try {
+            initialize();
+            simpleTest(false);
+            simpleTest(true);
+            streamTest(false);
+            streamTest(true);
+            paramsTest();
+            Thread.sleep(1000 * 4);
+        } catch (Exception | Error tt) {
+            tt.printStackTrace();
+            throw tt;
+        } finally {
+            httpServer.stop();
+            httpsServer.stop();
+            exec.shutdownNow();
+        }
+    }
+
+    static HttpClient getClient() {
+        if (client == null) {
+            exec = Executors.newCachedThreadPool();
+            client = HttpClient.newBuilder()
+                               .executor(Executors.newFixedThreadPool(2))
+                               .sslContext(sslContext)
+                               .version(HTTP_2)
+                               .build();
+        }
+        return client;
+    }
+
+    static URI getURI(boolean secure) {
+        if (secure)
+            return URI.create(httpsURIString);
+        else
+            return URI.create(httpURIString);
+    }
+
+    static void checkStatus(int expected, int found) throws Exception {
+        if (expected != found) {
+            System.err.printf ("Test failed: wrong status code %d/%d\n",
+                expected, found);
+            throw new RuntimeException("Test failed");
+        }
+    }
+
+    static void checkStrings(String expected, String found) throws Exception {
+        if (!expected.equals(found)) {
+            System.err.printf ("Test failed: wrong string %s/%s\n",
+                expected, found);
+            throw new RuntimeException("Test failed");
+        }
+    }
+
+    static Void compareFiles(Path path1, Path path2) {
+        return TestUtil.compareFiles(path1, path2);
+    }
+
+    static Path tempFile() {
+        return TestUtil.tempFile();
+    }
+
+    static final String SIMPLE_STRING = "Hello world Goodbye world";
+
+    static final int LOOPS = 32;
+    static final int FILESIZE = 64 * 1024 + 200;
+
+    static void streamTest(boolean secure) throws Exception {
+        URI uri = getURI(secure);
+        System.err.printf("streamTest %b to %s\n" , secure, uri);
+
+        HttpClient client = getClient();
+        Path src = TestUtil.getAFile(FILESIZE * 4);
+        HttpRequest req = HttpRequest.newBuilder(uri)
+                                     .POST(fromFile(src))
+                                     .build();
+
+        Path dest = Paths.get("streamtest.txt");
+        dest.toFile().delete();
+        CompletableFuture<Path> response = client.sendAsync(req, asFile(dest))
+                .thenApply(resp -> {
+                    if (resp.statusCode() != 200)
+                        throw new RuntimeException();
+                    return resp.body();
+                });
+        response.join();
+        compareFiles(src, dest);
+        System.err.println("DONE");
+    }
+
+    static void paramsTest() throws Exception {
+        System.err.println("paramsTest");
+        Http2TestServer server = new Http2TestServer(true, 0, exec, sslContext);
+        server.addHandler((t -> {
+            SSLSession s = t.getSSLSession();
+            String prot = s.getProtocol();
+            if (prot.equals("TLSv1.2")) {
+                t.sendResponseHeaders(200, -1);
+            } else {
+                System.err.printf("Protocols =%s\n", prot);
+                t.sendResponseHeaders(500, -1);
+            }
+        }), "/");
+        server.start();
+        int port = server.getAddress().getPort();
+        URI u = new URI("https://127.0.0.1:"+port+"/foo");
+        HttpClient client = getClient();
+        HttpRequest req = HttpRequest.newBuilder(u).build();
+        HttpResponse<String> resp = client.sendAsync(req, asString()).get();
+        int stat = resp.statusCode();
+        if (stat != 200) {
+            throw new RuntimeException("paramsTest failed "
+                + Integer.toString(stat));
+        }
+    }
+
+    static void simpleTest(boolean secure) throws Exception {
+        URI uri = getURI(secure);
+        System.err.println("Request to " + uri);
+
+        // Do a simple warmup request
+
+        HttpClient client = getClient();
+        HttpRequest req = HttpRequest.newBuilder(uri)
+                                     .POST(fromString(SIMPLE_STRING))
+                                     .build();
+        HttpResponse<String> response = client.sendAsync(req, asString()).get();
+        HttpHeaders h = response.headers();
+
+        checkStatus(200, response.statusCode());
+
+        String responseBody = response.body();
+        checkStrings(SIMPLE_STRING, responseBody);
+
+        checkStrings(h.firstValue("x-hello").get(), "world");
+        checkStrings(h.firstValue("x-bye").get(), "universe");
+
+        // Do loops asynchronously
+
+        CompletableFuture[] responses = new CompletableFuture[LOOPS];
+        final Path source = TestUtil.getAFile(FILESIZE);
+        HttpRequest request = HttpRequest.newBuilder(uri)
+                                         .POST(fromFile(source))
+                                         .build();
+        for (int i = 0; i < LOOPS; i++) {
+            responses[i] = client.sendAsync(request, asFile(tempFile()))
+                //.thenApply(resp -> compareFiles(resp.body(), source));
+                .thenApply(resp -> {
+                    System.out.printf("Resp status %d body size %d\n",
+                                      resp.statusCode(), resp.body().toFile().length());
+                    return compareFiles(resp.body(), source);
+                });
+        }
+        CompletableFuture.allOf(responses).join();
+        System.err.println("DONE");
+    }
+}