http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part II: HTTP/2) http-client-branch
authordfuchs
Fri, 06 Apr 2018 22:11:44 +0100
branchhttp-client-branch
changeset 56399 a0929d5dd63f
parent 56396 8e8423347c1e
child 56400 9adc63b7f4d8
http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part II: HTTP/2)
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriptionBase.java
test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java
test/jdk/java/net/httpclient/ThrowingPushPromises.java
test/jdk/java/net/httpclient/ThrowingSubscribers.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Fri Apr 06 22:11:44 2018 +0100
@@ -332,7 +332,7 @@
         if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
             Http1AsyncDelegate delegate = this.delegate;
             if (delegate != null) unsubscribe(delegate);
-            Consumer<Throwable> onIllegalArg = (x) -> {
+            Consumer<Throwable> onSubscriptionError = (x) -> {
                 setRetryOnError(false);
                 stopRequested = true;
                 onReadError(x);
@@ -356,7 +356,7 @@
             // the header/body parser work with a flow of ByteBuffer, whereas
             // we have a flow List<ByteBuffer> upstream.
             Http1AsyncDelegateSubscription subscription =
-                    new Http1AsyncDelegateSubscription(scheduler, cancel, onIllegalArg);
+                    new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
             pending.onSubscribe(subscription);
             this.delegate = delegate = pending;
             final Object captured = delegate;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Apr 06 22:11:44 2018 +0100
@@ -27,7 +27,6 @@
 
 import java.io.EOFException;
 import java.lang.System.Logger.Level;
-import java.net.http.HttpClient;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -36,7 +35,6 @@
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.net.http.HttpHeaders;
@@ -45,7 +43,6 @@
 import jdk.internal.net.http.common.Log;
 import jdk.internal.net.http.common.MinimalFuture;
 import jdk.internal.net.http.common.Utils;
-import java.lang.ref.Reference;
 
 import static java.net.http.HttpClient.Version.HTTP_1_1;
 import static java.net.http.HttpResponse.BodySubscribers.discarding;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri Apr 06 22:11:44 2018 +0100
@@ -101,7 +101,8 @@
     final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
     final SequentialScheduler sched =
             SequentialScheduler.synchronizedScheduler(this::schedule);
-    final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
+    final SubscriptionBase userSubscription =
+            new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
 
     /**
      * This stream's identifier. Assigned lazily by the HTTP2Connection before
@@ -125,6 +126,8 @@
     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
     volatile CompletableFuture<T> responseBodyCF;
+    volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
+    volatile boolean stopRequested;
 
     /** True if END_STREAM has been seen in a frame received on this stream. */
     private volatile boolean remotelyClosed;
@@ -152,12 +155,19 @@
      * of after user subscription window has re-opened, from SubscriptionBase.request()
      */
     private void schedule() {
-        if (responseSubscriber == null)
-            // can't process anything yet
-            return;
-
         boolean onCompleteCalled = false;
+        HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
         try {
+            if (subscriber == null) {
+                subscriber = responseSubscriber = pendingResponseSubscriber;
+                if (subscriber == null) {
+                    // can't process anything yet
+                    return;
+                } else {
+                    debug.log(Level.DEBUG, "subscribing user subscriber");
+                    subscriber.onSubscribe(userSubscription);
+                }
+            }
             while (!inputQ.isEmpty()) {
                 Http2Frame frame = inputQ.peek();
                 if (frame instanceof ResetFrame) {
@@ -176,7 +186,7 @@
                     Log.logTrace("responseSubscriber.onComplete");
                     debug.log(Level.DEBUG, "incoming: onComplete");
                     sched.stop();
-                    responseSubscriber.onComplete();
+                    subscriber.onComplete();
                     onCompleteCalled = true;
                     setEndStreamReceived();
                     return;
@@ -184,17 +194,18 @@
                     inputQ.remove();
                     Log.logTrace("responseSubscriber.onNext {0}", size);
                     debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
-                    responseSubscriber.onNext(dsts);
+                    subscriber.onNext(dsts);
                     if (consumed(df)) {
                         Log.logTrace("responseSubscriber.onComplete");
                         debug.log(Level.DEBUG, "incoming: onComplete");
                         sched.stop();
-                        responseSubscriber.onComplete();
+                        subscriber.onComplete();
                         onCompleteCalled = true;
                         setEndStreamReceived();
                         return;
                     }
                 } else {
+                    if (stopRequested) break;
                     return;
                 }
             }
@@ -207,7 +218,10 @@
             sched.stop();
             try {
                 if (!onCompleteCalled) {
-                    responseSubscriber.onError(t);
+                    debug.log(Level.DEBUG, "calling subscriber.onError: %s", (Object)t);
+                    subscriber.onError(t);
+                } else {
+                    debug.log(Level.DEBUG, "already completed: dropping error %s", (Object)t);
                 }
             } catch (Throwable x) {
                 Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
@@ -301,10 +315,7 @@
             Throwable t = getCancelCause();
             responseBodyCF.completeExceptionally(t);
         } else {
-            bodySubscriber.onSubscribe(userSubscription);
-            // Set the responseSubscriber field now that onSubscribe has been called.
-            // This effectively allows the scheduler to start invoking the callbacks.
-            responseSubscriber = bodySubscriber;
+            pendingResponseSubscriber = bodySubscriber;
             sched.runOrSchedule(); // in case data waiting already to be processed
         }
         return responseBodyCF;
@@ -988,6 +999,19 @@
         cancel(new IOException("Stream " + streamid + " cancelled"));
     }
 
+    void onSubscriptionError(Throwable t) {
+        errorRef.compareAndSet(null, t);
+        debug.log(Level.DEBUG, "Got subscription error: %s", (Object)t);
+        // This is the special case where the subscriber
+        // has requested an illegal number of items.
+        // In this case, the error doesn't come from
+        // upstream, but from downstream, and we need to
+        // handle the error without waiting for the inputQ
+        // to be exhausted.
+        stopRequested = true;
+        sched.runOrSchedule();
+    }
+
     @Override
     void cancel(IOException cause) {
         cancelImpl(cause);
@@ -1009,7 +1033,7 @@
             }
         }
         if (closing) { // true if the stream has not been closed yet
-            if (responseSubscriber != null)
+            if (responseSubscriber != null || pendingResponseSubscriber != null)
                 sched.runOrSchedule();
         }
         completeResponseExceptionally(e);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriptionBase.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriptionBase.java	Fri Apr 06 22:11:44 2018 +0100
@@ -27,6 +27,7 @@
 
 import java.util.concurrent.Flow;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /**
  * Maintains subscription counter and provides primitives for:
@@ -41,17 +42,33 @@
     final SequentialScheduler scheduler; // when window was zero and is opened, run this
     final Runnable cancelAction; // when subscription cancelled, run this
     final AtomicBoolean cancelled;
+    final Consumer<Throwable> onError;
 
     public SubscriptionBase(SequentialScheduler scheduler, Runnable cancelAction) {
+        this(scheduler, cancelAction, null);
+    }
+
+    public SubscriptionBase(SequentialScheduler scheduler,
+                            Runnable cancelAction,
+                            Consumer<Throwable> onError) {
         this.scheduler = scheduler;
         this.cancelAction = cancelAction;
         this.cancelled = new AtomicBoolean(false);
+        this.onError = onError;
     }
 
     @Override
     public void request(long n) {
-        if (demand.increase(n))
-            scheduler.runOrSchedule();
+        try {
+            if (demand.increase(n))
+                scheduler.runOrSchedule();
+        } catch(Throwable t) {
+            if (onError != null) {
+                if (cancelled.getAndSet(true))
+                    return;
+                onError.accept(t);
+            } else throw t;
+        }
     }
 
     @Override
--- a/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java	Fri Apr 06 22:11:44 2018 +0100
@@ -184,19 +184,19 @@
                 { httpURI_chunk,    false, OF_INPUTSTREAM },
                 { httpsURI_fixed,   false, OF_INPUTSTREAM },
                 { httpsURI_chunk,   false, OF_INPUTSTREAM },
-//                { http2URI_fixed,   false, OF_INPUTSTREAM },
-//                { http2URI_chunk,   false, OF_INPUTSTREAM },
-//                { https2URI_fixed,  false, OF_INPUTSTREAM },
-//                { https2URI_chunk,  false, OF_INPUTSTREAM },
+                { http2URI_fixed,   false, OF_INPUTSTREAM },
+                { http2URI_chunk,   false, OF_INPUTSTREAM },
+                { https2URI_fixed,  false, OF_INPUTSTREAM },
+                { https2URI_chunk,  false, OF_INPUTSTREAM },
 
                 { httpURI_fixed,    true, OF_INPUTSTREAM },
                 { httpURI_chunk,    true, OF_INPUTSTREAM },
                 { httpsURI_fixed,   true, OF_INPUTSTREAM },
                 { httpsURI_chunk,   true, OF_INPUTSTREAM },
-//                { http2URI_fixed,   true, OF_INPUTSTREAM },
-//                { http2URI_chunk,   true, OF_INPUTSTREAM },
-//                { https2URI_fixed,  true, OF_INPUTSTREAM },
-//                { https2URI_chunk,  true, OF_INPUTSTREAM },
+                { http2URI_fixed,   true, OF_INPUTSTREAM },
+                { http2URI_chunk,   true, OF_INPUTSTREAM },
+                { https2URI_fixed,  true, OF_INPUTSTREAM },
+                { https2URI_chunk,  true, OF_INPUTSTREAM },
         };
     }
 
@@ -226,7 +226,8 @@
                 try (InputStream is = response.body()) {
                     String body = new String(is.readAllBytes(), UTF_8);
                     assertEquals(body, "");
-                    if (uri.endsWith("/chunk")) {
+                    if (uri.endsWith("/chunk")
+                            && response.version() == HttpClient.Version.HTTP_1_1) {
                         // with /fixed and 0 length
                         // there's no need for any call to request()
                         throw new RuntimeException("Expected IAE not thrown");
@@ -261,8 +262,9 @@
             BodyHandler<InputStream> handler = handlers.get();
             BodyHandler<InputStream> badHandler = (c,h) ->
                     new BadBodySubscriber<>(handler.apply(c,h));
-            CompletableFuture<String> result =
-                    client.sendAsync(req, badHandler).thenCompose(
+            CompletableFuture<HttpResponse<InputStream>> response =
+                    client.sendAsync(req, badHandler);
+            CompletableFuture<String> result = response.thenCompose(
                             (responsePublisher) -> {
                                 try (InputStream is = responsePublisher.body()) {
                                     return CompletableFuture.completedFuture(
@@ -274,7 +276,8 @@
             try {
                 // Get the final result and compare it with the expected body
                 assertEquals(result.get(), "");
-                if (uri.endsWith("/chunk")) {
+                if (uri.endsWith("/chunk")
+                        && response.get().version() == HttpClient.Version.HTTP_1_1) {
                     // with /fixed and 0 length
                     // there's no need for any call to request()
                     throw new RuntimeException("Expected IAE not thrown");
--- a/test/jdk/java/net/httpclient/ThrowingPushPromises.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/ThrowingPushPromises.java	Fri Apr 06 22:11:44 2018 +0100
@@ -372,7 +372,11 @@
     }
 
     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
-        throw new RuntimeException("Expected exception not thrown in " + w);
+        String msg = "Expected exception not thrown in " + w
+                + "\n\tReceived: " + resp
+                + "\n\tWith body: " + resp.body();
+        System.out.println(msg);
+        throw new RuntimeException(msg);
     }
 
     final List<String> checkAsString(Where w, URI reqURI,
@@ -442,7 +446,6 @@
                 case BEFORE_ACCEPTING:
                 case AFTER_ACCEPTING:
                 case BODY_HANDLER:
-                case ON_SUBSCRIBE:
                 case GET_BODY:
                 case BODY_CF:
                     return shouldHaveThrown(w, presp, thrower);
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Fri Apr 06 18:15:35 2018 +0100
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Fri Apr 06 22:11:44 2018 +0100
@@ -453,7 +453,6 @@
     final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
         switch(w) {
             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
-            case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
             case BODY_CF: return shouldHaveThrown(w, resp, thrower);
             default: break;
@@ -469,7 +468,7 @@
             }
             throw causeNotFound(w, x);
         }
-        throw new RuntimeException("Expected exception not thrown in " + w);
+        return shouldHaveThrown(w, resp, thrower);
     }
 
     final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
@@ -478,7 +477,6 @@
     {
         switch(w) {
             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
-            case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
             case BODY_CF: return shouldHaveThrown(w, resp, thrower);
             default: break;