jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
changeset 43984 f33383dcb1fb
parent 42460 7133f144981a
child 43999 4cc44dd9f14f
--- a/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Feb 20 15:32:37 2017 +0800
+++ b/jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Feb 21 11:08:34 2017 +0000
@@ -39,6 +39,8 @@
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
 import jdk.incubator.http.internal.common.*;
 import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -96,7 +98,7 @@
  */
 class Stream<T> extends ExchangeImpl<T> {
 
-    final Queue<Http2Frame> inputQ;
+    final AsyncDataReadQueue inputQ = new AsyncDataReadQueue();
 
     /**
      * This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -169,7 +171,7 @@
     {
         CompletableFuture<T> cf = readBodyAsync(handler,
                                                 returnToCache,
-                                                this::executeInline);
+                                                null);
         try {
             return cf.join();
         } catch (CompletionException e) {
@@ -177,10 +179,6 @@
         }
     }
 
-    void executeInline(Runnable r) {
-        r.run();
-    }
-
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
@@ -189,60 +187,69 @@
         return sb.toString();
     }
 
+    private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException {
+        if (frame instanceof ResetFrame) {
+            handleReset((ResetFrame) frame);
+            return true;
+        } else if (!(frame instanceof DataFrame)) {
+            assert false;
+            return true;
+        }
+        DataFrame df = (DataFrame) frame;
+        // RFC 7540 6.1:
+        // The entire DATA frame payload is included in flow control,
+        // including the Pad Length and Padding fields if present
+        int len = df.payloadLength();
+        ByteBufferReference[] buffers = df.getData();
+        for (ByteBufferReference b : buffers) {
+            ByteBuffer buf = b.get();
+            if (buf.hasRemaining()) {
+                publisher.acceptData(Optional.of(buf));
+            }
+        }
+        connection.windowUpdater.update(len);
+        if (df.getFlag(DataFrame.END_STREAM)) {
+            setEndStreamReceived();
+            publisher.acceptData(Optional.empty());
+            return false;
+        }
+        // Don't send window update on a stream which is
+        // closed or half closed.
+        windowUpdater.update(len);
+        return true;
+    }
+
     // pushes entire response body into response processor
     // blocking when required by local or remote flow control
     CompletableFuture<T> receiveData(Executor executor) {
         CompletableFuture<T> cf = responseProcessor
                 .getBody()
                 .toCompletableFuture();
-
-        executor.execute(() -> {
-            Http2Frame frame;
-            DataFrame df = null;
-            try {
-                if (!endStreamReceived()) {
-                    do {
-                        frame = inputQ.take();
-                        if (frame instanceof ResetFrame) {
-                            handleReset((ResetFrame)frame);
-                            continue;
-                        } else if (!(frame instanceof DataFrame)) {
-                            assert false;
-                            continue;
-                        }
-                        df = (DataFrame) frame;
-                        // RFC 7540 6.1:
-                        // The entire DATA frame payload is included in flow control,
-                        // including the Pad Length and Padding fields if present
-                        int len = df.payloadLength();
-                        ByteBufferReference[] buffers = df.getData();
-                        for (ByteBufferReference b : buffers) {
-                            publisher.acceptData(Optional.of(b.get()));
-                        }
-                        connection.windowUpdater.update(len);
-                        if (df.getFlag(DataFrame.END_STREAM)) {
-                            break;
-                        }
-                        // Don't send window update on a stream which is
-                        // closed or half closed.
-                        windowUpdater.update(len);
-                    } while (true);
-                    setEndStreamReceived();
-                }
-                publisher.acceptData(Optional.empty());
-            } catch (Throwable e) {
-                Log.logTrace("receiveData: {0}", e.toString());
-                e.printStackTrace();
-                cf.completeExceptionally(e);
-                publisher.acceptError(e);
-            }
-        });
+        Consumer<Throwable> onError = e -> {
+            Log.logTrace("receiveData: {0}", e.toString());
+            e.printStackTrace();
+            cf.completeExceptionally(e);
+            publisher.acceptError(e);
+        };
+        if (executor == null) {
+            inputQ.blockingReceive(this::receiveDataFrame, onError);
+        } else {
+            inputQ.asyncReceive(executor, this::receiveDataFrame, onError);
+        }
         return cf;
     }
 
     @Override
-    void sendBody() throws IOException, InterruptedException {
-        sendBodyImpl();
+    void sendBody() throws IOException {
+        try {
+            sendBodyImpl().join();
+        } catch (CompletionException e) {
+            throw Utils.getIOException(e);
+        }
+    }
+
+    CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+        return sendBodyImpl().thenApply( v -> this);
     }
 
     @SuppressWarnings("unchecked")
@@ -268,7 +275,6 @@
         };
         this.requestPseudoHeaders = new HttpHeadersImpl();
         // NEW
-        this.inputQ = new Queue<>();
         this.publisher = new BlockingPushPublisher<>();
         this.windowUpdater = new StreamWindowUpdateSender(connection);
     }
@@ -673,6 +679,10 @@
                 response_cfs.add(cf);
             }
         }
+        if (executor != null && !cf.isDone()) {
+            // protect from executing later chain of CompletableFuture operations from SelectorManager thread
+            cf = cf.thenApplyAsync(r -> r, executor);
+        }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
         PushGroup<?,?> pg = exchange.getPushGroup();
         if (pg != null) {
@@ -743,20 +753,12 @@
         }
     }
 
-    private void waitForCompletion() throws IOException {
-        try {
-            requestBodyCF.join();
-        } catch (CompletionException e) {
-            throw Utils.getIOException(e);
-        }
-    }
-
-    void sendBodyImpl() throws IOException, InterruptedException {
+    CompletableFuture<Void> sendBodyImpl() {
         RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
         subscriber.setClient(client);
         requestProcessor.subscribe(subscriber);
-        waitForCompletion();
-        requestSent();
+        requestBodyCF.whenComplete((v,t) -> requestSent());
+        return requestBodyCF;
     }
 
     @Override
@@ -846,30 +848,24 @@
         // error record it in the PushGroup. The error method is called
         // with a null value when no error occurred (is a no-op)
         @Override
-        CompletableFuture<Void> sendBodyAsync(Executor executor) {
-            return super.sendBodyAsync(executor)
-                        .whenComplete((Void v, Throwable t) -> {
-                            pushGroup.pushError(t);
-                        });
+        CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
+            return super.sendBodyAsync()
+                        .whenComplete((ExchangeImpl<T> v, Throwable t) -> pushGroup.pushError(t));
         }
 
         @Override
-        CompletableFuture<Void> sendHeadersAsync() {
+        CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
             return super.sendHeadersAsync()
-                        .whenComplete((Void v, Throwable t) -> {
-                            pushGroup.pushError(t);
-                         });
-        }
-
-        @Override
-        CompletableFuture<Void> sendRequestAsync(Executor executor) {
-            return super.sendRequestAsync(executor)
-                        .whenComplete((v, t) -> pushGroup.pushError(t));
+                        .whenComplete((ExchangeImpl<T> ex, Throwable t) -> pushGroup.pushError(t));
         }
 
         @Override
         CompletableFuture<Response> getResponseAsync(Executor executor) {
-            return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+            CompletableFuture<Response> cf = pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+            if(executor!=null && !cf.isDone()) {
+                cf  = cf.thenApplyAsync( r -> r, executor);
+            }
+            return cf;
         }
 
         @Override
@@ -887,7 +883,8 @@
             HttpResponseImpl.logResponse(r);
             pushCF.complete(r); // not strictly required for push API
             // start reading the body using the obtained BodyProcessor
-            readBodyAsync(getPushHandler(), false, getExchange().executor())
+            CompletableFuture<Void> start = new MinimalFuture<>();
+            start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
                 .whenComplete((T body, Throwable t) -> {
                     if (t != null) {
                         responseCF.completeExceptionally(t);
@@ -896,6 +893,7 @@
                         responseCF.complete(response);
                     }
                 });
+            start.completeAsync(() -> null, getExchange().executor());
         }
 
         @Override