--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Tue Feb 21 11:08:34 2017 +0000
@@ -27,7 +27,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketPermission;
import java.net.URI;
@@ -71,7 +70,6 @@
final Executor parentExecutor;
final HttpRequest.BodyProcessor requestProcessor;
boolean upgrading; // to HTTP/2
- volatile Executor responseExecutor;
final PushGroup<?,T> pushGroup;
// buffer for receiving response headers
@@ -139,7 +137,7 @@
}
public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
- return exchImpl.readBodyAsync(handler, true, responseExecutor);
+ return exchImpl.readBodyAsync(handler, true, parentExecutor);
}
public void cancel() {
@@ -224,7 +222,8 @@
return checkForUpgrade(resp, exchImpl);
} else {
- exchImpl.sendRequest();
+ exchImpl.sendHeadersOnly();
+ exchImpl.sendBody();
Response resp = exchImpl.getResponse();
HttpResponseImpl.logResponse(resp);
return checkForUpgrade(resp, exchImpl);
@@ -235,8 +234,6 @@
// will be a non null responseAsync if expect continue returns an error
public CompletableFuture<Response> responseAsync() {
- // take one thread from supplied executor to handle response headers and body
- responseExecutor = Utils.singleThreadExecutor(parentExecutor);
return responseAsyncImpl(null);
}
@@ -267,20 +264,18 @@
Log.logTrace("Sending Expect: 100-Continue");
return exchImpl
.sendHeadersAsync()
- .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
+ .thenCompose(v -> exchImpl.getResponseAsync(parentExecutor))
.thenCompose((Response r1) -> {
HttpResponseImpl.logResponse(r1);
int rcode = r1.statusCode();
if (rcode == 100) {
Log.logTrace("Received 100-Continue: sending body");
- return exchImpl.sendBodyAsync(parentExecutor)
- .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
- .thenCompose((Response r2) -> {
- return checkForUpgradeAsync(r2, exchImpl);
- }).thenApply((Response r) -> {
- HttpResponseImpl.logResponse(r);
- return r;
- });
+ CompletableFuture<Response> cf =
+ exchImpl.sendBodyAsync()
+ .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
+ cf = wrapForUpgrade(cf);
+ cf = wrapForLog(cf);
+ return cf;
} else {
Log.logTrace("Expectation failed: Received {0}",
rcode);
@@ -289,26 +284,38 @@
"Unable to handle 101 while waiting for 100");
return MinimalFuture.failedFuture(failed);
}
- return exchImpl.readBodyAsync(this::ignoreBody, false, responseExecutor)
- .thenApply((v) -> {
- return r1;
- });
+ return exchImpl.readBodyAsync(this::ignoreBody, false, parentExecutor)
+ .thenApply(v -> r1);
}
});
} else {
- return exchImpl
- .sendRequestAsync(parentExecutor)
- .thenCompose((v) -> exchImpl.getResponseAsync(responseExecutor))
- .thenCompose((Response r1) -> {
- return checkForUpgradeAsync(r1, exchImpl);
- })
- .thenApply((Response response) -> {
- HttpResponseImpl.logResponse(response);
- return response;
- });
+ CompletableFuture<Response> cf = exchImpl
+ .sendHeadersAsync()
+ .thenCompose(ExchangeImpl::sendBodyAsync)
+ .thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
+ cf = wrapForUpgrade(cf);
+ cf = wrapForLog(cf);
+ return cf;
}
}
+ private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> cf) {
+ if (upgrading) {
+ return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
+ }
+ return cf;
+ }
+
+ private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
+ if (Log.requests()) {
+ return cf.thenApply(response -> {
+ HttpResponseImpl.logResponse(response);
+ return response;
+ });
+ }
+ return cf;
+ }
+
HttpResponse.BodyProcessor<T> ignoreBody(int status, HttpHeaders hdrs) {
return HttpResponse.BodyProcessor.discard((T)null);
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Tue Feb 21 11:08:34 2017 +0000
@@ -102,16 +102,12 @@
// Blocking impl but in async style
- CompletableFuture<Void> sendHeadersAsync() {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- try {
+ CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
+ // this is blocking. cf will already be completed.
+ return MinimalFuture.supply(() -> {
sendHeadersOnly();
- cf.complete(null);
- } catch (Throwable t) {
- cf.completeExceptionally(t);
- }
- // this is blocking. cf will already be completed.
- return cf;
+ return this;
+ });
}
/**
@@ -156,40 +152,14 @@
// Async version of sendBody(). This only used when body sent separately
// to headers (100 continue)
- CompletableFuture<Void> sendBodyAsync(Executor executor) {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- executor.execute(() -> {
- try {
- sendBody();
- cf.complete(null);
- } catch (Throwable t) {
- cf.completeExceptionally(t);
- }
+ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+ return MinimalFuture.supply(() -> {
+ sendBody();
+ return this;
});
- return cf;
}
/**
- * Sends the entire request (headers and body) blocking.
- */
- void sendRequest() throws IOException, InterruptedException {
- sendHeadersOnly();
- sendBody();
- }
-
- CompletableFuture<Void> sendRequestAsync(Executor executor) {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- executor.execute(() -> {
- try {
- sendRequest();
- cf.complete(null);
- } catch (Throwable t) {
- cf.completeExceptionally(t);
- }
- });
- return cf;
- }
- /**
* Cancels a request. Not currently exposed through API.
*/
abstract void cancel();
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Tue Feb 21 11:08:34 2017 +0000
@@ -194,17 +194,11 @@
}
CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
- CompletableFuture<Response> cf = new MinimalFuture<>();
- executor.execute(() -> {
- try {
- response = new Http1Response<>(connection, Http1Exchange.this);
- response.readHeaders();
- cf.complete(response.response());
- } catch (Throwable e) {
- cf.completeExceptionally(e);
- }
- });
- return cf;
+ return MinimalFuture.supply( () -> {
+ response = new Http1Response<>(connection, Http1Exchange.this);
+ response.readHeaders();
+ return response.response();
+ }, executor);
}
@Override
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Feb 21 11:08:34 2017 +0000
@@ -241,14 +241,7 @@
Http2ClientImpl client2,
Exchange<?> exchange,
ByteBuffer initial) {
- CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
- try {
- Http2Connection c = new Http2Connection(connection, client2, exchange, initial);
- cf.complete(c);
- } catch (IOException | InterruptedException e) {
- cf.completeExceptionally(e);
- }
- return cf;
+ return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
}
/**
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpClientImpl.java Tue Feb 21 11:08:34 2017 +0000
@@ -235,7 +235,7 @@
sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseHandler)
{
MultiExchange<Void,T> mex = new MultiExchange<>(req, this, responseHandler);
- return mex.responseAsync(null)
+ return mex.responseAsync()
.thenApply((HttpResponseImpl<T> b) -> (HttpResponse<T>) b);
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Tue Feb 21 11:08:34 2017 +0000
@@ -35,6 +35,8 @@
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.concurrent.Executor;
+import java.util.function.UnaryOperator;
+
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
@@ -150,8 +152,7 @@
Exchange<T> currExchange = getExchange();
requestFilters(r);
Response response = currExchange.response();
- Pair<Response, HttpRequestImpl> filterResult = responseFilters(response);
- HttpRequestImpl newreq = filterResult.second;
+ HttpRequestImpl newreq = responseFilters(response);
if (newreq == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
@@ -213,23 +214,7 @@
Log.logTrace("All filters applied");
}
- // Filters are assumed to be non-blocking so the async
- // versions of these methods just call the blocking ones
-
- private CompletableFuture<Void> requestFiltersAsync(HttpRequestImpl r) {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- try {
- requestFilters(r);
- cf.complete(null);
- } catch(Throwable e) {
- cf.completeExceptionally(e);
- }
- return cf;
- }
-
-
- private Pair<Response,HttpRequestImpl>
- responseFilters(Response response) throws IOException
+ private HttpRequestImpl responseFilters(Response response) throws IOException
{
Log.logTrace("Applying response filters");
for (HeaderFilter filter : filters) {
@@ -237,24 +222,11 @@
HttpRequestImpl newreq = filter.response(response);
if (newreq != null) {
Log.logTrace("New request: stopping filters");
- return pair(null, newreq);
+ return newreq;
}
}
Log.logTrace("All filters applied");
- return pair(response, null);
- }
-
- private CompletableFuture<Pair<Response,HttpRequestImpl>>
- responseFiltersAsync(Response response)
- {
- CompletableFuture<Pair<Response,HttpRequestImpl>> cf = new MinimalFuture<>();
- try {
- Pair<Response,HttpRequestImpl> n = responseFilters(response); // assumed to be fast
- cf.complete(n);
- } catch (Throwable e) {
- cf.completeExceptionally(e);
- }
- return cf;
+ return null;
}
public void cancel() {
@@ -267,24 +239,27 @@
getExchange().cancel(cause);
}
- public CompletableFuture<HttpResponseImpl<T>> responseAsync(Void v) {
- return responseAsync1(null)
+ public CompletableFuture<HttpResponseImpl<T>> responseAsync() {
+ CompletableFuture<Void> start = new MinimalFuture<>();
+ CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+ start.completeAsync( () -> null, executor); // trigger execution
+ return cf;
+ }
+
+ private CompletableFuture<HttpResponseImpl<T>> responseAsync0(CompletableFuture<Void> start) {
+ return start.thenCompose( v -> responseAsyncImpl())
.thenCompose((Response r) -> {
Exchange<T> exch = getExchange();
return exch.readBodyAsync(responseHandler)
- .thenApply((T body) -> {
- Pair<Response,T> result = new Pair<>(r, body);
- return result;
- });
- })
- .thenApply((Pair<Response,T> result) -> {
- return new HttpResponseImpl<>(userRequest, result.first, result.second, getExchange());
+ .thenApply((T body) -> new HttpResponseImpl<>(userRequest, r, body, exch));
});
}
CompletableFuture<U> multiResponseAsync() {
- CompletableFuture<HttpResponse<T>> mainResponse = responseAsync(null)
- .thenApply((HttpResponseImpl<T> b) -> {
+ CompletableFuture<Void> start = new MinimalFuture<>();
+ CompletableFuture<HttpResponseImpl<T>> cf = responseAsync0(start);
+ CompletableFuture<HttpResponse<T>> mainResponse =
+ cf.thenApply((HttpResponseImpl<T> b) -> {
multiResponseHandler.onResponse(b);
return (HttpResponse<T>)b;
});
@@ -295,10 +270,12 @@
// All push promises received by now.
pushGroup.noMorePushes(true);
});
- return multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
+ CompletableFuture<U> res = multiResponseHandler.completion(pushGroup.groupResult(), pushGroup.pushesCF());
+ start.completeAsync( () -> null, executor); // trigger execution
+ return res;
}
- private CompletableFuture<Response> responseAsync1(Void v) {
+ private CompletableFuture<Response> responseAsyncImpl() {
CompletableFuture<Response> cf;
if (++attempts > max_attempts) {
cf = MinimalFuture.failedFuture(new IOException("Too many retries"));
@@ -307,48 +284,51 @@
timedEvent = new TimedEvent(currentreq.duration());
client.registerTimer(timedEvent);
}
+ try {
+ // 1. Apply request filters
+ requestFilters(currentreq);
+ } catch (IOException e) {
+ return MinimalFuture.failedFuture(e);
+ }
Exchange<T> exch = getExchange();
- // 1. Apply request filters
- cf = requestFiltersAsync(currentreq)
- // 2. get response
- .thenCompose((v1) -> {
- return exch.responseAsync();
- })
- // 3. Apply response filters
- .thenCompose(this::responseFiltersAsync)
- // 4. Check filter result and repeat or continue
- .thenCompose((Pair<Response,HttpRequestImpl> pair) -> {
- Response resp = pair.first;
- if (resp != null) {
+ // 2. get response
+ cf = exch.responseAsync()
+ .thenCompose((Response response) -> {
+ HttpRequestImpl newrequest = null;
+ try {
+ // 3. Apply response filters
+ newrequest = responseFilters(response);
+ } catch (IOException e) {
+ return MinimalFuture.failedFuture(e);
+ }
+ // 4. Check filter result and repeat or continue
+ if (newrequest == null) {
if (attempts > 1) {
Log.logError("Succeeded on attempt: " + attempts);
}
- return MinimalFuture.completedFuture(resp);
+ return MinimalFuture.completedFuture(response);
} else {
- currentreq = pair.second;
- Exchange<T> previous = exch;
+ currentreq = newrequest;
setExchange(new Exchange<>(currentreq, this, acc));
//reads body off previous, and then waits for next response
- return responseAsync1(null);
+ return responseAsyncImpl();
}
})
- // 5. Convert result to Pair
- .handle((BiFunction<Response, Throwable, Pair<Response, Throwable>>) Pair::new)
- // 6. Handle errors and cancel any timer set
- .thenCompose((Pair<Response,Throwable> obj) -> {
- Response response = obj.first;
+ // 5. Handle errors and cancel any timer set
+ .handle((response, ex) -> {
if (response != null) {
return MinimalFuture.completedFuture(response);
}
// all exceptions thrown are handled here
- CompletableFuture<Response> error = getExceptionalCF(obj.second);
+ CompletableFuture<Response> error = getExceptionalCF(ex);
if (error == null) {
cancelTimer();
- return responseAsync1(null);
+ return responseAsyncImpl();
} else {
return error;
}
- });
+ })
+ .thenCompose(UnaryOperator.identity());
}
return cf;
}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Tue Feb 21 11:08:34 2017 +0000
@@ -50,7 +50,7 @@
.thenCompose((Void v) -> {
HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
- return mconnectExchange.responseAsync(null)
+ return mconnectExchange.responseAsync()
.thenCompose((HttpResponseImpl<Void> resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
if (resp.statusCode() != 200) {
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SSLConnection.java Tue Feb 21 11:08:34 2017 +0000
@@ -50,18 +50,11 @@
@Override
public CompletableFuture<Void> connectAsync() {
return delegate.connectAsync()
- .thenCompose((Void v) -> {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- try {
- this.sslDelegate = new SSLDelegate(delegate.channel(),
- client,
- alpn);
- cf.complete(null);
- } catch (IOException e) {
- cf.completeExceptionally(e);
- }
- return cf;
- });
+ .thenCompose((Void v) ->
+ MinimalFuture.supply( () -> {
+ this.sslDelegate = new SSLDelegate(delegate.channel(), client, alpn);
+ return null;
+ }));
}
@Override
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Feb 21 11:08:34 2017 +0000
@@ -39,6 +39,8 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -96,7 +98,7 @@
*/
class Stream<T> extends ExchangeImpl<T> {
- final Queue<Http2Frame> inputQ;
+ final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -169,7 +171,7 @@
{
CompletableFuture<T> cf = readBodyAsync(handler,
returnToCache,
- this::executeInline);
+ null);
try {
return cf.join();
} catch (CompletionException e) {
@@ -177,10 +179,6 @@
}
}
- void executeInline(Runnable r) {
- r.run();
- }
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -189,60 +187,69 @@
return sb.toString();
}
+ private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
+ if (frame instanceof ResetFrame) {
+ handleReset((ResetFrame) frame);
+ return true;
+ } else if (!(frame instanceof DataFrame)) {
+ assert false;
+ return true;
+ }
+ DataFrame df = (DataFrame) frame;
+ // RFC 7540 6.1:
+ // The entire DATA frame payload is included in flow control,
+ // including the Pad Length and Padding fields if present
+ int len = df.payloadLength();
+ ByteBufferReference[] buffers = df.getData();
+ for (ByteBufferReference b : buffers) {
+ ByteBuffer buf = b.get();
+ if (buf.hasRemaining()) {
+ publisher.acceptData(Optional.of(buf));
+ }
+ }
+ connection.windowUpdater.update(len);
+ if (df.getFlag(DataFrame.END_STREAM)) {
+ setEndStreamReceived();
+ publisher.acceptData(Optional.empty());
+ return false;
+ }
+ // Don't send window update on a stream which is
+ // closed or half closed.
+ windowUpdater.update(len);
+ return true;
+ }
+
// pushes entire response body into response processor
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData(Executor executor) {
CompletableFuture<T> cf = responseProcessor
.getBody()
.toCompletableFuture();
-
- executor.execute(() -> {
- Http2Frame frame;
- DataFrame df = null;
- try {
- if (!endStreamReceived()) {
- do {
- frame = inputQ.take();
- if (frame instanceof ResetFrame) {
- handleReset((ResetFrame)frame);
- continue;
- } else if (!(frame instanceof DataFrame)) {
- assert false;
- continue;
- }
- df = (DataFrame) frame;
- // RFC 7540 6.1:
- // The entire DATA frame payload is included in flow control,
- // including the Pad Length and Padding fields if present
- int len = df.payloadLength();
- ByteBufferReference[] buffers = df.getData();
- for (ByteBufferReference b : buffers) {
- publisher.acceptData(Optional.of(b.get()));
- }
- connection.windowUpdater.update(len);
- if (df.getFlag(DataFrame.END_STREAM)) {
- break;
- }
- // Don't send window update on a stream which is
- // closed or half closed.
- windowUpdater.update(len);
- } while (true);
- setEndStreamReceived();
- }
- publisher.acceptData(Optional.empty());
- } catch (Throwable e) {
- Log.logTrace("receiveData: {0}", e.toString());
- e.printStackTrace();
- cf.completeExceptionally(e);
- publisher.acceptError(e);
- }
- });
+ Consumer<Throwable> onError = e -> {
+ Log.logTrace("receiveData: {0}", e.toString());
+ e.printStackTrace();
+ cf.completeExceptionally(e);
+ publisher.acceptError(e);
+ };
+ if (executor == null) {
+ inputQ.blockingReceive(this::receiveDataFrame, onError);
+ } else {
+ inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+ }
return cf;
}
@Override
- void sendBody() throws IOException, InterruptedException {
- sendBodyImpl();
+ void sendBody() throws IOException {
+ try {
+ sendBodyImpl().join();
+ } catch (CompletionException e) {
+ throw Utils.getIOException(e);
+ }
+ }
+
+ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+ return sendBodyImpl().thenApply( v -> this);
}
@SuppressWarnings("unchecked")
@@ -268,7 +275,6 @@
};
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
- this.inputQ = new Queue<>();
this.publisher = new BlockingPushPublisher<>();
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
@@ -673,6 +679,10 @@
response_cfs.add(cf);
}
}
+ if (executor != null && !cf.isDone()) {
+ // protect from executing later chain of CompletableFuture operations from SelectorManager thread
+ cf = cf.thenApplyAsync(r -> r, executor);
+ }
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
@@ -743,20 +753,12 @@
}
}
- private void waitForCompletion() throws IOException {
- try {
- requestBodyCF.join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
- }
- }
-
- void sendBodyImpl() throws IOException, InterruptedException {
+ CompletableFuture<Void> sendBodyImpl() {
RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
subscriber.setClient(client);
requestProcessor.subscribe(subscriber);
- waitForCompletion();
- requestSent();
+ requestBodyCF.whenComplete((v,t) -> requestSent());
+ return requestBodyCF;
}
@Override
@@ -846,30 +848,24 @@
// error record it in the PushGroup. The error method is called
// with a null value when no error occurred (is a no-op)
@Override
- CompletableFuture<Void> sendBodyAsync(Executor executor) {
- return super.sendBodyAsync(executor)
- .whenComplete((Void v, Throwable t) -> {
- pushGroup.pushError(t);
- });
+ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+ return super.sendBodyAsync()
+ .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
}
@Override
- CompletableFuture<Void> sendHeadersAsync() {
+ CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
return super.sendHeadersAsync()
- .whenComplete((Void v, Throwable t) -> {
- pushGroup.pushError(t);
- });
- }
-
- @Override
- CompletableFuture<Void> sendRequestAsync(Executor executor) {
- return super.sendRequestAsync(executor)
- .whenComplete((v, t) -> pushGroup.pushError(t));
+ .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
- return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ if(executor!=null && !cf.isDone()) {
+ cf = cf.thenApplyAsync( r -> r, executor);
+ }
+ return cf;
}
@Override
@@ -887,7 +883,8 @@
HttpResponseImpl.logResponse(r);
pushCF.complete(r); // not strictly required for push API
// start reading the body using the obtained BodyProcessor
- readBodyAsync(getPushHandler(), false, getExchange().executor())
+ CompletableFuture<Void> start = new MinimalFuture<>();
+ start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
@@ -896,6 +893,7 @@
responseCF.complete(response);
}
});
+ start.completeAsync(() -> null, getExchange().executor());
}
@Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java Tue Feb 21 11:08:34 2017 +0000
@@ -0,0 +1,212 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.incubator.http.internal.common;
+
+import jdk.incubator.http.internal.frame.DataFrame;
+import jdk.incubator.http.internal.frame.Http2Frame;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+/**
+ * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way
+ * or allows to consume it asynchronously. In the latter case put operation from the producer thread
+ * executes consume operation in the given executor.
+ */
+public class AsyncDataReadQueue implements Closeable {
+
+ @FunctionalInterface
+ public interface DataConsumer {
+ /**
+ *
+ * @param t - frame
+ * @return true if consuming should be continued. false when END_STREAM was received.
+ * @throws Throwable
+ */
+ boolean accept(Http2Frame t) throws Throwable;
+ }
+
+ private static final int BLOCKING = 0;
+ private static final int FLUSHING = 1;
+ private static final int REFLUSHING = 2;
+ private static final int ASYNC = 3;
+ private static final int CLOSED = 4;
+
+
+ private final AtomicInteger state = new AtomicInteger(BLOCKING);
+ private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>();
+ private Executor executor;
+ private DataConsumer onData;
+ private Consumer<Throwable> onError;
+
+ public AsyncDataReadQueue() {
+ }
+
+ public boolean tryPut(Http2Frame f) {
+ if(state.get() == CLOSED) {
+ return false;
+ } else {
+ queue.offer(f);
+ flushAsync(false);
+ return true;
+ }
+ }
+
+ public void put(Http2Frame f) throws IOException {
+ if(!tryPut(f))
+ throw new IOException("stream closed");
+ }
+
+ public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) {
+ if (state.get() == CLOSED) {
+ onError.accept(new IOException("stream closed"));
+ return;
+ }
+ assert state.get() == BLOCKING;
+ try {
+ while (onData.accept(queue.take()));
+ assert state.get() == CLOSED;
+ } catch (Throwable e) {
+ onError.accept(e);
+ }
+ }
+
+ public void asyncReceive(Executor executor, DataConsumer onData,
+ Consumer<Throwable> onError) {
+ if (state.get() == CLOSED) {
+ onError.accept(new IOException("stream closed"));
+ return;
+ }
+
+ assert state.get() == BLOCKING;
+
+ // Validates that fields not already set.
+ if (!checkCanSet("executor", this.executor, onError)
+ || !checkCanSet("onData", this.onData, onError)
+ || !checkCanSet("onError", this.onError, onError)) {
+ return;
+ }
+
+ this.executor = executor;
+ this.onData = onData;
+ this.onError = onError;
+
+ // This will report an error if asyncReceive is called twice,
+ // because we won't be in BLOCKING state if that happens
+ if (!this.state.compareAndSet(BLOCKING, ASYNC)) {
+ onError.accept(new IOException(
+ new IllegalStateException("State: "+this.state.get())));
+ return;
+ }
+
+ flushAsync(true);
+ }
+
+ private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) {
+ if (oldval != null) {
+ onError.accept(new IOException(
+ new IllegalArgumentException(name)));
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ int prevState = state.getAndSet(CLOSED);
+ if(prevState == BLOCKING) {
+ // wake up blocked take()
+ queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0]));
+ }
+ }
+
+ private void flushAsync(boolean alreadyInExecutor) {
+ while(true) {
+ switch (state.get()) {
+ case BLOCKING:
+ case CLOSED:
+ case REFLUSHING:
+ return;
+ case ASYNC:
+ if(state.compareAndSet(ASYNC, FLUSHING)) {
+ if(alreadyInExecutor) {
+ flushLoop();
+ } else {
+ executor.execute(this::flushLoop);
+ }
+ return;
+ }
+ break;
+ case FLUSHING:
+ if(state.compareAndSet(FLUSHING, REFLUSHING)) {
+ return;
+ }
+ break;
+ }
+ }
+ }
+
+ private void flushLoop() {
+ try {
+ while(true) {
+ Http2Frame frame = queue.poll();
+ while (frame != null) {
+ if(!onData.accept(frame)) {
+ assert state.get() == CLOSED;
+ return; // closed
+ }
+ frame = queue.poll();
+ }
+ switch (state.get()) {
+ case BLOCKING:
+ assert false;
+ break;
+ case ASYNC:
+ throw new RuntimeException("Shouldn't happen");
+ case FLUSHING:
+ if(state.compareAndSet(FLUSHING, ASYNC)) {
+ return;
+ }
+ break;
+ case REFLUSHING:
+ // We need to check if new elements were put after last
+ // poll() and do graceful exit
+ state.compareAndSet(REFLUSHING, FLUSHING);
+ break;
+ case CLOSED:
+ return;
+ }
+ }
+ } catch (Throwable e) {
+ onError.accept(e);
+ close();
+ }
+ }
+}
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/MinimalFuture.java Tue Feb 21 11:08:34 2017 +0000
@@ -26,6 +26,7 @@
package jdk.incubator.http.internal.common;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
@@ -40,6 +41,11 @@
*/
public final class MinimalFuture<T> extends CompletableFuture<T> {
+ @FunctionalInterface
+ public interface ExceptionalSupplier<U> {
+ U get() throws Throwable;
+ }
+
final static AtomicLong TOKENS = new AtomicLong();
final long id;
@@ -56,6 +62,29 @@
return f;
}
+ public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier) {
+ CompletableFuture<U> cf = new MinimalFuture<>();
+ try {
+ U value = supplier.get();
+ cf.complete(value);
+ } catch (Throwable t) {
+ cf.completeExceptionally(t);
+ }
+ return cf;
+ }
+
+ public static <U> CompletableFuture<U> supply(ExceptionalSupplier<U> supplier, Executor executor) {
+ CompletableFuture<U> cf = new MinimalFuture<>();
+ cf.completeAsync( () -> {
+ try {
+ return supplier.get();
+ } catch (Throwable ex) {
+ throw new CompletionException(ex);
+ }
+ }, executor);
+ return cf;
+ }
+
public MinimalFuture() {
super();
this.id = TOKENS.incrementAndGet();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/FixedThreadPoolTest.java Tue Feb 21 11:08:34 2017 +0000
@@ -0,0 +1,241 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8087112
+ * @library /lib/testlibrary server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors FixedThreadPoolTest
+ */
+
+import java.net.*;
+import jdk.incubator.http.*;
+import static jdk.incubator.http.HttpClient.Version.HTTP_2;
+import javax.net.ssl.*;
+import java.nio.file.*;
+import java.util.concurrent.*;
+import jdk.testlibrary.SimpleSSLContext;
+import static jdk.incubator.http.HttpRequest.BodyProcessor.fromFile;
+import static jdk.incubator.http.HttpRequest.BodyProcessor.fromString;
+import static jdk.incubator.http.HttpResponse.BodyHandler.asFile;
+import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
+
+import org.testng.annotations.Test;
+
+@Test
+public class FixedThreadPoolTest {
+ static int httpPort, httpsPort;
+ static Http2TestServer httpServer, httpsServer;
+ static HttpClient client = null;
+ static ExecutorService exec;
+ static SSLContext sslContext;
+
+ static String httpURIString, httpsURIString;
+
+ static void initialize() throws Exception {
+ try {
+ SimpleSSLContext sslct = new SimpleSSLContext();
+ sslContext = sslct.get();
+ client = getClient();
+ httpServer = new Http2TestServer(false, 0, exec, sslContext);
+ httpServer.addHandler(new EchoHandler(), "/");
+ httpPort = httpServer.getAddress().getPort();
+
+ httpsServer = new Http2TestServer(true, 0, exec, sslContext);
+ httpsServer.addHandler(new EchoHandler(), "/");
+
+ httpsPort = httpsServer.getAddress().getPort();
+ httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
+ httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/";
+
+ httpServer.start();
+ httpsServer.start();
+ } catch (Throwable e) {
+ System.err.println("Throwing now");
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test(timeOut=3000000)
+ public static void test() throws Exception {
+ try {
+ initialize();
+ simpleTest(false);
+ simpleTest(true);
+ streamTest(false);
+ streamTest(true);
+ paramsTest();
+ Thread.sleep(1000 * 4);
+ } catch (Exception | Error tt) {
+ tt.printStackTrace();
+ throw tt;
+ } finally {
+ httpServer.stop();
+ httpsServer.stop();
+ exec.shutdownNow();
+ }
+ }
+
+ static HttpClient getClient() {
+ if (client == null) {
+ exec = Executors.newCachedThreadPool();
+ client = HttpClient.newBuilder()
+ .executor(Executors.newFixedThreadPool(2))
+ .sslContext(sslContext)
+ .version(HTTP_2)
+ .build();
+ }
+ return client;
+ }
+
+ static URI getURI(boolean secure) {
+ if (secure)
+ return URI.create(httpsURIString);
+ else
+ return URI.create(httpURIString);
+ }
+
+ static void checkStatus(int expected, int found) throws Exception {
+ if (expected != found) {
+ System.err.printf ("Test failed: wrong status code %d/%d\n",
+ expected, found);
+ throw new RuntimeException("Test failed");
+ }
+ }
+
+ static void checkStrings(String expected, String found) throws Exception {
+ if (!expected.equals(found)) {
+ System.err.printf ("Test failed: wrong string %s/%s\n",
+ expected, found);
+ throw new RuntimeException("Test failed");
+ }
+ }
+
+ static Void compareFiles(Path path1, Path path2) {
+ return TestUtil.compareFiles(path1, path2);
+ }
+
+ static Path tempFile() {
+ return TestUtil.tempFile();
+ }
+
+ static final String SIMPLE_STRING = "Hello world Goodbye world";
+
+ static final int LOOPS = 32;
+ static final int FILESIZE = 64 * 1024 + 200;
+
+ static void streamTest(boolean secure) throws Exception {
+ URI uri = getURI(secure);
+ System.err.printf("streamTest %b to %s\n" , secure, uri);
+
+ HttpClient client = getClient();
+ Path src = TestUtil.getAFile(FILESIZE * 4);
+ HttpRequest req = HttpRequest.newBuilder(uri)
+ .POST(fromFile(src))
+ .build();
+
+ Path dest = Paths.get("streamtest.txt");
+ dest.toFile().delete();
+ CompletableFuture<Path> response = client.sendAsync(req, asFile(dest))
+ .thenApply(resp -> {
+ if (resp.statusCode() != 200)
+ throw new RuntimeException();
+ return resp.body();
+ });
+ response.join();
+ compareFiles(src, dest);
+ System.err.println("DONE");
+ }
+
+ static void paramsTest() throws Exception {
+ System.err.println("paramsTest");
+ Http2TestServer server = new Http2TestServer(true, 0, exec, sslContext);
+ server.addHandler((t -> {
+ SSLSession s = t.getSSLSession();
+ String prot = s.getProtocol();
+ if (prot.equals("TLSv1.2")) {
+ t.sendResponseHeaders(200, -1);
+ } else {
+ System.err.printf("Protocols =%s\n", prot);
+ t.sendResponseHeaders(500, -1);
+ }
+ }), "/");
+ server.start();
+ int port = server.getAddress().getPort();
+ URI u = new URI("https://127.0.0.1:"+port+"/foo");
+ HttpClient client = getClient();
+ HttpRequest req = HttpRequest.newBuilder(u).build();
+ HttpResponse<String> resp = client.sendAsync(req, asString()).get();
+ int stat = resp.statusCode();
+ if (stat != 200) {
+ throw new RuntimeException("paramsTest failed "
+ + Integer.toString(stat));
+ }
+ }
+
+ static void simpleTest(boolean secure) throws Exception {
+ URI uri = getURI(secure);
+ System.err.println("Request to " + uri);
+
+ // Do a simple warmup request
+
+ HttpClient client = getClient();
+ HttpRequest req = HttpRequest.newBuilder(uri)
+ .POST(fromString(SIMPLE_STRING))
+ .build();
+ HttpResponse<String> response = client.sendAsync(req, asString()).get();
+ HttpHeaders h = response.headers();
+
+ checkStatus(200, response.statusCode());
+
+ String responseBody = response.body();
+ checkStrings(SIMPLE_STRING, responseBody);
+
+ checkStrings(h.firstValue("x-hello").get(), "world");
+ checkStrings(h.firstValue("x-bye").get(), "universe");
+
+ // Do loops asynchronously
+
+ CompletableFuture[] responses = new CompletableFuture[LOOPS];
+ final Path source = TestUtil.getAFile(FILESIZE);
+ HttpRequest request = HttpRequest.newBuilder(uri)
+ .POST(fromFile(source))
+ .build();
+ for (int i = 0; i < LOOPS; i++) {
+ responses[i] = client.sendAsync(request, asFile(tempFile()))
+ //.thenApply(resp -> compareFiles(resp.body(), source));
+ .thenApply(resp -> {
+ System.out.printf("Resp status %d body size %d\n",
+ resp.statusCode(), resp.body().toFile().length());
+ return compareFiles(resp.body(), source);
+ });
+ }
+ CompletableFuture.allOf(responses).join();
+ System.err.println("DONE");
+ }
+}