http-client-branch: more ThrowingPublishers tests and fixes http-client-branch
authordfuchs
Wed, 28 Feb 2018 15:48:46 +0000
branchhttp-client-branch
changeset 56209 43d5ad612710
parent 56208 d37c08ce784a
child 56210 efa5db108669
http-client-branch: more ThrowingPublishers tests and fixes
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java
test/jdk/java/net/httpclient/HttpServerAdapters.java
test/jdk/java/net/httpclient/ThrowingPublishers.java
test/jdk/java/net/httpclient/ThrowingSubscribers.java
test/jdk/java/net/httpclient/http2/server/BodyInputStream.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Wed Feb 28 15:48:46 2018 +0000
@@ -53,9 +53,6 @@
 
     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
-    private static final System.Logger DEBUG_LOGGER =
-            Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);
-
     final HttpRequestImpl request; // main request
     final Http1Request requestAction;
     private volatile Http1Response<T> response;
@@ -121,12 +118,17 @@
         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
         private volatile Flow.Subscription subscription;
         volatile boolean complete;
+        private final System.Logger debug;
+        Http1BodySubscriber(System.Logger debug) {
+            assert debug != null;
+            this.debug = debug;
+        }
 
         /** Final sentinel in the stream of request body. */
         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
 
         final void request(long n) {
-            DEBUG_LOGGER.log(Level.DEBUG, () ->
+            debug.log(Level.DEBUG, () ->
                 "Http1BodySubscriber requesting " + n + ", from " + subscription);
             subscription.request(n);
         }
@@ -141,11 +143,17 @@
         }
 
         final void cancelSubscription() {
-            subscription.cancel();
+            try {
+                subscription.cancel();
+            } catch(Throwable t) {
+                String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
+                debug.log(Level.DEBUG, "%s: %s", msg, t);
+                Log.logError("{0}: {1}", msg, (Object)t);
+            }
         }
 
