--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Feb 28 15:48:46 2018 +0000
@@ -53,9 +53,6 @@
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
- private static final System.Logger DEBUG_LOGGER =
- Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);
-
final HttpRequestImpl request; // main request
final Http1Request requestAction;
private volatile Http1Response<T> response;
@@ -121,12 +118,17 @@
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
private volatile Flow.Subscription subscription;
volatile boolean complete;
+ private final System.Logger debug;
+ Http1BodySubscriber(System.Logger debug) {
+ assert debug != null;
+ this.debug = debug;
+ }
/** Final sentinel in the stream of request body. */
static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
final void request(long n) {
- DEBUG_LOGGER.log(Level.DEBUG, () ->
+ debug.log(Level.DEBUG, () ->
"Http1BodySubscriber requesting " + n + ", from " + subscription);
subscription.request(n);
}
@@ -141,11 +143,17 @@
}
final void cancelSubscription() {
- subscription.cancel();
+ try {
+ subscription.cancel();
+ } catch(Throwable t) {
+ String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
+ debug.log(Level.DEBUG, "%s: %s", msg, t);
+ Log.logError("{0}: {1}", msg, (Object)t);
+ }
}
- static Http1BodySubscriber completeSubscriber() {
- return new Http1BodySubscriber() {
+ static Http1BodySubscriber completeSubscriber(System.Logger debug) {
+ return new Http1BodySubscriber(debug) {
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
@Override public void onNext(ByteBuffer item) { error(); }
@Override public void onError(Throwable throwable) { error(); }
@@ -285,15 +293,15 @@
try {
bodySubscriber = requestAction.continueRequest();
if (bodySubscriber == null) {
- bodySubscriber = Http1BodySubscriber.completeSubscriber();
+ bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
} else {
// start
- bodySubscriber.whenSubscribed.thenAccept(
- (s) -> bodySubscriber.request(1));
+ bodySubscriber.whenSubscribed
+ .thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
- connection.close();
+ cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
return bodySentCF;
@@ -385,9 +393,11 @@
private void cancelImpl(Throwable cause) {
LinkedList<CompletableFuture<?>> toComplete = null;
int count = 0;
+ Throwable error;
synchronized (lock) {
- if (failed == null)
- failed = cause;
+ if ((error = failed) == null) {
+ failed = error = cause;
+ }
if (requestAction != null && requestAction.finished()
&& response != null && response.finished()) {
return;
@@ -426,12 +436,13 @@
Executor exec = client.isSelectorThread()
? executor
: this::runInline;
+ Throwable x = error;
while (!toComplete.isEmpty()) {
CompletableFuture<?> cf = toComplete.poll();
exec.execute(() -> {
- if (cf.completeExceptionally(cause)) {
+ if (cf.completeExceptionally(x)) {
debug.log(Level.DEBUG, "completed cf with %s",
- (Object) cause);
+ (Object) x);
}
});
}
@@ -479,6 +490,17 @@
return !outgoing.isEmpty();
}
+ private void requestMoreBody() {
+ try {
+ debug.log(Level.DEBUG, "requesting more body from the subscriber");
+ bodySubscriber.request(1);
+ } catch (Throwable t) {
+ debug.log(Level.DEBUG, "Subscription::request failed", t);
+ cancelImpl(t);
+ bodySentCF.completeExceptionally(t);
+ }
+ }
+
// Invoked only by the publisher
// ALL tasks should execute off the Selector-Manager thread
/** Returns the next portion of the HTTP request, or the error. */
@@ -513,8 +535,7 @@
debug.log(Level.DEBUG, "initiating completion of bodySentCF");
bodySentCF.completeAsync(() -> this, exec);
} else {
- debug.log(Level.DEBUG, "requesting more body from the subscriber");
- exec.execute(() -> bodySubscriber.request(1));
+ exec.execute(this::requestMoreBody);
}
break;
case INITIAL:
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java Wed Feb 28 15:48:46 2018 +0000
@@ -244,6 +244,8 @@
final class StreamSubscriber extends Http1BodySubscriber {
+ StreamSubscriber() { super(debug); }
+
@Override
public void onSubscribe(Flow.Subscription subscription) {
if (isSubscribed()) {
@@ -301,6 +303,7 @@
final class FixedContentSubscriber extends Http1BodySubscriber {
private volatile long contentWritten;
+ FixedContentSubscriber() { super(debug); }
@Override
public void onSubscribe(Flow.Subscription subscription) {
--- a/test/jdk/java/net/httpclient/HttpServerAdapters.java Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java Wed Feb 28 15:48:46 2018 +0000
@@ -327,6 +327,15 @@
}
}
+ public static boolean expectException(HttpTestExchange e) {
+ HttpTestHeaders h = e.getRequestHeaders();
+ Optional<String> expectException = h.firstValue("X-expect-exception");
+ if (expectException.isPresent()) {
+ return expectException.get().equalsIgnoreCase("true");
+ }
+ return false;
+ }
+
/**
* A version agnostic adapter class for HTTP Server Filter Chains.
*/
@@ -351,9 +360,9 @@
try {
exchange.doFilter(chain);
} catch (Throwable t) {
- System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t);
- System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t);
- if (PRINTSTACK) t.printStackTrace();
+ System.out.println("WARNING: exception caught in Http1Chain::doFilter " + t);
+ System.err.println("WARNING: exception caught in Http1Chain::doFilter " + t);
+ if (PRINTSTACK && !expectException(exchange)) t.printStackTrace();
throw t;
}
}
@@ -375,9 +384,9 @@
handler.handle(exchange);
}
} catch (Throwable t) {
- System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t);
- System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t);
- if (PRINTSTACK) t.printStackTrace();
+ System.out.println("WARNING: exception caught in Http2Chain::doFilter " + t);
+ System.err.println("WARNING: exception caught in Http2Chain::doFilter " + t);
+ if (PRINTSTACK && !expectException(exchange)) t.printStackTrace();
throw t;
}
}
--- a/test/jdk/java/net/httpclient/ThrowingPublishers.java Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java Wed Feb 28 15:48:46 2018 +0000
@@ -66,14 +66,17 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -154,7 +157,7 @@
out.println("Failed tests: ");
FAILURES.entrySet().forEach((e) -> {
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
- e.getValue().printStackTrace();
+ e.getValue().printStackTrace(out);
});
if (tasksFailed) {
System.out.println("WARNING: Some tasks failed");
@@ -263,10 +266,6 @@
System.out.println("publishing done");
publisher.close();
},
-// Stream.of(BODY.split("\\|"))
-// .onClose(() -> {System.out.println("publishing done"); publisher.close();})
-// .forEachOrdered(s -> { System.out.println("submitting \"" + s +"\""); publisher
-// .submit(ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)));}),
executor);
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
@@ -280,7 +279,7 @@
}
}
- // @Test(dataProvider = "variants")
+ @Test(dataProvider = "variants")
public void testThrowingAsString(String uri,
boolean sameClient,
Thrower thrower)
@@ -289,7 +288,7 @@
String test = format("testThrowingAsString(%s, %b, %s)",
uri, sameClient, thrower);
testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY),
- this::shouldHaveThrown, thrower,false);
+ this::shouldNotThrowInCancel, thrower,false);
}
private <T,U> void testThrowing(String name, String uri, boolean sameClient,
@@ -323,6 +322,7 @@
new ThrowingBodyPublisher(where.select(thrower), publishers.get());
HttpRequest req = HttpRequest.
newBuilder(URI.create(uri))
+ .header("X-expect-exception", "true")
.POST(bodyPublisher)
.build();
BodyHandler<String> handler = BodyHandlers.ofString();
@@ -332,17 +332,17 @@
try {
response = client.sendAsync(req, handler).join();
} catch (Error | Exception x) {
- Throwable cause = findCause(x, thrower);
- if (cause == null) throw x;
+ Throwable cause = findCause(where, x, thrower);
+ if (cause == null) throw causeNotFound(where, x);
System.out.println(now() + "Got expected exception: " + cause);
}
} else {
try {
response = client.send(req, handler);
} catch (Error | Exception t) {
- if (thrower.test(t)) {
+ if (thrower.test(where, t)) {
System.out.println(now() + "Got expected exception: " + t);
- } else throw t;
+ } else throw causeNotFound(where, t);
}
}
if (response != null) {
@@ -366,7 +366,7 @@
}
}
- interface Thrower extends Consumer<Where>, Predicate<Throwable> {
+ interface Thrower extends Consumer<Where>, BiPredicate<Where,Throwable> {
}
@@ -374,17 +374,41 @@
U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
}
+ final <T,U> U shouldNotThrowInCancel(Where w, HttpResponse<T> resp, Thrower thrower) {
+ switch (w) {
+ case BEFORE_CANCEL: return null;
+ case AFTER_CANCEL: return null;
+ default: break;
+ }
+ return shouldHaveThrown(w, resp, thrower);
+ }
+
+
final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
throw new RuntimeException("Expected exception not thrown in " + w);
}
- private static Throwable findCause(Throwable x,
- Predicate<Throwable> filter) {
- while (x != null && !filter.test(x)) x = x.getCause();
+ private static Throwable findCause(Where w,
+ Throwable x,
+ BiPredicate<Where, Throwable> filter) {
+ while (x != null && !filter.test(w,x)) x = x.getCause();
return x;
}
+ static AssertionError causeNotFound(Where w, Throwable t) {
+ return new AssertionError("Expected exception not found in " + w, t);
+ }
+
+ static boolean isConnectionClosedLocally(Throwable t) {
+ if (t instanceof CompletionException) t = t.getCause();
+ if (t instanceof ExecutionException) t = t.getCause();
+ if (t instanceof IOException) {
+ return t.getMessage().contains("connection closed locally");
+ }
+ return false;
+ }
+
static final class UncheckedCustomExceptionThrower implements Thrower {
@Override
public void accept(Where where) {
@@ -393,7 +417,14 @@
}
@Override
- public boolean test(Throwable throwable) {
+ public boolean test(Where w, Throwable throwable) {
+ switch (w) {
+ case AFTER_REQUEST:
+ if (isConnectionClosedLocally(throwable)) return true;
+ break;
+ default:
+ break;
+ }
return UncheckedCustomException.class.isInstance(throwable);
}
@@ -411,7 +442,14 @@
}
@Override
- public boolean test(Throwable throwable) {
+ public boolean test(Where w, Throwable throwable) {
+ switch (w) {
+ case AFTER_REQUEST:
+ if (isConnectionClosedLocally(throwable)) return true;
+ break;
+ default:
+ break;
+ }
return UncheckedIOException.class.isInstance(throwable)
&& CustomIOException.class.isInstance(throwable.getCause());
}
@@ -459,11 +497,12 @@
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
try {
throwing.accept(Where.BEFORE_SUBSCRIBE);
- publisher.subscribe(subscriber);
+ publisher.subscribe(new SubscriberWrapper(subscriber));
subscribedCF.complete(null);
throwing.accept(Where.AFTER_SUBSCRIBE);
} catch (Throwable t) {
subscribedCF.completeExceptionally(t);
+ throw t;
}
}
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Wed Feb 28 15:48:46 2018 +0000
@@ -151,6 +151,7 @@
out.println("Failed tests: ");
FAILURES.entrySet().forEach((e) -> {
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
+ e.getValue().printStackTrace(out);
e.getValue().printStackTrace();
});
if (tasksFailed) {
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Tue Feb 27 19:26:25 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Feb 28 15:48:46 2018 +0000
@@ -62,6 +62,7 @@
Http2Frame frame;
do {
frame = q.take();
+ if (frame == null) return null; // closed/eof before receiving data.
// ignoring others for now Wupdates handled elsewhere
if (frame.type() != DataFrame.TYPE) {
System.out.println("Ignoring " + frame.toString() + " CHECK THIS");