--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Feb 21 11:08:34 2017 +0000
@@ -39,6 +39,8 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -96,7 +98,7 @@
*/
class Stream<T> extends ExchangeImpl<T> {
- final Queue<Http2Frame> inputQ;
+ final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
/**
* This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -169,7 +171,7 @@
{
CompletableFuture<T> cf = readBodyAsync(handler,
returnToCache,
- this::executeInline);
+ null);
try {
return cf.join();
} catch (CompletionException e) {
@@ -177,10 +179,6 @@
}
}
- void executeInline(Runnable r) {
- r.run();
- }
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -189,60 +187,69 @@
return sb.toString();
}
+ private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
+ if (frame instanceof ResetFrame) {
+ handleReset((ResetFrame) frame);
+ return true;
+ } else if (!(frame instanceof DataFrame)) {
+ assert false;
+ return true;
+ }
+ DataFrame df = (DataFrame) frame;
+ // RFC 7540 6.1:
+ // The entire DATA frame payload is included in flow control,
+ // including the Pad Length and Padding fields if present
+ int len = df.payloadLength();
+ ByteBufferReference[] buffers = df.getData();
+ for (ByteBufferReference b : buffers) {
+ ByteBuffer buf = b.get();
+ if (buf.hasRemaining()) {
+ publisher.acceptData(Optional.of(buf));
+ }
+ }
+ connection.windowUpdater.update(len);
+ if (df.getFlag(DataFrame.END_STREAM)) {
+ setEndStreamReceived();
+ publisher.acceptData(Optional.empty());
+ return false;
+ }
+ // Don't send window update on a stream which is
+ // closed or half closed.
+ windowUpdater.update(len);
+ return true;
+ }
+
// pushes entire response body into response processor
// blocking when required by local or remote flow control
CompletableFuture<T> receiveData(Executor executor) {
CompletableFuture<T> cf = responseProcessor
.getBody()
.toCompletableFuture();
-
- executor.execute(() -> {
- Http2Frame frame;
- DataFrame df = null;
- try {
- if (!endStreamReceived()) {
- do {
- frame = inputQ.take();
- if (frame instanceof ResetFrame) {
- handleReset((ResetFrame)frame);
- continue;
- } else if (!(frame instanceof DataFrame)) {
- assert false;
- continue;
- }
- df = (DataFrame) frame;
- // RFC 7540 6.1:
- // The entire DATA frame payload is included in flow control,
- // including the Pad Length and Padding fields if present
- int len = df.payloadLength();
- ByteBufferReference[] buffers = df.getData();
- for (ByteBufferReference b : buffers) {
- publisher.acceptData(Optional.of(b.get()));
- }
- connection.windowUpdater.update(len);
- if (df.getFlag(DataFrame.END_STREAM)) {
- break;
- }
- // Don't send window update on a stream which is
- // closed or half closed.
- windowUpdater.update(len);
- } while (true);
- setEndStreamReceived();
- }
- publisher.acceptData(Optional.empty());
- } catch (Throwable e) {
- Log.logTrace("receiveData: {0}", e.toString());
- e.printStackTrace();
- cf.completeExceptionally(e);
- publisher.acceptError(e);
- }
- });
+ Consumer<Throwable> onError = e -> {
+ Log.logTrace("receiveData: {0}", e.toString());
+ e.printStackTrace();
+ cf.completeExceptionally(e);
+ publisher.acceptError(e);
+ };
+ if (executor == null) {
+ inputQ.blockingReceive(this::receiveDataFrame, onError);
+ } else {
+ inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+ }
return cf;
}
@Override
- void sendBody() throws IOException, InterruptedException {
- sendBodyImpl();
+ void sendBody() throws IOException {
+ try {
+ sendBodyImpl().join();
+ } catch (CompletionException e) {
+ throw Utils.getIOException(e);
+ }
+ }
+
+ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+ return sendBodyImpl().thenApply( v -> this);
}
@SuppressWarnings("unchecked")
@@ -268,7 +275,6 @@
};
this.requestPseudoHeaders = new HttpHeadersImpl();
// NEW
- this.inputQ = new Queue<>();
this.publisher = new BlockingPushPublisher<>();
this.windowUpdater = new StreamWindowUpdateSender(connection);
}
@@ -673,6 +679,10 @@
response_cfs.add(cf);
}
}
+ if (executor != null && !cf.isDone()) {
+ // protect from executing later chain of CompletableFuture operations from SelectorManager thread
+ cf = cf.thenApplyAsync(r -> r, executor);
+ }
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
PushGroup<?,?> pg = exchange.getPushGroup();
if (pg != null) {
@@ -743,20 +753,12 @@
}
}
- private void waitForCompletion() throws IOException {
- try {
- requestBodyCF.join();
- } catch (CompletionException e) {
- throw Utils.getIOException(e);
- }
- }
-
- void sendBodyImpl() throws IOException, InterruptedException {
+ CompletableFuture<Void> sendBodyImpl() {
RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
subscriber.setClient(client);
requestProcessor.subscribe(subscriber);
- waitForCompletion();
- requestSent();
+ requestBodyCF.whenComplete((v,t) -> requestSent());
+ return requestBodyCF;
}
@Override
@@ -846,30 +848,24 @@
// error record it in the PushGroup. The error method is called
// with a null value when no error occurred (is a no-op)
@Override
- CompletableFuture<Void> sendBodyAsync(Executor executor) {
- return super.sendBodyAsync(executor)
- .whenComplete((Void v, Throwable t) -> {
- pushGroup.pushError(t);
- });
+ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+ return super.sendBodyAsync()
+ .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
}
@Override
- CompletableFuture<Void> sendHeadersAsync() {
+ CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
return super.sendHeadersAsync()
- .whenComplete((Void v, Throwable t) -> {
- pushGroup.pushError(t);
- });
- }
-
- @Override
- CompletableFuture<Void> sendRequestAsync(Executor executor) {
- return super.sendRequestAsync(executor)
- .whenComplete((v, t) -> pushGroup.pushError(t));
+ .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
- return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ if(executor!=null && !cf.isDone()) {
+ cf = cf.thenApplyAsync( r -> r, executor);
+ }
+ return cf;
}
@Override
@@ -887,7 +883,8 @@
HttpResponseImpl.logResponse(r);
pushCF.complete(r); // not strictly required for push API
// start reading the body using the obtained BodyProcessor
- readBodyAsync(getPushHandler(), false, getExchange().executor())
+ CompletableFuture<Void> start = new MinimalFuture<>();
+ start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
.whenComplete((T body, Throwable t) -> {
if (t != null) {
responseCF.completeExceptionally(t);
@@ -896,6 +893,7 @@
responseCF.complete(response);
}
});
+ start.completeAsync(() -> null, getExchange().executor());
}
@Override