-        static Http1BodySubscriber completeSubscriber() {
-            return new Http1BodySubscriber() {
+        static Http1BodySubscriber completeSubscriber(System.Logger debug) {
+            return new Http1BodySubscriber(debug) {
                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
                 @Override public void onNext(ByteBuffer item) { error(); }
                 @Override public void onError(Throwable throwable) { error(); }
@@ -285,15 +293,15 @@
         try {
             bodySubscriber = requestAction.continueRequest();
             if (bodySubscriber == null) {
-                bodySubscriber = Http1BodySubscriber.completeSubscriber();
+                bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
             } else {
                 // start
-                bodySubscriber.whenSubscribed.thenAccept(
-                        (s) -> bodySubscriber.request(1));
+                bodySubscriber.whenSubscribed
+                        .thenAccept((s) -> requestMoreBody());
             }
         } catch (Throwable t) {
-            connection.close();
+            cancelImpl(t);
             bodySentCF.completeExceptionally(t);
         }
         return bodySentCF;
@@ -385,9 +393,11 @@
     private void cancelImpl(Throwable cause) {
         LinkedList<CompletableFuture<?>> toComplete = null;
         int count = 0;
+        Throwable error;
         synchronized (lock) {
-            if (failed == null)
-                failed = cause;
+            if ((error = failed) == null) {
+                failed = error = cause;
+            }
             if (requestAction != null && requestAction.finished()
                     && response != null && response.finished()) {
                 return;
@@ -426,12 +436,13 @@
             Executor exec = client.isSelectorThread()
                             ? executor
                             : this::runInline;
+            Throwable x = error;
             while (!toComplete.isEmpty()) {
                 CompletableFuture<?> cf = toComplete.poll();
                 exec.execute(() -> {
-                    if (cf.completeExceptionally(cause)) {
+                    if (cf.completeExceptionally(x)) {
                         debug.log(Level.DEBUG, "completed cf with %s",
-                                 (Object) cause);
+                                 (Object) x);
                     }
                 });
             }
@@ -479,6 +490,17 @@
         return !outgoing.isEmpty();
     }
 
+    private void requestMoreBody() {
+        try {
+            debug.log(Level.DEBUG, "requesting more body from the subscriber");
+            bodySubscriber.request(1);
+        } catch (Throwable t) {
+            debug.log(Level.DEBUG, "Subscription::request failed", t);
+            cancelImpl(t);
+            bodySentCF.completeExceptionally(t);
+        }
+    }
+
     // Invoked only by the publisher
     // ALL tasks should execute off the Selector-Manager thread
     /** Returns the next portion of the HTTP request, or the error. */
@@ -513,8 +535,7 @@
                         debug.log(Level.DEBUG, "initiating completion of bodySentCF");
                         bodySentCF.completeAsync(() -> this, exec);
                     } else {
-                        debug.log(Level.DEBUG, "requesting more body from the subscriber");
-                        exec.execute(() -> bodySubscriber.request(1));
+                        exec.execute(this::requestMoreBody);
                     }
                     break;
                 case INITIAL:
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java	Wed Feb 28 15:48:46 2018 +0000
@@ -244,6 +244,8 @@
 
     final class StreamSubscriber extends Http1BodySubscriber {
 
+        StreamSubscriber() { super(debug); }
+
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             if (isSubscribed()) {
@@ -301,6 +303,7 @@
     final class FixedContentSubscriber extends Http1BodySubscriber {
 
         private volatile long contentWritten;
+        FixedContentSubscriber() { super(debug); }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
--- a/test/jdk/java/net/httpclient/HttpServerAdapters.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java	Wed Feb 28 15:48:46 2018 +0000
@@ -327,6 +327,15 @@
         }
     }
 
+    public static boolean expectException(HttpTestExchange e) {
+        HttpTestHeaders h = e.getRequestHeaders();
+        Optional<String> expectException = h.firstValue("X-expect-exception");
+        if (expectException.isPresent()) {
+            return expectException.get().equalsIgnoreCase("true");
+        }
+        return false;
+    }
+
     /**
      * A version agnostic adapter class for HTTP Server Filter Chains.
      */
@@ -351,9 +360,9 @@
                 try {
                     exchange.doFilter(chain);
                 } catch (Throwable t) {
-                    System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t);
-                    System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t);
-                    if (PRINTSTACK) t.printStackTrace();
+                    System.out.println("WARNING: exception caught in Http1Chain::doFilter " + t);
+                    System.err.println("WARNING: exception caught in Http1Chain::doFilter " + t);
+                    if (PRINTSTACK && !expectException(exchange)) t.printStackTrace();
                     throw t;
                 }
             }
@@ -375,9 +384,9 @@
                         handler.handle(exchange);
                     }
                 } catch (Throwable t) {
-                    System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t);
-                    System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t);
-                    if (PRINTSTACK) t.printStackTrace();
+                    System.out.println("WARNING: exception caught in Http2Chain::doFilter " + t);
+                    System.err.println("WARNING: exception caught in Http2Chain::doFilter " + t);
+                    if (PRINTSTACK && !expectException(exchange)) t.printStackTrace();
                     throw t;
                 }
             }
--- a/test/jdk/java/net/httpclient/ThrowingPublishers.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java	Wed Feb 28 15:48:46 2018 +0000
@@ -66,14 +66,17 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -154,7 +157,7 @@
             out.println("Failed tests: ");
             FAILURES.entrySet().forEach((e) -> {
                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
-                e.getValue().printStackTrace();
+                e.getValue().printStackTrace(out);
             });
             if (tasksFailed) {
                 System.out.println("WARNING: Some tasks failed");
@@ -263,10 +266,6 @@
                                 System.out.println("publishing done");
                                 publisher.close();
                             },
-//                        Stream.of(BODY.split("\\|"))
-//                                .onClose(() -> {System.out.println("publishing done"); publisher.close();})
-//                                .forEachOrdered(s -> { System.out.println("submitting \"" + s +"\""); publisher
-//                                        .submit(ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)));}),
                     executor);
 
             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
@@ -280,7 +279,7 @@
         }
     }
 
-    // @Test(dataProvider = "variants")
+    @Test(dataProvider = "variants")
     public void testThrowingAsString(String uri,
                                      boolean sameClient,
                                      Thrower thrower)
@@ -289,7 +288,7 @@
         String test = format("testThrowingAsString(%s, %b, %s)",
                              uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY),
-                this::shouldHaveThrown, thrower,false);
+                this::shouldNotThrowInCancel, thrower,false);
     }
 
     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
@@ -323,6 +322,7 @@
                     new ThrowingBodyPublisher(where.select(thrower), publishers.get());
             HttpRequest req = HttpRequest.
                     newBuilder(URI.create(uri))
+                    .header("X-expect-exception", "true")
                     .POST(bodyPublisher)
                     .build();
             BodyHandler<String> handler = BodyHandlers.ofString();
