--- a/test/jdk/java/net/httpclient/ThrowingPublishers.java Wed Feb 28 19:13:16 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java Thu Mar 01 13:36:41 2018 +0000
@@ -54,20 +54,17 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
-import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
-import java.net.http.HttpResponse.BodySubscriber;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -78,7 +75,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
-import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -287,7 +283,10 @@
{
String test = format("testThrowingAsString(%s, %b, %s)",
uri, sameClient, thrower);
- testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY),
+ List<byte[]> bytes = Stream.of(BODY.split("|"))
+ .map(s -> s.getBytes(UTF_8))
+ .collect(Collectors.toList());
+ testThrowing(test, uri, sameClient, () -> BodyPublishers.ofByteArrays(bytes),
this::shouldNotThrowInCancel, thrower,false);
}
@@ -352,8 +351,8 @@
}
enum Where {
- BEFORE_SUBSCRIBE, BEFORE_REQUEST, BEFORE_CANCEL,
- AFTER_SUBSCRIBE, AFTER_REQUEST, AFTER_CANCEL;
+ BEFORE_SUBSCRIBE, BEFORE_REQUEST, BEFORE_NEXT_REQUEST, BEFORE_CANCEL,
+ AFTER_SUBSCRIBE, AFTER_REQUEST, AFTER_NEXT_REQUEST, AFTER_CANCEL;
public Consumer<Where> select(Consumer<Where> consumer) {
return new Consumer<Where>() {
@Override
@@ -385,7 +384,11 @@
final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
- throw new RuntimeException("Expected exception not thrown in " + w);
+ String msg = "Expected exception not thrown in " + w
+ + "\n\tReceived: " + resp
+ + "\n\tWith body: " + resp.body();
+ System.out.println(msg);
+ throw new RuntimeException(msg);
}
@@ -404,7 +407,9 @@
if (t instanceof CompletionException) t = t.getCause();
if (t instanceof ExecutionException) t = t.getCause();
if (t instanceof IOException) {
- return t.getMessage().contains("connection closed locally");
+ String msg = t.getMessage();
+ return msg == null ? false
+ : msg.contains("connection closed locally");
}
return false;
}
@@ -420,6 +425,8 @@
public boolean test(Where w, Throwable throwable) {
switch (w) {
case AFTER_REQUEST:
+ case BEFORE_NEXT_REQUEST:
+ case AFTER_NEXT_REQUEST:
if (isConnectionClosedLocally(throwable)) return true;
break;
default:
@@ -445,6 +452,8 @@
public boolean test(Where w, Throwable throwable) {
switch (w) {
case AFTER_REQUEST:
+ case BEFORE_NEXT_REQUEST:
+ case AFTER_NEXT_REQUEST:
if (isConnectionClosedLocally(throwable)) return true;
break;
default:
@@ -512,14 +521,19 @@
class SubscriptionWrapper implements Flow.Subscription {
final Flow.Subscription subscription;
+ final AtomicLong requestCount = new AtomicLong();
SubscriptionWrapper(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void request(long n) {
+ long count = requestCount.incrementAndGet();
+ System.out.printf("%s request-%d(%d)%n", now(), count, n);
+ if (count > 1) throwing.accept(Where.BEFORE_NEXT_REQUEST);
throwing.accept(Where.BEFORE_REQUEST);
subscription.request(n);
throwing.accept(Where.AFTER_REQUEST);
+ if (count > 1) throwing.accept(Where.AFTER_NEXT_REQUEST);
}
@Override