http-client-branch: fix for subscribers throwing in onSubscribe http-client-branch
authordfuchs
Tue, 13 Mar 2018 14:24:15 +0000
branchhttp-client-branch
changeset 56288 2de1aa88cf06
parent 56286 3d8333fa243b
child 56289 904b7c299931
http-client-branch: fix for subscribers throwing in onSubscribe
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
test/jdk/java/net/httpclient/ThrowingSubscribers.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Tue Mar 13 10:39:26 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Tue Mar 13 14:24:15 2018 +0000
@@ -560,7 +560,12 @@
         @Override
         public void onSubscribe(AbstractSubscription s) {
             this.subscription = s;
-            parser.onSubscribe(s);
+            try {
+                parser.onSubscribe(s);
+            } catch (Throwable t) {
+                cf.completeExceptionally(t);
+                throw t;
+            }
         }
 
         @Override
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Tue Mar 13 10:39:26 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Tue Mar 13 14:24:15 2018 +0000
@@ -63,6 +63,7 @@
 import java.net.http.HttpResponse.BodySubscriber;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -229,6 +230,36 @@
         }
     }
 
+    enum SubscriberType {
+        INLINE,  // In line subscribers complete their CF on ON_COMPLETE
+                 // e.g. BodySubscribers::ofString
+        OFFLINE; // Off line subscribers complete their CF immediately
+                 // but require the client to pull the data after the
+                 // CF completes (e.g. BodySubscribers::ofInputStream)
+    }
+
+    static EnumSet<Where> excludes(SubscriberType type) {
+        EnumSet<Where> set = EnumSet.noneOf(Where.class);
+
+        if (type == SubscriberType.OFFLINE) {
+            // Throwing on onSubscribe needs some more work
+            // for the case of InputStream, where the body has already
+            // completed by the time the subscriber is subscribed.
+            // The only way we have at that point to relay the exception
+            // is to call onError on the subscriber, but should we if
+            // Subscriber::onSubscribed has thrown an exception and
+            // not completed normally?
+            set.add(Where.ON_SUBSCRIBE);
+        }
+
+        // Don't know how to make the stack reliably cause onError
+        // to be called without closing the connection.
+        // And how do we get the exception if onError throws anyway?
+        set.add(Where.ON_ERROR);
+
+        return set;
+    }
+
     @Test(dataProvider = "noThrows")
     public void testNoThrows(String uri, boolean sameClient)
             throws Exception {
@@ -258,7 +289,8 @@
         String test = format("testThrowingAsString(%s, %b, %s)",
                              uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
-                this::shouldHaveThrown, thrower,false);
+                this::shouldHaveThrown, thrower,false,
+                excludes(SubscriberType.INLINE));
     }
 
     @Test(dataProvider = "variants")
@@ -270,7 +302,8 @@
         String test =  format("testThrowingAsLines(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
-                this::checkAsLines, thrower,false);
+                this::checkAsLines, thrower,false,
+                excludes(SubscriberType.OFFLINE));
     }
 
     @Test(dataProvider = "variants")
@@ -282,7 +315,8 @@
         String test = format("testThrowingAsInputStream(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
-                this::checkAsInputStream,  thrower,false);
+                this::checkAsInputStream,  thrower,false,
+                excludes(SubscriberType.OFFLINE));
     }
 
     @Test(dataProvider = "variants")
@@ -294,7 +328,8 @@
         String test = format("testThrowingAsStringAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
-                     this::shouldHaveThrown, thrower, true);
+                     this::shouldHaveThrown, thrower, true,
+                excludes(SubscriberType.INLINE));
     }
 
     @Test(dataProvider = "variants")
@@ -306,7 +341,8 @@
         String test = format("testThrowingAsLinesAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
-                this::checkAsLines, thrower,true);
+                this::checkAsLines, thrower,true,
+                excludes(SubscriberType.OFFLINE));
     }
 
     @Test(dataProvider = "variants")
@@ -318,17 +354,19 @@
         String test = format("testThrowingAsInputStreamAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
-                this::checkAsInputStream, thrower,true);
+                this::checkAsInputStream, thrower,true,
+                excludes(SubscriberType.OFFLINE));
     }
 
     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
                                     Supplier<BodyHandler<T>> handlers,
-                                    Finisher finisher, Thrower thrower, boolean async)
+                                    Finisher finisher, Thrower thrower,
+                                    boolean async, EnumSet<Where> excludes)
             throws Exception
     {
         out.printf("%n%s%s%n", now(), name);
         try {
-            testThrowing(uri, sameClient, handlers, finisher, thrower, async);
+            testThrowing(uri, sameClient, handlers, finisher, thrower, async, excludes);
         } catch (Error | Exception x) {
             FAILURES.putIfAbsent(name, x);
             throw x;
@@ -338,25 +376,12 @@
     private <T,U> void testThrowing(String uri, boolean sameClient,
                                     Supplier<BodyHandler<T>> handlers,
                                     Finisher finisher, Thrower thrower,
-                                    boolean async)
+                                    boolean async,
+                                    EnumSet<Where> excludes)
             throws Exception
     {
         HttpClient client = null;
-        for (Where where : Where.values()) {
-
-            // Throwing on onSubscribe needs some more work
-            // for the case of InputStream, where the body has already
-            // completed by the time the subscriber is subscribed.
-            // The only way we have at that point to relay the exception
-            // is to call onError on the subscriber, but should we if
-            // Subscriber::onSubscribed has thrown an exception and
-            // not completed normally?
-            if (where == Where.ON_SUBSCRIBE) continue;
-
-            // Don't know how to make the stack reliably cause onError
-            // to be called without closing the connection.
-            // And how do we get the exception if onError throws anyway?
-            if (where == Where.ON_ERROR) continue;
+        for (Where where : EnumSet.complementOf(excludes)) {
 
             if (!sameClient || client == null)
                 client = newHttpClient(sameClient);