@@ -332,17 +332,17 @@
                 try {
                     response = client.sendAsync(req, handler).join();
                 } catch (Error | Exception x) {
-                    Throwable cause = findCause(x, thrower);
-                    if (cause == null) throw x;
+                    Throwable cause = findCause(where, x, thrower);
+                    if (cause == null) throw causeNotFound(where, x);
                     System.out.println(now() + "Got expected exception: " + cause);
                 }
             } else {
                 try {
                     response = client.send(req, handler);
                 } catch (Error | Exception t) {
-                    if (thrower.test(t)) {
+                    if (thrower.test(where, t)) {
                         System.out.println(now() + "Got expected exception: " + t);
-                    } else throw t;
+                    } else throw causeNotFound(where, t);
                 }
             }
             if (response != null) {
@@ -366,7 +366,7 @@
         }
     }
 
-    interface Thrower extends Consumer<Where>, Predicate<Throwable> {
+    interface Thrower extends Consumer<Where>, BiPredicate<Where,Throwable> {
 
     }
 
@@ -374,17 +374,41 @@
         U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
     }
 
+    final <T,U> U shouldNotThrowInCancel(Where w, HttpResponse<T> resp, Thrower thrower) {
+        switch (w) {
+            case BEFORE_CANCEL: return null;
+            case AFTER_CANCEL: return null;
+            default: break;
+        }
+        return shouldHaveThrown(w, resp, thrower);
+    }
+
+
     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
         throw new RuntimeException("Expected exception not thrown in " + w);
     }
 
 
-    private static Throwable findCause(Throwable x,
-                                       Predicate<Throwable> filter) {
-        while (x != null && !filter.test(x)) x = x.getCause();
+    private static Throwable findCause(Where w,
+                                       Throwable x,
+                                       BiPredicate<Where, Throwable> filter) {
+        while (x != null && !filter.test(w,x)) x = x.getCause();
         return x;
     }
 
+    static AssertionError causeNotFound(Where w, Throwable t) {
+        return new AssertionError("Expected exception not found in " + w, t);
+    }
+
+    static boolean isConnectionClosedLocally(Throwable t) {
+        if (t instanceof CompletionException) t = t.getCause();
+        if (t instanceof ExecutionException) t = t.getCause();
+        if (t instanceof IOException) {
+            return t.getMessage().contains("connection closed locally");
+        }
+        return false;
+    }
+
     static final class UncheckedCustomExceptionThrower implements Thrower {
         @Override
         public void accept(Where where) {
@@ -393,7 +417,14 @@
         }
 
         @Override
-        public boolean test(Throwable throwable) {
+        public boolean test(Where w, Throwable throwable) {
+            switch (w) {
+                case AFTER_REQUEST:
+                    if (isConnectionClosedLocally(throwable)) return true;
+                    break;
+                default:
+                    break;
+            }
             return UncheckedCustomException.class.isInstance(throwable);
         }
 
@@ -411,7 +442,14 @@
         }
 
         @Override
-        public boolean test(Throwable throwable) {
+        public boolean test(Where w, Throwable throwable) {
+            switch (w) {
+                case AFTER_REQUEST:
+                    if (isConnectionClosedLocally(throwable)) return true;
+                    break;
+                default:
+                    break;
+            }
             return UncheckedIOException.class.isInstance(throwable)
                     && CustomIOException.class.isInstance(throwable.getCause());
         }
@@ -459,11 +497,12 @@
         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
             try {
                 throwing.accept(Where.BEFORE_SUBSCRIBE);
-                publisher.subscribe(subscriber);
+                publisher.subscribe(new SubscriberWrapper(subscriber));
                 subscribedCF.complete(null);
                 throwing.accept(Where.AFTER_SUBSCRIBE);
             } catch (Throwable t) {
                 subscribedCF.completeExceptionally(t);
+                throw t;
             }
         }
 
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Wed Feb 28 15:48:46 2018 +0000
@@ -151,6 +151,7 @@
             out.println("Failed tests: ");
             FAILURES.entrySet().forEach((e) -> {
                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
+                e.getValue().printStackTrace(out);
                 e.getValue().printStackTrace();
             });
             if (tasksFailed) {
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Wed Feb 28 15:48:46 2018 +0000
@@ -62,6 +62,7 @@
         Http2Frame frame;
         do {
             frame = q.take();
+            if (frame == null) return null; // closed/eof before receiving data.
             // ignoring others for now Wupdates handled elsewhere
             if (frame.type() != DataFrame.TYPE) {
                 System.out.println("Ignoring " + frame.toString() + " CHECK THIS");