# HG changeset patch # User skuksenko # Date 1487675314 0 # Node ID f33383dcb1fba04064503358a082fc8e7a3bba0f # Parent d35bbdbe9d36f3ab4dfbc095a610b0d4dc05c8d8 8175274: Fix httpclient asynchronous usage Reviewed-by: dfuchs, michaelm diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Tue Feb 21 11:08:34 2017 +0000 @@ -27,7 +27,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Proxy; import java.net.ProxySelector; import java.net.SocketPermission; import java.net.URI; @@ -71,7 +70,6 @@ final Executor parentExecutor; final HttpRequest.BodyProcessor requestProcessor; boolean upgrading; // to HTTP/2 - volatile Executor responseExecutor; final PushGroup pushGroup; // buffer for receiving response headers @@ -139,7 +137,7 @@ } public CompletableFuture readBodyAsync(HttpResponse.BodyHandler handler) { - return exchImpl.readBodyAsync(handler, true, responseExecutor); + return exchImpl.readBodyAsync(handler, true, parentExecutor); } public void cancel() { @@ -224,7 +222,8 @@ return checkForUpgrade(resp, exchImpl); } else { - exchImpl.sendRequest(); + exchImpl.sendHeadersOnly(); + exchImpl.sendBody(); Response resp = exchImpl.getResponse(); HttpResponseImpl.logResponse(resp); return checkForUpgrade(resp, exchImpl); @@ -235,8 +234,6 @@ // will be a non null responseAsync if expect continue returns an error public CompletableFuture responseAsync() { - // take one thread from supplied executor to handle response headers and body - responseExecutor = Utils.singleThreadExecutor(parentExecutor); return responseAsyncImpl(null); } @@ -267,20 +264,18 @@ Log.logTrace("Sending Expect: 100-Continue"); return exchImpl .sendHeadersAsync() - .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor)) + .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor)) .thenCompose((Response r1) -> { HttpResponseImpl.logResponse(r1); int rcode = r1.statusCode(); if (rcode == 100) { Log.logTrace("Received 100-Continue: sending body"); - return exchImpl.sendBodyAsync(parentExecutor) - .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor)) - .thenCompose((Response r2) -> { - return checkForUpgradeAsync(r2, exchImpl); - }).thenApply((Response r) -> { - HttpResponseImpl.logResponse(r); - return r; - }); + CompletableFuture cf = + exchImpl.sendBodyAsync() + .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); + cf = wrapForUpgrade(cf); + cf = wrapForLog(cf); + return cf; } else { Log.logTrace("Expectation failed: Received {0}", rcode); @@ -289,26 +284,38 @@ "Unable to handle 101 while waiting for 100"); return MinimalFuture.failedFuture(failed); } - return exchImpl.readBodyAsync(this::ignoreBody, false, responseExecutor) - .thenApply((v) -> { - return r1; - }); + return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor) + .thenApply(v -> r1); } }); } else { - return exchImpl - .sendRequestAsync(parentExecutor) - .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor)) - .thenCompose((Response r1) -> { - return checkForUpgradeAsync(r1, exchImpl); - }) - .thenApply((Response response) -> { - HttpResponseImpl.logResponse(response); - return response; - }); + CompletableFuture cf = exchImpl + .sendHeadersAsync() + .thenCompose(ExchangeImpl::sendBodyAsync) + .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor)); + cf = wrapForUpgrade(cf); + cf = wrapForLog(cf); + return cf; } } + private CompletableFuture wrapForUpgrade(CompletableFuture cf) { + if (upgrading) { + return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl)); + } + return cf; + } + + private CompletableFuture wrapForLog(CompletableFuture cf) { + if (Log.requests()) { + return cf.thenApply(response -> { + HttpResponseImpl.logResponse(response); + return response; + }); + } + return cf; + } + HttpResponse.BodyProcessor ignoreBody(int status, HttpHeaders hdrs) { return HttpResponse.BodyProcessor.discard((T)null); } diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Tue Feb 21 11:08:34 2017 +0000 @@ -102,16 +102,12 @@ // Blocking impl but in async style - CompletableFuture sendHeadersAsync() { - CompletableFuture cf = new MinimalFuture<>(); - try { + CompletableFuture> sendHeadersAsync() { + // this is blocking. cf will already be completed. + return MinimalFuture.supply(() -> { sendHeadersOnly(); - cf.complete(null); - } catch (Throwable t) { - cf.completeExceptionally(t); - } - // this is blocking. cf will already be completed. - return cf; + return this; + }); } /** @@ -156,40 +152,14 @@ // Async version of sendBody(). This only used when body sent separately // to headers (100 continue) - CompletableFuture sendBodyAsync(Executor executor) { - CompletableFuture cf = new MinimalFuture<>(); - executor.execute(() -> { - try { - sendBody(); - cf.complete(null); - } catch (Throwable t) { - cf.completeExceptionally(t); - } + CompletableFuture> sendBodyAsync() { + return MinimalFuture.supply(() -> { + sendBody(); + return this; }); - return cf; } /** - * Sends the entire request (headers and body) blocking. - */ - void sendRequest() throws IOException, InterruptedException { - sendHeadersOnly(); - sendBody(); - } - - CompletableFuture sendRequestAsync(Executor executor) { - CompletableFuture cf = new MinimalFuture<>(); - executor.execute(() -> { - try { - sendRequest(); - cf.complete(null); - } catch (Throwable t) { - cf.completeExceptionally(t); - } - }); - return cf; - } - /** * Cancels a request. Not currently exposed through API. */ abstract void cancel(); diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Tue Feb 21 11:08:34 2017 +0000 @@ -194,17 +194,11 @@ } CompletableFuture getResponseAsyncImpl(Executor executor) { - CompletableFuture cf = new MinimalFuture<>(); - executor.execute(() -> { - try { - response = new Http1Response<>(connection, Http1Exchange.this); - response.readHeaders(); - cf.complete(response.response()); - } catch (Throwable e) { - cf.completeExceptionally(e); - } - }); - return cf; + return MinimalFuture.supply( () -> { + response = new Http1Response<>(connection, Http1Exchange.this); + response.readHeaders(); + return response.response(); + }, executor); } @Override diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Feb 21 11:08:34 2017 +0000 @@ -241,14 +241,7 @@ Http2ClientImpl client2, Exchange exchange, ByteBuffer initial) { - CompletableFuture cf = new MinimalFuture<>(); - try { - Http2Connection c = new Http2Connection(connection, client2, exchange, initial); - cf.complete(c); - } catch (IOException | InterruptedException e) { - cf.completeExceptionally(e); - } - return cf; + return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial)); } /** diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Tue Feb 21 11:08:34 2017 +0000 @@ -235,7 +235,7 @@ sendAsync(HttpRequest req, HttpResponse.BodyHandler responseHandler) { MultiExchange mex = new MultiExchange<>(req, this, responseHandler); - return mex.responseAsync(null) + return mex.responseAsync() .thenApply((HttpResponseImpl b) -> (HttpResponse) b); } diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Tue Feb 21 11:08:34 2017 +0000 @@ -35,6 +35,8 @@ import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; import java.util.concurrent.Executor; +import java.util.function.UnaryOperator; + import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.MinimalFuture; import jdk.incubator.http.internal.common.Pair; @@ -150,8 +152,7 @@ Exchange currExchange = getExchange(); requestFilters(r); Response response = currExchange.response(); - Pair filterResult = responseFilters(response); - HttpRequestImpl newreq = filterResult.second; + HttpRequestImpl newreq = responseFilters(response); if (newreq == null) { if (attempts > 1) { Log.logError("Succeeded on attempt: " + attempts); @@ -213,23 +214,7 @@ Log.logTrace("All filters applied"); } - // Filters are assumed to be non-blocking so the async - // versions of these methods just call the blocking ones - - private CompletableFuture requestFiltersAsync(HttpRequestImpl r) { - CompletableFuture cf = new MinimalFuture<>(); - try { - requestFilters(r); - cf.complete(null); - } catch(Throwable e) { - cf.completeExceptionally(e); - } - return cf; - } - - - private Pair - responseFilters(Response response) throws IOException + private HttpRequestImpl responseFilters(Response response) throws IOException { Log.logTrace("Applying response filters"); for (HeaderFilter filter : filters) { @@ -237,24 +222,11 @@ HttpRequestImpl newreq = filter.response(response); if (newreq != null) { Log.logTrace("New request: stopping filters"); - return pair(null, newreq); + return newreq; } } Log.logTrace("All filters applied"); - return pair(response, null); - } - - private CompletableFuture> - responseFiltersAsync(Response response) - { - CompletableFuture> cf = new MinimalFuture<>(); - try { - Pair n = responseFilters(response); // assumed to be fast - cf.complete(n); - } catch (Throwable e) { - cf.completeExceptionally(e); - } - return cf; + return null; } public void cancel() { @@ -267,24 +239,27 @@ getExchange().cancel(cause); } - public CompletableFuture> responseAsync(Void v) { - return responseAsync1(null) + public CompletableFuture> responseAsync() { + CompletableFuture start = new MinimalFuture<>(); + CompletableFuture> cf = responseAsync0(start); + start.completeAsync( () -> null, executor); // trigger execution + return cf; + } + + private CompletableFuture> responseAsync0(CompletableFuture start) { + return start.thenCompose( v -> responseAsyncImpl()) .thenCompose((Response r) -> { Exchange exch = getExchange(); return exch.readBodyAsync(responseHandler) - .thenApply((T body) -> { - Pair result = new Pair<>(r, body); - return result; - }); - }) - .thenApply((Pair result) -> { - return new HttpResponseImpl<>(userRequest, result.first, result.second, getExchange()); + .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch)); }); } CompletableFuture multiResponseAsync() { - CompletableFuture> mainResponse = responseAsync(null) - .thenApply((HttpResponseImpl b) -> { + CompletableFuture start = new MinimalFuture<>(); + CompletableFuture> cf = responseAsync0(start); + CompletableFuture> mainResponse = + cf.thenApply((HttpResponseImpl b) -> { multiResponseHandler.onResponse(b); return (HttpResponse)b; }); @@ -295,10 +270,12 @@ // All push promises received by now. pushGroup.noMorePushes(true); }); - return multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); + CompletableFuture res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF()); + start.completeAsync( () -> null, executor); // trigger execution + return res; } - private CompletableFuture responseAsync1(Void v) { + private CompletableFuture responseAsyncImpl() { CompletableFuture cf; if (++attempts > max_attempts) { cf = MinimalFuture.failedFuture(new IOException("Too many retries")); @@ -307,48 +284,51 @@ timedEvent = new TimedEvent(currentreq.duration()); client.registerTimer(timedEvent); } + try { + // 1. Apply request filters + requestFilters(currentreq); + } catch (IOException e) { + return MinimalFuture.failedFuture(e); + } Exchange exch = getExchange(); - // 1. Apply request filters - cf = requestFiltersAsync(currentreq) - // 2. get response - .thenCompose((v1) -> { - return exch.responseAsync(); - }) - // 3. Apply response filters - .thenCompose(this::responseFiltersAsync) - // 4. Check filter result and repeat or continue - .thenCompose((Pair pair) -> { - Response resp = pair.first; - if (resp != null) { + // 2. get response + cf = exch.responseAsync() + .thenCompose((Response response) -> { + HttpRequestImpl newrequest = null; + try { + // 3. Apply response filters + newrequest = responseFilters(response); + } catch (IOException e) { + return MinimalFuture.failedFuture(e); + } + // 4. Check filter result and repeat or continue + if (newrequest == null) { if (attempts > 1) { Log.logError("Succeeded on attempt: " + attempts); } - return MinimalFuture.completedFuture(resp); + return MinimalFuture.completedFuture(response); } else { - currentreq = pair.second; - Exchange previous = exch; + currentreq = newrequest; setExchange(new Exchange<>(currentreq, this, acc)); //reads body off previous, and then waits for next response - return responseAsync1(null); + return responseAsyncImpl(); } }) - // 5. Convert result to Pair - .handle((BiFunction>) Pair::new) - // 6. Handle errors and cancel any timer set - .thenCompose((Pair obj) -> { - Response response = obj.first; + // 5. Handle errors and cancel any timer set + .handle((response, ex) -> { if (response != null) { return MinimalFuture.completedFuture(response); } // all exceptions thrown are handled here - CompletableFuture error = getExceptionalCF(obj.second); + CompletableFuture error = getExceptionalCF(ex); if (error == null) { cancelTimer(); - return responseAsync1(null); + return responseAsyncImpl(); } else { return error; } - }); + }) + .thenCompose(UnaryOperator.identity()); } return cf; } diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Tue Feb 21 11:08:34 2017 +0000 @@ -50,7 +50,7 @@ .thenCompose((Void v) -> { HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); MultiExchange mconnectExchange = new MultiExchange<>(req, client, this::ignore); - return mconnectExchange.responseAsync(null) + return mconnectExchange.responseAsync() .thenCompose((HttpResponseImpl resp) -> { CompletableFuture cf = new MinimalFuture<>(); if (resp.statusCode() != 200) { diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java Tue Feb 21 11:08:34 2017 +0000 @@ -50,18 +50,11 @@ @Override public CompletableFuture connectAsync() { return delegate.connectAsync() - .thenCompose((Void v) -> { - CompletableFuture cf = new MinimalFuture<>(); - try { - this.sslDelegate = new SSLDelegate(delegate.channel(), - client, - alpn); - cf.complete(null); - } catch (IOException e) { - cf.completeExceptionally(e); - } - return cf; - }); + .thenCompose((Void v) -> + MinimalFuture.supply( () -> { + this.sslDelegate = new SSLDelegate(delegate.channel(), client, alpn); + return null; + })); } @Override diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Feb 21 11:08:34 2017 +0000 @@ -39,6 +39,8 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + import jdk.incubator.http.internal.common.*; import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.DecodingCallback; @@ -96,7 +98,7 @@ */ class Stream extends ExchangeImpl { - final Queue inputQ; + final AsyncDataReadQueue inputQ = new AsyncDataReadQueue(); /** * This stream's identifier. Assigned lazily by the HTTP2Connection before @@ -169,7 +171,7 @@ { CompletableFuture cf = readBodyAsync(handler, returnToCache, - this::executeInline); + null); try { return cf.join(); } catch (CompletionException e) { @@ -177,10 +179,6 @@ } } - void executeInline(Runnable r) { - r.run(); - } - @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -189,60 +187,69 @@ return sb.toString(); } + private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { + if (frame instanceof ResetFrame) { + handleReset((ResetFrame) frame); + return true; + } else if (!(frame instanceof DataFrame)) { + assert false; + return true; + } + DataFrame df = (DataFrame) frame; + // RFC 7540 6.1: + // The entire DATA frame payload is included in flow control, + // including the Pad Length and Padding fields if present + int len = df.payloadLength(); + ByteBufferReference[] buffers = df.getData(); + for (ByteBufferReference b : buffers) { + ByteBuffer buf = b.get(); + if (buf.hasRemaining()) { + publisher.acceptData(Optional.of(buf)); + } + } + connection.windowUpdater.update(len); + if (df.getFlag(DataFrame.END_STREAM)) { + setEndStreamReceived(); + publisher.acceptData(Optional.empty()); + return false; + } + // Don't send window update on a stream which is + // closed or half closed. + windowUpdater.update(len); + return true; + } + // pushes entire response body into response processor // blocking when required by local or remote flow control CompletableFuture receiveData(Executor executor) { CompletableFuture cf = responseProcessor .getBody() .toCompletableFuture(); - - executor.execute(() -> { - Http2Frame frame; - DataFrame df = null; - try { - if (!endStreamReceived()) { - do { - frame = inputQ.take(); - if (frame instanceof ResetFrame) { - handleReset((ResetFrame)frame); - continue; - } else if (!(frame instanceof DataFrame)) { - assert false; - continue; - } - df = (DataFrame) frame; - // RFC 7540 6.1: - // The entire DATA frame payload is included in flow control, - // including the Pad Length and Padding fields if present - int len = df.payloadLength(); - ByteBufferReference[] buffers = df.getData(); - for (ByteBufferReference b : buffers) { - publisher.acceptData(Optional.of(b.get())); - } - connection.windowUpdater.update(len); - if (df.getFlag(DataFrame.END_STREAM)) { - break; - } - // Don't send window update on a stream which is - // closed or half closed. - windowUpdater.update(len); - } while (true); - setEndStreamReceived(); - } - publisher.acceptData(Optional.empty()); - } catch (Throwable e) { - Log.logTrace("receiveData: {0}", e.toString()); - e.printStackTrace(); - cf.completeExceptionally(e); - publisher.acceptError(e); - } - }); + Consumer onError = e -> { + Log.logTrace("receiveData: {0}", e.toString()); + e.printStackTrace(); + cf.completeExceptionally(e); + publisher.acceptError(e); + }; + if (executor == null) { + inputQ.blockingReceive(this::receiveDataFrame, onError); + } else { + inputQ.asyncReceive(executor, this::receiveDataFrame, onError); + } return cf; } @Override - void sendBody() throws IOException, InterruptedException { - sendBodyImpl(); + void sendBody() throws IOException { + try { + sendBodyImpl().join(); + } catch (CompletionException e) { + throw Utils.getIOException(e); + } + } + + CompletableFuture> sendBodyAsync() { + return sendBodyImpl().thenApply( v -> this); } @SuppressWarnings("unchecked") @@ -268,7 +275,6 @@ }; this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW - this.inputQ = new Queue<>(); this.publisher = new BlockingPushPublisher<>(); this.windowUpdater = new StreamWindowUpdateSender(connection); } @@ -673,6 +679,10 @@ response_cfs.add(cf); } } + if (executor != null && !cf.isDone()) { + // protect from executing later chain of CompletableFuture operations from SelectorManager thread + cf = cf.thenApplyAsync(r -> r, executor); + } Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf); PushGroup pg = exchange.getPushGroup(); if (pg != null) { @@ -743,20 +753,12 @@ } } - private void waitForCompletion() throws IOException { - try { - requestBodyCF.join(); - } catch (CompletionException e) { - throw Utils.getIOException(e); - } - } - - void sendBodyImpl() throws IOException, InterruptedException { + CompletableFuture sendBodyImpl() { RequestSubscriber subscriber = new RequestSubscriber(requestContentLen); subscriber.setClient(client); requestProcessor.subscribe(subscriber); - waitForCompletion(); - requestSent(); + requestBodyCF.whenComplete((v,t) -> requestSent()); + return requestBodyCF; } @Override @@ -846,30 +848,24 @@ // error record it in the PushGroup. The error method is called // with a null value when no error occurred (is a no-op) @Override - CompletableFuture sendBodyAsync(Executor executor) { - return super.sendBodyAsync(executor) - .whenComplete((Void v, Throwable t) -> { - pushGroup.pushError(t); - }); + CompletableFuture> sendBodyAsync() { + return super.sendBodyAsync() + .whenComplete((ExchangeImpl v, Throwable t) -> pushGroup.pushError(t)); } @Override - CompletableFuture sendHeadersAsync() { + CompletableFuture> sendHeadersAsync() { return super.sendHeadersAsync() - .whenComplete((Void v, Throwable t) -> { - pushGroup.pushError(t); - }); - } - - @Override - CompletableFuture sendRequestAsync(Executor executor) { - return super.sendRequestAsync(executor) - .whenComplete((v, t) -> pushGroup.pushError(t)); + .whenComplete((ExchangeImpl ex, Throwable t) -> pushGroup.pushError(t)); } @Override CompletableFuture getResponseAsync(Executor executor) { - return pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); + CompletableFuture cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t)); + if(executor!=null && !cf.isDone()) { + cf = cf.thenApplyAsync( r -> r, executor); + } + return cf; } @Override @@ -887,7 +883,8 @@ HttpResponseImpl.logResponse(r); pushCF.complete(r); // not strictly required for push API // start reading the body using the obtained BodyProcessor - readBodyAsync(getPushHandler(), false, getExchange().executor()) + CompletableFuture start = new MinimalFuture<>(); + start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor())) .whenComplete((T body, Throwable t) -> { if (t != null) { responseCF.completeExceptionally(t); @@ -896,6 +893,7 @@ responseCF.complete(response); } }); + start.completeAsync(() -> null, getExchange().executor()); } @Override diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java Tue Feb 21 11:08:34 2017 +0000 @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package jdk.incubator.http.internal.common; + +import jdk.incubator.http.internal.frame.DataFrame; +import jdk.incubator.http.internal.frame.Http2Frame; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way + * or allows to consume it asynchronously. In the latter case put operation from the producer thread + * executes consume operation in the given executor. + */ +public class AsyncDataReadQueue implements Closeable { + + @FunctionalInterface + public interface DataConsumer { + /** + * + * @param t - frame + * @return true if consuming should be continued. false when END_STREAM was received. + * @throws Throwable + */ + boolean accept(Http2Frame t) throws Throwable; + } + + private static final int BLOCKING = 0; + private static final int FLUSHING = 1; + private static final int REFLUSHING = 2; + private static final int ASYNC = 3; + private static final int CLOSED = 4; + + + private final AtomicInteger state = new AtomicInteger(BLOCKING); + private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private Executor executor; + private DataConsumer onData; + private Consumer onError; + + public AsyncDataReadQueue() { + } + + public boolean tryPut(Http2Frame f) { + if(state.get() == CLOSED) { + return false; + } else { + queue.offer(f); + flushAsync(false); + return true; + } + } + + public void put(Http2Frame f) throws IOException { + if(!tryPut(f)) + throw new IOException("stream closed"); + } + + public void blockingReceive(DataConsumer onData, Consumer onError) { + if (state.get() == CLOSED) { + onError.accept(new IOException("stream closed")); + return; + } + assert state.get() == BLOCKING; + try { + while (onData.accept(queue.take())); + assert state.get() == CLOSED; + } catch (Throwable e) { + onError.accept(e); + } + } + + public void asyncReceive(Executor executor, DataConsumer onData, + Consumer onError) { + if (state.get() == CLOSED) { + onError.accept(new IOException("stream closed")); + return; + } + + assert state.get() == BLOCKING; + + // Validates that fields not already set. + if (!checkCanSet("executor", this.executor, onError) + || !checkCanSet("onData", this.onData, onError) + || !checkCanSet("onError", this.onError, onError)) { + return; + } + + this.executor = executor; + this.onData = onData; + this.onError = onError; + + // This will report an error if asyncReceive is called twice, + // because we won't be in BLOCKING state if that happens + if (!this.state.compareAndSet(BLOCKING, ASYNC)) { + onError.accept(new IOException( + new IllegalStateException("State: "+this.state.get()))); + return; + } + + flushAsync(true); + } + + private static boolean checkCanSet(String name, T oldval, Consumer onError) { + if (oldval != null) { + onError.accept(new IOException( + new IllegalArgumentException(name))); + return false; + } + return true; + } + + @Override + public void close() { + int prevState = state.getAndSet(CLOSED); + if(prevState == BLOCKING) { + // wake up blocked take() + queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0])); + } + } + + private void flushAsync(boolean alreadyInExecutor) { + while(true) { + switch (state.get()) { + case BLOCKING: + case CLOSED: + case REFLUSHING: + return; + case ASYNC: + if(state.compareAndSet(ASYNC, FLUSHING)) { + if(alreadyInExecutor) { + flushLoop(); + } else { + executor.execute(this::flushLoop); + } + return; + } + break; + case FLUSHING: + if(state.compareAndSet(FLUSHING, REFLUSHING)) { + return; + } + break; + } + } + } + + private void flushLoop() { + try { + while(true) { + Http2Frame frame = queue.poll(); + while (frame != null) { + if(!onData.accept(frame)) { + assert state.get() == CLOSED; + return; // closed + } + frame = queue.poll(); + } + switch (state.get()) { + case BLOCKING: + assert false; + break; + case ASYNC: + throw new RuntimeException("Shouldn't happen"); + case FLUSHING: + if(state.compareAndSet(FLUSHING, ASYNC)) { + return; + } + break; + case REFLUSHING: + // We need to check if new elements were put after last + // poll() and do graceful exit + state.compareAndSet(REFLUSHING, FLUSHING); + break; + case CLOSED: + return; + } + } + } catch (Throwable e) { + onError.accept(e); + close(); + } + } +} diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java --- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Mon Feb 20 15:32:37 2017 +0800 +++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Tue Feb 21 11:08:34 2017 +0000 @@ -26,6 +26,7 @@ package jdk.incubator.http.internal.common; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.BiConsumer; @@ -40,6 +41,11 @@ */ public final class MinimalFuture extends CompletableFuture { + @FunctionalInterface + public interface ExceptionalSupplier { + U get() throws Throwable; + } + final static AtomicLong TOKENS = new AtomicLong(); final long id; @@ -56,6 +62,29 @@ return f; } + public static CompletableFuture supply(ExceptionalSupplier supplier) { + CompletableFuture cf = new MinimalFuture<>(); + try { + U value = supplier.get(); + cf.complete(value); + } catch (Throwable t) { + cf.completeExceptionally(t); + } + return cf; + } + + public static CompletableFuture supply(ExceptionalSupplier supplier, Executor executor) { + CompletableFuture cf = new MinimalFuture<>(); + cf.completeAsync( () -> { + try { + return supplier.get(); + } catch (Throwable ex) { + throw new CompletionException(ex); + } + }, executor); + return cf; + } + public MinimalFuture() { super(); this.id = TOKENS.incrementAndGet(); diff -r d35bbdbe9d36 -r f33383dcb1fb jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Tue Feb 21 11:08:34 2017 +0000 @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8087112 + * @library /lib/testlibrary server + * @build jdk.testlibrary.SimpleSSLContext + * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common + * jdk.incubator.httpclient/jdk.incubator.http.internal.frame + * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack + * @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors FixedThreadPoolTest + */ + +import java.net.*; +import jdk.incubator.http.*; +import static jdk.incubator.http.HttpClient.Version.HTTP_2; +import javax.net.ssl.*; +import java.nio.file.*; +import java.util.concurrent.*; +import jdk.testlibrary.SimpleSSLContext; +import static jdk.incubator.http.HttpRequest.BodyProcessor.fromFile; +import static jdk.incubator.http.HttpRequest.BodyProcessor.fromString; +import static jdk.incubator.http.HttpResponse.BodyHandler.asFile; +import static jdk.incubator.http.HttpResponse.BodyHandler.asString; + +import org.testng.annotations.Test; + +@Test +public class FixedThreadPoolTest { + static int httpPort, httpsPort; + static Http2TestServer httpServer, httpsServer; + static HttpClient client = null; + static ExecutorService exec; + static SSLContext sslContext; + + static String httpURIString, httpsURIString; + + static void initialize() throws Exception { + try { + SimpleSSLContext sslct = new SimpleSSLContext(); + sslContext = sslct.get(); + client = getClient(); + httpServer = new Http2TestServer(false, 0, exec, sslContext); + httpServer.addHandler(new EchoHandler(), "/"); + httpPort = httpServer.getAddress().getPort(); + + httpsServer = new Http2TestServer(true, 0, exec, sslContext); + httpsServer.addHandler(new EchoHandler(), "/"); + + httpsPort = httpsServer.getAddress().getPort(); + httpURIString = "http://127.0.0.1:" + httpPort + "/foo/"; + httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/"; + + httpServer.start(); + httpsServer.start(); + } catch (Throwable e) { + System.err.println("Throwing now"); + e.printStackTrace(); + throw e; + } + } + + @Test(timeOut=3000000) + public static void test() throws Exception { + try { + initialize(); + simpleTest(false); + simpleTest(true); + streamTest(false); + streamTest(true); + paramsTest(); + Thread.sleep(1000 * 4); + } catch (Exception | Error tt) { + tt.printStackTrace(); + throw tt; + } finally { + httpServer.stop(); + httpsServer.stop(); + exec.shutdownNow(); + } + } + + static HttpClient getClient() { + if (client == null) { + exec = Executors.newCachedThreadPool(); + client = HttpClient.newBuilder() + .executor(Executors.newFixedThreadPool(2)) + .sslContext(sslContext) + .version(HTTP_2) + .build(); + } + return client; + } + + static URI getURI(boolean secure) { + if (secure) + return URI.create(httpsURIString); + else + return URI.create(httpURIString); + } + + static void checkStatus(int expected, int found) throws Exception { + if (expected != found) { + System.err.printf ("Test failed: wrong status code %d/%d\n", + expected, found); + throw new RuntimeException("Test failed"); + } + } + + static void checkStrings(String expected, String found) throws Exception { + if (!expected.equals(found)) { + System.err.printf ("Test failed: wrong string %s/%s\n", + expected, found); + throw new RuntimeException("Test failed"); + } + } + + static Void compareFiles(Path path1, Path path2) { + return TestUtil.compareFiles(path1, path2); + } + + static Path tempFile() { + return TestUtil.tempFile(); + } + + static final String SIMPLE_STRING = "Hello world Goodbye world"; + + static final int LOOPS = 32; + static final int FILESIZE = 64 * 1024 + 200; + + static void streamTest(boolean secure) throws Exception { + URI uri = getURI(secure); + System.err.printf("streamTest %b to %s\n" , secure, uri); + + HttpClient client = getClient(); + Path src = TestUtil.getAFile(FILESIZE * 4); + HttpRequest req = HttpRequest.newBuilder(uri) + .POST(fromFile(src)) + .build(); + + Path dest = Paths.get("streamtest.txt"); + dest.toFile().delete(); + CompletableFuture response = client.sendAsync(req, asFile(dest)) + .thenApply(resp -> { + if (resp.statusCode() != 200) + throw new RuntimeException(); + return resp.body(); + }); + response.join(); + compareFiles(src, dest); + System.err.println("DONE"); + } + + static void paramsTest() throws Exception { + System.err.println("paramsTest"); + Http2TestServer server = new Http2TestServer(true, 0, exec, sslContext); + server.addHandler((t -> { + SSLSession s = t.getSSLSession(); + String prot = s.getProtocol(); + if (prot.equals("TLSv1.2")) { + t.sendResponseHeaders(200, -1); + } else { + System.err.printf("Protocols =%s\n", prot); + t.sendResponseHeaders(500, -1); + } + }), "/"); + server.start(); + int port = server.getAddress().getPort(); + URI u = new URI("https://127.0.0.1:"+port+"/foo"); + HttpClient client = getClient(); + HttpRequest req = HttpRequest.newBuilder(u).build(); + HttpResponse resp = client.sendAsync(req, asString()).get(); + int stat = resp.statusCode(); + if (stat != 200) { + throw new RuntimeException("paramsTest failed " + + Integer.toString(stat)); + } + } + + static void simpleTest(boolean secure) throws Exception { + URI uri = getURI(secure); + System.err.println("Request to " + uri); + + // Do a simple warmup request + + HttpClient client = getClient(); + HttpRequest req = HttpRequest.newBuilder(uri) + .POST(fromString(SIMPLE_STRING)) + .build(); + HttpResponse response = client.sendAsync(req, asString()).get(); + HttpHeaders h = response.headers(); + + checkStatus(200, response.statusCode()); + + String responseBody = response.body(); + checkStrings(SIMPLE_STRING, responseBody); + + checkStrings(h.firstValue("x-hello").get(), "world"); + checkStrings(h.firstValue("x-bye").get(), "universe"); + + // Do loops asynchronously + + CompletableFuture[] responses = new CompletableFuture[LOOPS]; + final Path source = TestUtil.getAFile(FILESIZE); + HttpRequest request = HttpRequest.newBuilder(uri) + .POST(fromFile(source)) + .build(); + for (int i = 0; i < LOOPS; i++) { + responses[i] = client.sendAsync(request, asFile(tempFile())) + //.thenApply(resp -> compareFiles(resp.body(), source)); + .thenApply(resp -> { + System.out.printf("Resp status %d body size %d\n", + resp.statusCode(), resp.body().toFile().length()); + return compareFiles(resp.body(), source); + }); + } + CompletableFuture.allOf(responses).join(); + System.err.println("DONE"); + } +}