test/jdk/java/net/httpclient/ThrowingPublishers.java
branchhttp-client-branch
changeset 56209 43d5ad612710
parent 56204 e5d0c20217a3
child 56218 fd7bd32963ef
--- a/test/jdk/java/net/httpclient/ThrowingPublishers.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java	Wed Feb 28 15:48:46 2018 +0000
@@ -66,14 +66,17 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -154,7 +157,7 @@
             out.println("Failed tests: ");
             FAILURES.entrySet().forEach((e) -> {
                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
-                e.getValue().printStackTrace();
+                e.getValue().printStackTrace(out);
             });
             if (tasksFailed) {
                 System.out.println("WARNING: Some tasks failed");
@@ -263,10 +266,6 @@
                                 System.out.println("publishing done");
                                 publisher.close();
                             },
-//                        Stream.of(BODY.split("\\|"))
-//                                .onClose(() -> {System.out.println("publishing done"); publisher.close();})
-//                                .forEachOrdered(s -> { System.out.println("submitting \"" + s +"\""); publisher
-//                                        .submit(ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)));}),
                     executor);
 
             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
@@ -280,7 +279,7 @@
         }
     }
 
-    // @Test(dataProvider = "variants")
+    @Test(dataProvider = "variants")
     public void testThrowingAsString(String uri,
                                      boolean sameClient,
                                      Thrower thrower)
@@ -289,7 +288,7 @@
         String test = format("testThrowingAsString(%s, %b, %s)",
                              uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY),
-                this::shouldHaveThrown, thrower,false);
+                this::shouldNotThrowInCancel, thrower,false);
     }
 
     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
@@ -323,6 +322,7 @@
                     new ThrowingBodyPublisher(where.select(thrower), publishers.get());
             HttpRequest req = HttpRequest.
                     newBuilder(URI.create(uri))
+                    .header("X-expect-exception", "true")
                     .POST(bodyPublisher)
                     .build();
             BodyHandler<String> handler = BodyHandlers.ofString();
@@ -332,17 +332,17 @@
                 try {
                     response = client.sendAsync(req, handler).join();
                 } catch (Error | Exception x) {
-                    Throwable cause = findCause(x, thrower);
-                    if (cause == null) throw x;
+                    Throwable cause = findCause(where, x, thrower);
+                    if (cause == null) throw causeNotFound(where, x);
                     System.out.println(now() + "Got expected exception: " + cause);
                 }
             } else {
                 try {
                     response = client.send(req, handler);
                 } catch (Error | Exception t) {
-                    if (thrower.test(t)) {
+                    if (thrower.test(where, t)) {
                         System.out.println(now() + "Got expected exception: " + t);
-                    } else throw t;
+                    } else throw causeNotFound(where, t);
                 }
             }
             if (response != null) {
@@ -366,7 +366,7 @@
         }
     }
 
-    interface Thrower extends Consumer<Where>, Predicate<Throwable> {
+    interface Thrower extends Consumer<Where>, BiPredicate<Where,Throwable> {
 
     }
 
@@ -374,17 +374,41 @@
         U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
     }
 
+    final <T,U> U shouldNotThrowInCancel(Where w, HttpResponse<T> resp, Thrower thrower) {
+        switch (w) {
+            case BEFORE_CANCEL: return null;
+            case AFTER_CANCEL: return null;
+            default: break;
+        }
+        return shouldHaveThrown(w, resp, thrower);
+    }
+
+
     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
         throw new RuntimeException("Expected exception not thrown in " + w);
     }
 
 
-    private static Throwable findCause(Throwable x,
-                                       Predicate<Throwable> filter) {
-        while (x != null && !filter.test(x)) x = x.getCause();
+    private static Throwable findCause(Where w,
+                                       Throwable x,
+                                       BiPredicate<Where, Throwable> filter) {
+        while (x != null && !filter.test(w,x)) x = x.getCause();
         return x;
     }
 
+    static AssertionError causeNotFound(Where w, Throwable t) {
+        return new AssertionError("Expected exception not found in " + w, t);
+    }
+
+    static boolean isConnectionClosedLocally(Throwable t) {
+        if (t instanceof CompletionException) t = t.getCause();
+        if (t instanceof ExecutionException) t = t.getCause();
+        if (t instanceof IOException) {
+            return t.getMessage().contains("connection closed locally");
+        }
+        return false;
+    }
+
     static final class UncheckedCustomExceptionThrower implements Thrower {
         @Override
         public void accept(Where where) {
@@ -393,7 +417,14 @@
         }
 
         @Override
-        public boolean test(Throwable throwable) {
+        public boolean test(Where w, Throwable throwable) {
+            switch (w) {
+                case AFTER_REQUEST:
+                    if (isConnectionClosedLocally(throwable)) return true;
+                    break;
+                default:
+                    break;
+            }
             return UncheckedCustomException.class.isInstance(throwable);
         }
 
@@ -411,7 +442,14 @@
         }
 
         @Override
-        public boolean test(Throwable throwable) {
+        public boolean test(Where w, Throwable throwable) {
+            switch (w) {
+                case AFTER_REQUEST:
+                    if (isConnectionClosedLocally(throwable)) return true;
+                    break;
+                default:
+                    break;
+            }
             return UncheckedIOException.class.isInstance(throwable)
                     && CustomIOException.class.isInstance(throwable.getCause());
         }
@@ -459,11 +497,12 @@
         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
             try {
                 throwing.accept(Where.BEFORE_SUBSCRIBE);
-                publisher.subscribe(subscriber);
+                publisher.subscribe(new SubscriberWrapper(subscriber));
                 subscribedCF.complete(null);
                 throwing.accept(Where.AFTER_SUBSCRIBE);
             } catch (Throwable t) {
                 subscribedCF.completeExceptionally(t);
+                throw t;
             }
         }