--- a/test/jdk/java/net/httpclient/ThrowingPublishers.java Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java Wed Feb 28 15:48:46 2018 +0000
@@ -66,14 +66,17 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -154,7 +157,7 @@
out.println("Failed tests: ");
FAILURES.entrySet().forEach((e) -> {
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
- e.getValue().printStackTrace();
+ e.getValue().printStackTrace(out);
});
if (tasksFailed) {
System.out.println("WARNING: Some tasks failed");
@@ -263,10 +266,6 @@
System.out.println("publishing done");
publisher.close();
},
-// Stream.of(BODY.split("\\|"))
-// .onClose(() -> {System.out.println("publishing done"); publisher.close();})
-// .forEachOrdered(s -> { System.out.println("submitting \"" + s +"\""); publisher
-// .submit(ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)));}),
executor);
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
@@ -280,7 +279,7 @@
}
}
- // @Test(dataProvider = "variants")
+ @Test(dataProvider = "variants")
public void testThrowingAsString(String uri,
boolean sameClient,
Thrower thrower)
@@ -289,7 +288,7 @@
String test = format("testThrowingAsString(%s, %b, %s)",
uri, sameClient, thrower);
testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY),
- this::shouldHaveThrown, thrower,false);
+ this::shouldNotThrowInCancel, thrower,false);
}
private <T,U> void testThrowing(String name, String uri, boolean sameClient,
@@ -323,6 +322,7 @@
new ThrowingBodyPublisher(where.select(thrower), publishers.get());
HttpRequest req = HttpRequest.
newBuilder(URI.create(uri))
+ .header("X-expect-exception", "true")
.POST(bodyPublisher)
.build();
BodyHandler<String> handler = BodyHandlers.ofString();
@@ -332,17 +332,17 @@
try {
response = client.sendAsync(req, handler).join();
} catch (Error | Exception x) {
- Throwable cause = findCause(x, thrower);
- if (cause == null) throw x;
+ Throwable cause = findCause(where, x, thrower);
+ if (cause == null) throw causeNotFound(where, x);
System.out.println(now() + "Got expected exception: " + cause);
}
} else {
try {
response = client.send(req, handler);
} catch (Error | Exception t) {
- if (thrower.test(t)) {
+ if (thrower.test(where, t)) {
System.out.println(now() + "Got expected exception: " + t);
- } else throw t;
+ } else throw causeNotFound(where, t);
}
}
if (response != null) {
@@ -366,7 +366,7 @@
}
}
- interface Thrower extends Consumer<Where>, Predicate<Throwable> {
+ interface Thrower extends Consumer<Where>, BiPredicate<Where,Throwable> {
}
@@ -374,17 +374,41 @@
U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
}
+ final <T,U> U shouldNotThrowInCancel(Where w, HttpResponse<T> resp, Thrower thrower) {
+ switch (w) {
+ case BEFORE_CANCEL: return null;
+ case AFTER_CANCEL: return null;
+ default: break;
+ }
+ return shouldHaveThrown(w, resp, thrower);
+ }
+
+
final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
throw new RuntimeException("Expected exception not thrown in " + w);
}
- private static Throwable findCause(Throwable x,
- Predicate<Throwable> filter) {
- while (x != null && !filter.test(x)) x = x.getCause();
+ private static Throwable findCause(Where w,
+ Throwable x,
+ BiPredicate<Where, Throwable> filter) {
+ while (x != null && !filter.test(w,x)) x = x.getCause();
return x;
}
+ static AssertionError causeNotFound(Where w, Throwable t) {
+ return new AssertionError("Expected exception not found in " + w, t);
+ }
+
+ static boolean isConnectionClosedLocally(Throwable t) {
+ if (t instanceof CompletionException) t = t.getCause();
+ if (t instanceof ExecutionException) t = t.getCause();
+ if (t instanceof IOException) {
+ return t.getMessage().contains("connection closed locally");
+ }
+ return false;
+ }
+
static final class UncheckedCustomExceptionThrower implements Thrower {
@Override
public void accept(Where where) {
@@ -393,7 +417,14 @@
}
@Override
- public boolean test(Throwable throwable) {
+ public boolean test(Where w, Throwable throwable) {
+ switch (w) {
+ case AFTER_REQUEST:
+ if (isConnectionClosedLocally(throwable)) return true;
+ break;
+ default:
+ break;
+ }
return UncheckedCustomException.class.isInstance(throwable);
}
@@ -411,7 +442,14 @@
}
@Override
- public boolean test(Throwable throwable) {
+ public boolean test(Where w, Throwable throwable) {
+ switch (w) {
+ case AFTER_REQUEST:
+ if (isConnectionClosedLocally(throwable)) return true;
+ break;
+ default:
+ break;
+ }
return UncheckedIOException.class.isInstance(throwable)
&& CustomIOException.class.isInstance(throwable.getCause());
}
@@ -459,11 +497,12 @@
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
try {
throwing.accept(Where.BEFORE_SUBSCRIBE);
- publisher.subscribe(subscriber);
+ publisher.subscribe(new SubscriberWrapper(subscriber));
subscribedCF.complete(null);
throwing.accept(Where.AFTER_SUBSCRIBE);
} catch (Throwable t) {
subscribedCF.completeExceptionally(t);
+ throw t;
}
}