# HG changeset patch # User dfuchs # Date 1519832926 0 # Node ID 43d5ad61271052dafb67b120f0ca9e4255347c47 # Parent d37c08ce784a8a006d38331386cbfbcb53cf2143 http-client-branch: more ThrowingPublishers tests and fixes diff -r d37c08ce784a -r 43d5ad612710 src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue Feb 27 19:26:25 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Feb 28 15:48:46 2018 +0000 @@ -53,9 +53,6 @@ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); - private static final System.Logger DEBUG_LOGGER = - Utils.getDebugLogger("Http1Exchange"::toString, DEBUG); - final HttpRequestImpl request; // main request final Http1Request requestAction; private volatile Http1Response response; @@ -121,12 +118,17 @@ final MinimalFuture whenSubscribed = new MinimalFuture<>(); private volatile Flow.Subscription subscription; volatile boolean complete; + private final System.Logger debug; + Http1BodySubscriber(System.Logger debug) { + assert debug != null; + this.debug = debug; + } /** Final sentinel in the stream of request body. */ static final List COMPLETED = List.of(ByteBuffer.allocate(0)); final void request(long n) { - DEBUG_LOGGER.log(Level.DEBUG, () -> + debug.log(Level.DEBUG, () -> "Http1BodySubscriber requesting " + n + ", from " + subscription); subscription.request(n); } @@ -141,11 +143,17 @@ } final void cancelSubscription() { - subscription.cancel(); + try { + subscription.cancel(); + } catch(Throwable t) { + String msg = "Ignoring exception raised when canceling BodyPublisher subscription"; + debug.log(Level.DEBUG, "%s: %s", msg, t); + Log.logError("{0}: {1}", msg, (Object)t); + } } - static Http1BodySubscriber completeSubscriber() { - return new Http1BodySubscriber() { + static Http1BodySubscriber completeSubscriber(System.Logger debug) { + return new Http1BodySubscriber(debug) { @Override public void onSubscribe(Flow.Subscription subscription) { error(); } @Override public void onNext(ByteBuffer item) { error(); } @Override public void onError(Throwable throwable) { error(); } @@ -285,15 +293,15 @@ try { bodySubscriber = requestAction.continueRequest(); if (bodySubscriber == null) { - bodySubscriber = Http1BodySubscriber.completeSubscriber(); + bodySubscriber = Http1BodySubscriber.completeSubscriber(debug); appendToOutgoing(Http1BodySubscriber.COMPLETED); } else { // start - bodySubscriber.whenSubscribed.thenAccept( - (s) -> bodySubscriber.request(1)); + bodySubscriber.whenSubscribed + .thenAccept((s) -> requestMoreBody()); } } catch (Throwable t) { - connection.close(); + cancelImpl(t); bodySentCF.completeExceptionally(t); } return bodySentCF; @@ -385,9 +393,11 @@ private void cancelImpl(Throwable cause) { LinkedList> toComplete = null; int count = 0; + Throwable error; synchronized (lock) { - if (failed == null) - failed = cause; + if ((error = failed) == null) { + failed = error = cause; + } if (requestAction != null && requestAction.finished() && response != null && response.finished()) { return; @@ -426,12 +436,13 @@ Executor exec = client.isSelectorThread() ? executor : this::runInline; + Throwable x = error; while (!toComplete.isEmpty()) { CompletableFuture cf = toComplete.poll(); exec.execute(() -> { - if (cf.completeExceptionally(cause)) { + if (cf.completeExceptionally(x)) { debug.log(Level.DEBUG, "completed cf with %s", - (Object) cause); + (Object) x); } }); } @@ -479,6 +490,17 @@ return !outgoing.isEmpty(); } + private void requestMoreBody() { + try { + debug.log(Level.DEBUG, "requesting more body from the subscriber"); + bodySubscriber.request(1); + } catch (Throwable t) { + debug.log(Level.DEBUG, "Subscription::request failed", t); + cancelImpl(t); + bodySentCF.completeExceptionally(t); + } + } + // Invoked only by the publisher // ALL tasks should execute off the Selector-Manager thread /** Returns the next portion of the HTTP request, or the error. */ @@ -513,8 +535,7 @@ debug.log(Level.DEBUG, "initiating completion of bodySentCF"); bodySentCF.completeAsync(() -> this, exec); } else { - debug.log(Level.DEBUG, "requesting more body from the subscriber"); - exec.execute(() -> bodySubscriber.request(1)); + exec.execute(this::requestMoreBody); } break; case INITIAL: diff -r d37c08ce784a -r 43d5ad612710 src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java Tue Feb 27 19:26:25 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java Wed Feb 28 15:48:46 2018 +0000 @@ -244,6 +244,8 @@ final class StreamSubscriber extends Http1BodySubscriber { + StreamSubscriber() { super(debug); } + @Override public void onSubscribe(Flow.Subscription subscription) { if (isSubscribed()) { @@ -301,6 +303,7 @@ final class FixedContentSubscriber extends Http1BodySubscriber { private volatile long contentWritten; + FixedContentSubscriber() { super(debug); } @Override public void onSubscribe(Flow.Subscription subscription) { diff -r d37c08ce784a -r 43d5ad612710 test/jdk/java/net/httpclient/HttpServerAdapters.java --- a/test/jdk/java/net/httpclient/HttpServerAdapters.java Tue Feb 27 19:26:25 2018 +0000 +++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java Wed Feb 28 15:48:46 2018 +0000 @@ -327,6 +327,15 @@ } } + public static boolean expectException(HttpTestExchange e) { + HttpTestHeaders h = e.getRequestHeaders(); + Optional expectException = h.firstValue("X-expect-exception"); + if (expectException.isPresent()) { + return expectException.get().equalsIgnoreCase("true"); + } + return false; + } + /** * A version agnostic adapter class for HTTP Server Filter Chains. */ @@ -351,9 +360,9 @@ try { exchange.doFilter(chain); } catch (Throwable t) { - System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t); - System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t); - if (PRINTSTACK) t.printStackTrace(); + System.out.println("WARNING: exception caught in Http1Chain::doFilter " + t); + System.err.println("WARNING: exception caught in Http1Chain::doFilter " + t); + if (PRINTSTACK && !expectException(exchange)) t.printStackTrace(); throw t; } } @@ -375,9 +384,9 @@ handler.handle(exchange); } } catch (Throwable t) { - System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t); - System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t); - if (PRINTSTACK) t.printStackTrace(); + System.out.println("WARNING: exception caught in Http2Chain::doFilter " + t); + System.err.println("WARNING: exception caught in Http2Chain::doFilter " + t); + if (PRINTSTACK && !expectException(exchange)) t.printStackTrace(); throw t; } } diff -r d37c08ce784a -r 43d5ad612710 test/jdk/java/net/httpclient/ThrowingPublishers.java --- a/test/jdk/java/net/httpclient/ThrowingPublishers.java Tue Feb 27 19:26:25 2018 +0000 +++ b/test/jdk/java/net/httpclient/ThrowingPublishers.java Wed Feb 28 15:48:46 2018 +0000 @@ -66,14 +66,17 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; @@ -154,7 +157,7 @@ out.println("Failed tests: "); FAILURES.entrySet().forEach((e) -> { out.printf("\t%s: %s%n", e.getKey(), e.getValue()); - e.getValue().printStackTrace(); + e.getValue().printStackTrace(out); }); if (tasksFailed) { System.out.println("WARNING: Some tasks failed"); @@ -263,10 +266,6 @@ System.out.println("publishing done"); publisher.close(); }, -// Stream.of(BODY.split("\\|")) -// .onClose(() -> {System.out.println("publishing done"); publisher.close();}) -// .forEachOrdered(s -> { System.out.println("submitting \"" + s +"\""); publisher -// .submit(ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)));}), executor); HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) @@ -280,7 +279,7 @@ } } - // @Test(dataProvider = "variants") + @Test(dataProvider = "variants") public void testThrowingAsString(String uri, boolean sameClient, Thrower thrower) @@ -289,7 +288,7 @@ String test = format("testThrowingAsString(%s, %b, %s)", uri, sameClient, thrower); testThrowing(test, uri, sameClient, () -> BodyPublishers.ofString(BODY), - this::shouldHaveThrown, thrower,false); + this::shouldNotThrowInCancel, thrower,false); } private void testThrowing(String name, String uri, boolean sameClient, @@ -323,6 +322,7 @@ new ThrowingBodyPublisher(where.select(thrower), publishers.get()); HttpRequest req = HttpRequest. newBuilder(URI.create(uri)) + .header("X-expect-exception", "true") .POST(bodyPublisher) .build(); BodyHandler handler = BodyHandlers.ofString(); @@ -332,17 +332,17 @@ try { response = client.sendAsync(req, handler).join(); } catch (Error | Exception x) { - Throwable cause = findCause(x, thrower); - if (cause == null) throw x; + Throwable cause = findCause(where, x, thrower); + if (cause == null) throw causeNotFound(where, x); System.out.println(now() + "Got expected exception: " + cause); } } else { try { response = client.send(req, handler); } catch (Error | Exception t) { - if (thrower.test(t)) { + if (thrower.test(where, t)) { System.out.println(now() + "Got expected exception: " + t); - } else throw t; + } else throw causeNotFound(where, t); } } if (response != null) { @@ -366,7 +366,7 @@ } } - interface Thrower extends Consumer, Predicate { + interface Thrower extends Consumer, BiPredicate { } @@ -374,17 +374,41 @@ U finish(Where w, HttpResponse resp, Thrower thrower) throws IOException; } + final U shouldNotThrowInCancel(Where w, HttpResponse resp, Thrower thrower) { + switch (w) { + case BEFORE_CANCEL: return null; + case AFTER_CANCEL: return null; + default: break; + } + return shouldHaveThrown(w, resp, thrower); + } + + final U shouldHaveThrown(Where w, HttpResponse resp, Thrower thrower) { throw new RuntimeException("Expected exception not thrown in " + w); } - private static Throwable findCause(Throwable x, - Predicate filter) { - while (x != null && !filter.test(x)) x = x.getCause(); + private static Throwable findCause(Where w, + Throwable x, + BiPredicate filter) { + while (x != null && !filter.test(w,x)) x = x.getCause(); return x; } + static AssertionError causeNotFound(Where w, Throwable t) { + return new AssertionError("Expected exception not found in " + w, t); + } + + static boolean isConnectionClosedLocally(Throwable t) { + if (t instanceof CompletionException) t = t.getCause(); + if (t instanceof ExecutionException) t = t.getCause(); + if (t instanceof IOException) { + return t.getMessage().contains("connection closed locally"); + } + return false; + } + static final class UncheckedCustomExceptionThrower implements Thrower { @Override public void accept(Where where) { @@ -393,7 +417,14 @@ } @Override - public boolean test(Throwable throwable) { + public boolean test(Where w, Throwable throwable) { + switch (w) { + case AFTER_REQUEST: + if (isConnectionClosedLocally(throwable)) return true; + break; + default: + break; + } return UncheckedCustomException.class.isInstance(throwable); } @@ -411,7 +442,14 @@ } @Override - public boolean test(Throwable throwable) { + public boolean test(Where w, Throwable throwable) { + switch (w) { + case AFTER_REQUEST: + if (isConnectionClosedLocally(throwable)) return true; + break; + default: + break; + } return UncheckedIOException.class.isInstance(throwable) && CustomIOException.class.isInstance(throwable.getCause()); } @@ -459,11 +497,12 @@ public void subscribe(Flow.Subscriber subscriber) { try { throwing.accept(Where.BEFORE_SUBSCRIBE); - publisher.subscribe(subscriber); + publisher.subscribe(new SubscriberWrapper(subscriber)); subscribedCF.complete(null); throwing.accept(Where.AFTER_SUBSCRIBE); } catch (Throwable t) { subscribedCF.completeExceptionally(t); + throw t; } } diff -r d37c08ce784a -r 43d5ad612710 test/jdk/java/net/httpclient/ThrowingSubscribers.java --- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java Tue Feb 27 19:26:25 2018 +0000 +++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Wed Feb 28 15:48:46 2018 +0000 @@ -151,6 +151,7 @@ out.println("Failed tests: "); FAILURES.entrySet().forEach((e) -> { out.printf("\t%s: %s%n", e.getKey(), e.getValue()); + e.getValue().printStackTrace(out); e.getValue().printStackTrace(); }); if (tasksFailed) { diff -r d37c08ce784a -r 43d5ad612710 test/jdk/java/net/httpclient/http2/server/BodyInputStream.java --- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Tue Feb 27 19:26:25 2018 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Feb 28 15:48:46 2018 +0000 @@ -62,6 +62,7 @@ Http2Frame frame; do { frame = q.take(); + if (frame == null) return null; // closed/eof before receiving data. // ignoring others for now Wupdates handled elsewhere if (frame.type() != DataFrame.TYPE) { System.out.println("Ignoring " + frame.toString() + " CHECK THIS");