# HG changeset patch # User dfuchs # Date 1519320801 0 # Node ID 8a6065d830b9818d62d3dfc7232451d969798721 # Parent 4db4bec0e5bbe6d014353a22cc7293e1a9a80d82 http-client-branch: Add a test for BodySubscribers throwing in getBody() or returning exceptionally completed CFs diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Thu Feb 22 17:33:21 2018 +0000 @@ -477,9 +477,9 @@ if (dp.throwable != null) { state = State.ERROR; exec.execute(() -> { - connection.close(); headersSentCF.completeExceptionally(dp.throwable); bodySentCF.completeExceptionally(dp.throwable); + connection.close(); }); return dp; } diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Thu Feb 22 17:33:21 2018 +0000 @@ -29,9 +29,8 @@ import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.function.BiConsumer; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; import java.net.http.HttpHeaders; @@ -67,8 +66,9 @@ static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} private volatile State readProgress = State.INITIAL; static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. - final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG); - + final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); + final static AtomicLong responseCount = new AtomicLong(); + final long id = responseCount.incrementAndGet(); Http1Response(HttpConnection conn, Http1Exchange exchange, @@ -82,6 +82,56 @@ bodyReader = new BodyReader(this::advance); } + String dbgTag; + private String dbgString() { + String dbg = dbgTag; + if (dbg == null) { + String cdbg = connection.dbgTag; + if (cdbg != null) { + dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")"; + } else { + dbg = "Http1Response(id=" + id + ")"; + } + } + return dbg; + } + + // The ClientRefCountTracker is used to track the state + // of a pending operation. Altough there usually is a single + // point where the operation starts, it may terminate at + // different places. + private final class ClientRefCountTracker { + final HttpClientImpl client = connection.client(); + // state & 0x01 != 0 => acquire called + // state & 0x02 != 0 => tryRelease called + byte state; + + public synchronized void acquire() { + if (state == 0) { + // increment the reference count on the HttpClientImpl + // to prevent the SelectorManager thread from exiting + // until our operation is complete. + debug.log(Level.DEBUG, "incrementing ref count for %s", client); + client.reference(); + state = 0x01; + } else { + assert (state & 0x01) == 0 : "reference count already incremented"; + } + } + + public synchronized void tryRelease() { + if (state == 0x01) { + // decrement the reference count on the HttpClientImpl + // to allow the SelectorManager thread to exit if no + // other operation is pending and the facade is no + // longer referenced. + debug.log(Level.DEBUG, "decrementing ref count for %s", client); + client.unreference(); + state |= 0x02; + } + } + } + public CompletableFuture readHeadersAsync(Executor executor) { debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: " + asyncReceiver.remaining() +") " + readProgress); @@ -157,6 +207,7 @@ } } + public CompletableFuture readBody(HttpResponse.BodySubscriber p, boolean return2Cache, Executor executor) { @@ -173,10 +224,10 @@ // if we reach here, we must reset the headersReader state. asyncReceiver.unsubscribe(headersReader); headersReader.reset(); + ClientRefCountTracker refCountTracker = new ClientRefCountTracker(); executor.execute(() -> { try { - HttpClientImpl client = connection.client(); content = new ResponseContent( connection, clen, headers, pusher, this::onFinished @@ -189,7 +240,7 @@ // increment the reference count on the HttpClientImpl // to prevent the SelectorManager thread from exiting until // the body is fully read. - client.reference(); + refCountTracker.acquire(); bodyReader.start(content.getBodyParser( (t) -> { try { @@ -200,11 +251,6 @@ cf.completeExceptionally(t); } } finally { - // decrement the reference count on the HttpClientImpl - // to allow the SelectorManager thread to exit if no - // other operation is pending and the facade is no - // longer referenced. - client.unreference(); bodyReader.onComplete(t); } })); @@ -216,7 +262,7 @@ CompletableFuture trailingOp = bodyReaderCF.whenComplete((s,t) -> { t = Utils.getCompletionCause(t); try { - if (t != null) { + if (t == null) { debug.log(Level.DEBUG, () -> "Finished reading body: " + s); assert s == State.READING_BODY; @@ -228,6 +274,10 @@ } catch (Throwable x) { // not supposed to happen asyncReceiver.onReadError(x); + } finally { + // we're done: release the ref count for + // the current operation. + refCountTracker.tryRelease(); } }); connection.addTrailingOperation(trailingOp); @@ -243,14 +293,31 @@ } } }); - p.getBody().whenComplete((U u, Throwable t) -> { - if (t == null) - cf.complete(u); - else - cf.completeExceptionally(t); + try { + p.getBody().whenComplete((U u, Throwable t) -> { + if (t == null) + cf.complete(u); + else + cf.completeExceptionally(t); + }); + } catch (Throwable t) { + cf.completeExceptionally(t); + asyncReceiver.setRetryOnError(false); + asyncReceiver.onReadError(t); + } + + return cf.whenComplete((s,t) -> { + if (t != null) { + // If an exception occurred, release the + // ref count for the current operation, as + // it may never be triggered otherwise + // (BodySubscriber ofInputStream) + // If there was no exception then the + // ref count will be/have been released when + // the last byte of the response is/was received + refCountTracker.tryRelease(); + } }); - - return cf; } diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 17:33:21 2018 +0000 @@ -520,9 +520,17 @@ void eventUpdated(AsyncEvent e) throws ClosedChannelException { if (Thread.currentThread() == this) { SelectionKey key = e.channel().keyFor(selector); - if (key != null) { + if (key != null && key.isValid()) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); - if (sa != null) sa.register(e); + sa.register(e); + } else if (e.interestOps() != 0){ + // We don't care about paused events. + // These are actually handled by + // SelectorAttachment::resetInterestOps later on. + // But if we reach here when trying to resume an + // event then it's better to fail fast. + debug.log(Level.DEBUG, "No key for channel"); + e.abort(new IOException("No key for channel")); } } else { register(e); @@ -563,11 +571,13 @@ public void run() { List> errorList = new ArrayList<>(); List readyList = new ArrayList<>(); + List resetList = new ArrayList<>(); try { while (!Thread.currentThread().isInterrupted()) { synchronized (this) { assert errorList.isEmpty(); assert readyList.isEmpty(); + assert resetList.isEmpty(); for (AsyncEvent event : registrations) { if (event instanceof AsyncTriggerEvent) { readyList.add(event); @@ -673,9 +683,10 @@ owner.purgeTimeoutsAndReturnNextDeadline(); continue; } - Set keys = selector.selectedKeys(); + Set keys = selector.selectedKeys(); assert errorList.isEmpty(); + for (SelectionKey key : keys) { SelectorAttachment sa = (SelectorAttachment) key.attachment(); if (!key.isValid()) { @@ -697,17 +708,24 @@ continue; } sa.events(eventsOccurred).forEach(readyList::add); - sa.resetInterestOps(eventsOccurred); + resetList.add(() -> sa.resetInterestOps(eventsOccurred)); } + selector.selectNow(); // complete cancellation selector.selectedKeys().clear(); - for (AsyncEvent event : readyList) { - handleEvent(event, null); // will be delegated to executor - } + // handle selected events + readyList.forEach((e) -> handleEvent(e, null)); readyList.clear(); + + // handle errors (closed channels etc...) errorList.forEach((p) -> handleEvent(p.first, p.second)); errorList.clear(); + + // reset interest ops for selected channels + resetList.forEach(r -> r.run()); + resetList.clear(); + } } catch (Throwable e) { //e.printStackTrace(); @@ -746,6 +764,20 @@ } } + final String debugInterestOps(SelectableChannel channel) { + try { + SelectionKey key = channel.keyFor(selmgr.selector); + if (key == null) return "channel not registered with selector"; + String keyInterestOps = key.isValid() + ? "key.interestOps=" + key.interestOps() : "invalid key"; + return String.format("channel registered with selector, %s, sa.interestOps=%s", + keyInterestOps, + ((SelectorAttachment)key.attachment()).interestOps); + } catch (Throwable t) { + return String.valueOf(t); + } + } + /** * Tracks multiple user level registrations associated with one NIO * registration (SelectionKey). In this implementation, registrations @@ -772,6 +804,9 @@ void register(AsyncEvent e) throws ClosedChannelException { int newOps = e.interestOps(); + // re register interest if we are not already interested + // in the event. If the event is paused, then the pause will + // be taken into account later when resetInterestOps is called. boolean reRegister = (interestOps & newOps) != newOps; interestOps |= newOps; pending.add(e); @@ -779,9 +814,11 @@ // first time registration happens here also try { chan.register(selector, interestOps, this); - } catch (CancelledKeyException x) { + } catch (Throwable x) { abortPending(x); } + } else if (!chan.isOpen()) { + abortPending(new ClosedChannelException()); } } @@ -818,11 +855,20 @@ this.interestOps = newOps; SelectionKey key = chan.keyFor(selector); - if (newOps == 0 && pending.isEmpty()) { + if (newOps == 0 && key != null && pending.isEmpty()) { key.cancel(); } else { try { + if (key == null || !key.isValid()) { + throw new CancelledKeyException(); + } key.interestOps(newOps); + // double check after + if (!chan.isOpen()) { + abortPending(new ClosedChannelException()); + return; + } + assert key.interestOps() == newOps; } catch (CancelledKeyException x) { // channel may have been closed debug.log(Level.DEBUG, "key cancelled for " + chan); diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Thu Feb 22 17:33:21 2018 +0000 @@ -179,13 +179,10 @@ HttpClientImpl client, HttpRequestImpl request, Version version) { + // The default proxy selector may select a proxy whose address is + // unresolved. We must resolve the address before connecting to it. + InetSocketAddress proxy = Utils.resolveAddress(request.proxy()); HttpConnection c = null; - InetSocketAddress proxy = request.proxy(); - if (proxy != null && proxy.isUnresolved()) { - // The default proxy selector may select a proxy whose address is - // unresolved. We must resolve the address before connecting to it. - proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); - } boolean secure = request.secure(); ConnectionPool pool = client.connectionPool(); diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Feb 22 17:33:21 2018 +0000 @@ -51,7 +51,7 @@ private final Object reading = new Object(); protected final SocketChannel chan; - private final FlowTube tube; + private final SocketTube tube; // need SocketTube to call signalClosed(). private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading); private volatile boolean connected; private boolean closed; @@ -84,7 +84,8 @@ boolean finished = chan.finishConnect(); assert finished : "Expected channel to be connected"; debug.log(Level.DEBUG, - "ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress()); + "ConnectEvent: connect finished: %s Local addr: %s", + finished, chan.getLocalAddress()); connected = true; // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); @@ -107,7 +108,8 @@ assert !connected : "Already connected"; assert !chan.isBlocking() : "Unexpected blocking channel"; boolean finished = false; - PrivilegedExceptionAction pa = () -> chan.connect(address); + PrivilegedExceptionAction pa = + () -> chan.connect(Utils.resolveAddress(address)); try { finished = AccessController.doPrivileged(pa); } catch (PrivilegedActionException e) { @@ -179,14 +181,19 @@ * Closes this connection */ @Override - public synchronized void close() { - if (closed) { - return; + public void close() { + synchronized (this) { + if (closed) { + return; + } + closed = true; } - closed = true; try { Log.logTrace("Closing: " + toString()); + debug.log(Level.DEBUG, () -> "Closing channel: " + + client().debugInterestOps(chan)); chan.close(); + tube.signalClosed(); } catch (IOException e) { Log.logTrace("Closing resulted in " + e); } diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Feb 22 17:33:21 2018 +0000 @@ -154,6 +154,14 @@ // Events // // ======================================================================// + void signalClosed() { + // Ensure that the subscriber will be terminated + // and that future subscribers will be notified + // when the connection is closed. + readPublisher.subscriptionImpl.signalError( + new IOException("connection closed locally")); + } + /** * A restartable task used to process tasks in sequence. */ @@ -498,6 +506,7 @@ } void signalError(Throwable error) { + debug.log(Level.DEBUG, () -> "error signalled " + error); if (!errorRef.compareAndSet(null, error)) { return; } diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/Stream.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 17:33:21 2018 +0000 @@ -230,16 +230,21 @@ boolean returnConnectionToPool, Executor executor) { - Log.logTrace("Reading body on stream {0}", streamid); - BodySubscriber bodySubscriber = handler.apply(responseCode, responseHeaders); - CompletableFuture cf = receiveData(bodySubscriber, executor); + try { + Log.logTrace("Reading body on stream {0}", streamid); + BodySubscriber bodySubscriber = handler.apply(responseCode, responseHeaders); + CompletableFuture cf = receiveData(bodySubscriber, executor); - PushGroup pg = exchange.getPushGroup(); - if (pg != null) { - // if an error occurs make sure it is recorded in the PushGroup - cf = cf.whenComplete((t,e) -> pg.pushError(e)); + PushGroup pg = exchange.getPushGroup(); + if (pg != null) { + // if an error occurs make sure it is recorded in the PushGroup + cf = cf.whenComplete((t, e) -> pg.pushError(e)); + } + return cf; + } catch (Throwable t) { + // may be thrown by handler.apply + return MinimalFuture.failedFuture(t); } - return cf; } @Override @@ -268,12 +273,16 @@ // We want to allow the subscriber's getBody() method to block so it // can work with InputStreams. So, we offload execution. executor.execute(() -> { - bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { - if (t == null) - responseBodyCF.complete(body); - else - responseBodyCF.completeExceptionally(t); - }); + try { + bodySubscriber.getBody().whenComplete((T body, Throwable t) -> { + if (t == null) + responseBodyCF.complete(body); + else + responseBodyCF.completeExceptionally(t); + }); + } catch(Throwable t) { + cancelImpl(t); + } }); if (isCanceled()) { @@ -281,11 +290,11 @@ responseBodyCF.completeExceptionally(t); } else { bodySubscriber.onSubscribe(userSubscription); + // Set the responseSubscriber field now that onSubscribe has been called. + // This effectively allows the scheduler to start invoking the callbacks. + responseSubscriber = bodySubscriber; + sched.runOrSchedule(); // in case data waiting already to be processed } - // Set the responseSubscriber field now that onSubscribe has been called. - // This effectively allows the scheduler to start invoking the callbacks. - responseSubscriber = bodySubscriber; - sched.runOrSchedule(); // in case data waiting already to be processed return responseBodyCF; } diff -r 4db4bec0e5bb -r 8a6065d830b9 src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Feb 22 14:58:11 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Feb 22 17:33:21 2018 +0000 @@ -805,4 +805,13 @@ public static boolean isHostnameVerificationDisabled() { return isHostnameVerificationDisabled; } + + public static InetSocketAddress resolveAddress(InetSocketAddress address) { + if (address != null && address.isUnresolved()) { + // The default proxy selector may select a proxy whose address is + // unresolved. We must resolve the address before connecting to it. + address = new InetSocketAddress(address.getHostString(), address.getPort()); + } + return address; + } } diff -r 4db4bec0e5bb -r 8a6065d830b9 test/jdk/java/net/httpclient/DigestEchoClient.java --- a/test/jdk/java/net/httpclient/DigestEchoClient.java Thu Feb 22 14:58:11 2018 +0000 +++ b/test/jdk/java/net/httpclient/DigestEchoClient.java Thu Feb 22 17:33:21 2018 +0000 @@ -501,10 +501,11 @@ boolean async, boolean expectContinue) throws Exception { - out.println(format("*** testDigest: client: %s, server: %s, async: %s, useSSL: %s, " + - "authScheme: %s, authType: %s, expectContinue: %s ***", - clientVersion, serverVersion, async, useSSL, - authScheme, authType, expectContinue)); + String test = format("testDigest: client: %s, server: %s, async: %s, useSSL: %s, " + + "authScheme: %s, authType: %s, expectContinue: %s", + clientVersion, serverVersion, async, useSSL, + authScheme, authType, expectContinue); + out.println("*** " + test + " ***"); DigestEchoServer server = EchoServers.of(serverVersion, useSSL ? "https" : "http", authType, authScheme); @@ -554,7 +555,8 @@ resp = client.send(request, asLines()); } System.out.println(resp); - assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407; + assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407 + : "challenge=" + challenge + ", resp=" + resp + ", test=[" + test + "]"; if (resp.statusCode() == 401 || resp.statusCode() == 407) { // This assert may need to be relaxed if our server happened to // decide to close the tunnel connection, in which case we would diff -r 4db4bec0e5bb -r 8a6065d830b9 test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java --- a/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java Thu Feb 22 14:58:11 2018 +0000 +++ b/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java Thu Feb 22 17:33:21 2018 +0000 @@ -192,8 +192,7 @@ HttpResponse response = client.send(request, asString(UTF_8)); fail("Unexpected response: " + response); } catch (IOException expected) { - assertTrue(expected.getMessage().contains("Too few bytes returned"), - "Exception message:[" + expected.toString() + "]"); + assertMessage(expected, "Too few bytes returned"); } } @@ -210,8 +209,14 @@ HttpResponse response = client.send(request, asString(UTF_8)); fail("Unexpected response: " + response); } catch (IOException expected) { - assertTrue(expected.getMessage().contains("Too many bytes in request body"), - "Exception message:[" + expected.toString() + "]"); + assertMessage(expected, "Too many bytes in request body"); + } + } + + private void assertMessage(Throwable t, String contains) { + if (!t.getMessage().contains(contains)) { + String error = "Exception message:[" + t.toString() + "] doesn't contain [" + contains + "]"; + throw new AssertionError(error, t); } } diff -r 4db4bec0e5bb -r 8a6065d830b9 test/jdk/java/net/httpclient/HttpServerAdapters.java --- a/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 22 14:58:11 2018 +0000 +++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 22 17:33:21 2018 +0000 @@ -68,6 +68,9 @@ */ public interface HttpServerAdapters { + static final boolean PRINTSTACK = + Boolean.getBoolean("jdk.internal.httpclient.debug"); + static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) { try { baos.write(ba); @@ -231,6 +234,7 @@ return this.getClass().getSimpleName() + ": " + exchange.toString(); } } + private static final class Http2TestExchangeImpl extends HttpTestExchange { private final Http2TestExchange exchange; Http2TestExchangeImpl(Http2TestExchange exch) { @@ -287,13 +291,24 @@ void handle(HttpTestExchange t) throws IOException; default HttpHandler toHttpHandler() { - return (t) -> handle(HttpTestExchange.of(t)); + return (t) -> doHandle(HttpTestExchange.of(t)); } default Http2Handler toHttp2Handler() { - return (t) -> handle(HttpTestExchange.of(t)); + return (t) -> doHandle(HttpTestExchange.of(t)); + } + private void doHandle(HttpTestExchange t) throws IOException { + try { + handle(t); + } catch (Throwable x) { + System.out.println("WARNING: exception caught in HttpTestHandler::handle " + x); + System.err.println("WARNING: exception caught in HttpTestHandler::handle " + x); + if (PRINTSTACK) x.printStackTrace(System.out); + throw x; + } } } + public static class HttpTestEchoHandler implements HttpTestHandler { @Override public void handle(HttpTestExchange t) throws IOException { @@ -332,8 +347,15 @@ this.chain = chain; } @Override - public void doFilter(HttpTestExchange exchange) throws IOException{ - exchange.doFilter(chain); + public void doFilter(HttpTestExchange exchange) throws IOException { + try { + exchange.doFilter(chain); + } catch (Throwable t) { + System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t); + System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t); + if (PRINTSTACK) t.printStackTrace(); + throw t; + } } } @@ -346,10 +368,17 @@ } @Override public void doFilter(HttpTestExchange exchange) throws IOException { - if (iter.hasNext()) { - iter.next().doFilter(exchange, this); - } else { - handler.handle(exchange); + try { + if (iter.hasNext()) { + iter.next().doFilter(exchange, this); + } else { + handler.handle(exchange); + } + } catch (Throwable t) { + System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t); + System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t); + if (PRINTSTACK) t.printStackTrace(); + throw t; } } } @@ -420,6 +449,8 @@ public void stop() { impl.stop(0); } @Override public HttpTestContext addHandler(HttpTestHandler handler, String path) { + System.out.println("Http1TestServer[" + getAddress() + + "]::addHandler " + handler + ", " + path); return new Http1TestContext(impl.createContext(path, handler.toHttpHandler())); } @Override @@ -466,7 +497,8 @@ } @Override public HttpTestContext addHandler(HttpTestHandler handler, String path) { - System.out.println("Http2TestServerImpl::addHandler " + handler + ", " + path); + System.out.println("Http2TestServerImpl[" + getAddress() + + "]::addHandler " + handler + ", " + path); Http2TestContext context = new Http2TestContext(handler, path); impl.addHandler(context.toHttp2Handler(), path); return context; diff -r 4db4bec0e5bb -r 8a6065d830b9 test/jdk/java/net/httpclient/ShortRequestBody.java --- a/test/jdk/java/net/httpclient/ShortRequestBody.java Thu Feb 22 14:58:11 2018 +0000 +++ b/test/jdk/java/net/httpclient/ShortRequestBody.java Thu Feb 22 17:33:21 2018 +0000 @@ -168,7 +168,8 @@ HttpResponse resp = cf.get(30, TimeUnit.SECONDS); err.println("Response code: " + resp.statusCode()); - check(resp.statusCode() == 200, "Expected 200, got ", resp.statusCode()); + check(resp.statusCode() == 200, null, + "Expected 200, got ", resp.statusCode()); } static void failureNonBlocking(Supplier clientSupplier, @@ -190,11 +191,11 @@ } catch (ExecutionException expected) { err.println("Caught expected: " + expected); Throwable t = expected.getCause(); - check(t instanceof IOException, - "Expected cause IOException, but got: ", expected.getCause()); + check(t instanceof IOException, t, + "Expected cause IOException, but got: ", t); String msg = t.getMessage(); check(msg.contains("Too many") || msg.contains("Too few"), - "Expected Too many|Too few, got: ", t); + t, "Expected Too many|Too few, got: ", t); } } @@ -215,7 +216,7 @@ err.println("Caught expected: " + expected); String msg = expected.getMessage(); check(msg.contains("Too many") || msg.contains("Too few"), - "Expected Too many|Too few, got: ", expected); + expected,"Expected Too many|Too few, got: ", expected); } } @@ -318,13 +319,13 @@ catch (IOException x) { throw new UncheckedIOException(x); } } - static boolean check(boolean cond, Object... failedArgs) { + static boolean check(boolean cond, Throwable t, Object... failedArgs) { if (cond) return true; // We are going to fail... StringBuilder sb = new StringBuilder(); for (Object o : failedArgs) sb.append(o); - throw new RuntimeException(sb.toString()); + throw new RuntimeException(sb.toString(), t); } } diff -r 4db4bec0e5bb -r 8a6065d830b9 test/jdk/java/net/httpclient/ThrowingSubscribers.java --- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java Thu Feb 22 14:58:11 2018 +0000 +++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Thu Feb 22 17:33:21 2018 +0000 @@ -31,7 +31,7 @@ * java.net.http/jdk.internal.net.http.common * java.net.http/jdk.internal.net.http.frame * java.net.http/jdk.internal.net.http.hpack - * @run testng/othervm ThrowingSubscribers + * @run testng/othervm -Djdk.internal.httpclient.debug=true ThrowingSubscribers */ import com.sun.net.httpserver.HttpServer; @@ -62,6 +62,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -97,11 +98,23 @@ String https2URI_fixed; String https2URI_chunk; - static final int ITERATION_COUNT = 2; + static final int ITERATION_COUNT = 1; // a shared executor helps reduce the amount of threads created by the test static final Executor executor = new TestExecutor(Executors.newCachedThreadPool()); static final ConcurrentMap FAILURES = new ConcurrentHashMap<>(); static volatile boolean tasksFailed; + static final AtomicLong serverCount = new AtomicLong(); + static final AtomicLong clientCount = new AtomicLong(); + static final long start = System.nanoTime(); + public static String now() { + long now = System.nanoTime() - start; + long secs = now / 1000_000_000; + long mill = (now % 1000_000_000) / 1000_000; + long nan = now % 1000_000; + return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan); + } + + private volatile HttpClient sharedClient; static class TestExecutor implements Executor { final AtomicLong tasks = new AtomicLong(); @@ -118,8 +131,8 @@ command.run(); } catch (Throwable t) { tasksFailed = true; - System.out.printf("Task %s failed: %s%n", id, t); - System.err.printf("Task %s failed: %s%n", id, t); + System.out.printf(now() + "Task %s failed: %s%n", id, t); + System.err.printf(now() + "Task %s failed: %s%n", id, t); FAILURES.putIfAbsent("Task " + id, t); throw t; } @@ -129,14 +142,21 @@ @AfterClass static final void printFailedTests() { - if (FAILURES.isEmpty()) return; - out.println("Failed tests: "); - FAILURES.entrySet().forEach((e) -> { + out.println("\n========================="); + try { + out.printf("%n%sCreated %d servers and %d clients%n", + now(), serverCount.get(), clientCount.get()); + if (FAILURES.isEmpty()) return; + out.println("Failed tests: "); + FAILURES.entrySet().forEach((e) -> { out.printf("\t%s: %s%n", e.getKey(), e.getValue()); e.getValue().printStackTrace(); - }); - if (tasksFailed) { - throw new RuntimeException("Some tasks failed"); + }); + if (tasksFailed) { + System.out.println("WARNING: Some tasks failed"); + } + } finally { + out.println("\n=========================\n"); } } @@ -157,13 +177,16 @@ public Object[][] noThrows() { String[] uris = uris(); Object[][] result = new Object[uris.length * 2][]; + //Object[][] result = new Object[uris.length][]; int i = 0; for (boolean sameClient : List.of(false, true)) { + //if (!sameClient) continue; for (String uri: uris()) { result[i++] = new Object[] {uri, sameClient}; } } assert i == uris.length * 2; + // assert i == uris.length ; return result; } @@ -171,34 +194,53 @@ public Object[][] variants() { String[] uris = uris(); Object[][] result = new Object[uris.length * 2 * 2][]; + //Object[][] result = new Object[(uris.length/2) * 2 * 2][]; int i = 0; for (Thrower thrower : List.of( - new UncheckedCustomExceptionThrower(), - new UncheckedIOExceptionThrower())) { + new UncheckedIOExceptionThrower(), + new UncheckedCustomExceptionThrower())) { for (boolean sameClient : List.of(false, true)) { for (String uri : uris()) { + // if (uri.contains("http2") || uri.contains("https2")) continue; + // if (!sameClient) continue; result[i++] = new Object[]{uri, sameClient, thrower}; } } } assert i == uris.length * 2 * 2; + //assert Stream.of(result).filter(o -> o != null).count() == result.length; return result; } - HttpClient newHttpClient() { + private HttpClient makeNewClient() { + clientCount.incrementAndGet(); return HttpClient.newBuilder() - .executor(executor) - .sslContext(sslContext) - .build(); + .executor(executor) + .sslContext(sslContext) + .build(); + } + + HttpClient newHttpClient(boolean share) { + if (!share) return makeNewClient(); + HttpClient shared = sharedClient; + if (shared != null) return shared; + synchronized (this) { + shared = sharedClient; + if (shared == null) { + shared = sharedClient = makeNewClient(); + } + return shared; + } } @Test(dataProvider = "noThrows") public void testNoThrows(String uri, boolean sameClient) throws Exception { HttpClient client = null; + out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient); for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) - client = newHttpClient(); + client = newHttpClient(sameClient); HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) .build(); @@ -255,7 +297,7 @@ { String test = format("testThrowingAsStringAsync(%s, %b, %s)", uri, sameClient, thrower); - testThrowing(uri, sameClient, BodyHandler::asString, + testThrowing(test, uri, sameClient, BodyHandler::asString, this::shouldHaveThrown, thrower, true); } @@ -288,7 +330,7 @@ Finisher finisher, Thrower thrower, boolean async) throws Exception { - out.printf("%n%s%n", name); + out.printf("%n%s%s%n", now(), name); try { testThrowing(uri, sameClient, handlers, finisher, thrower, async); } catch (Error | Exception x) { @@ -307,9 +349,8 @@ for (Where where : Where.values()) { if (where == Where.ON_SUBSCRIBE) continue; if (where == Where.ON_ERROR) continue; - if (where == Where.GET_BODY) continue; // doesn't work with HTTP/2 if (!sameClient || client == null) - client = newHttpClient(); + client = newHttpClient(sameClient); HttpRequest req = HttpRequest. newBuilder(URI.create(uri)) @@ -324,14 +365,14 @@ } catch (Error | Exception x) { Throwable cause = findCause(x, thrower); if (cause == null) throw x; - System.out.println("Got expected exception: " + cause); + System.out.println(now() + "Got expected exception: " + cause); } } else { try { response = client.send(req, handler); } catch (Error | Exception t) { if (thrower.test(t)) { - System.out.println("Got expected exception: " + t); + System.out.println(now() + "Got expected exception: " + t); } else throw t; } } @@ -341,7 +382,8 @@ } } - enum Where {BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY; + enum Where { + BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF; public Consumer select(Consumer consumer) { return new Consumer() { @Override @@ -362,15 +404,16 @@ U finish(Where w, HttpResponse resp, Thrower thrower) throws IOException; } - U shouldHaveThrown(Where w, HttpResponse resp, Thrower thrower) { + final U shouldHaveThrown(Where w, HttpResponse resp, Thrower thrower) { throw new RuntimeException("Expected exception not thrown in " + w); } - List checkAsLines(Where w, HttpResponse> resp, Thrower thrower) { + final List checkAsLines(Where w, HttpResponse> resp, Thrower thrower) { switch(w) { case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower); case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower); case GET_BODY: return shouldHaveThrown(w, resp, thrower); + case BODY_CF: return shouldHaveThrown(w, resp, thrower); default: break; } List result = null; @@ -379,7 +422,7 @@ } catch (Error | Exception x) { Throwable cause = findCause(x, thrower); if (cause != null) { - out.println("Got expected exception in " + w + ": " + cause); + out.println(now() + "Got expected exception in " + w + ": " + cause); return result; } throw x; @@ -387,7 +430,7 @@ throw new RuntimeException("Expected exception not thrown in " + w); } - List checkAsInputStream(Where w, HttpResponse resp, + final List checkAsInputStream(Where w, HttpResponse resp, Thrower thrower) throws IOException { @@ -395,6 +438,7 @@ case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower); case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower); case GET_BODY: return shouldHaveThrown(w, resp, thrower); + case BODY_CF: return shouldHaveThrown(w, resp, thrower); default: break; } List result = null; @@ -403,10 +447,9 @@ try { result = r.lines().collect(Collectors.toList()); } catch (Error | Exception x) { - Throwable cause = - findCause(x, thrower); + Throwable cause = findCause(x, thrower); if (cause != null) { - out.println("Got expected exception in " + w + ": " + x); + out.println(now() + "Got expected exception in " + w + ": " + cause); return result; } throw x; @@ -421,9 +464,10 @@ return x; } - static class UncheckedCustomExceptionThrower implements Thrower { + static final class UncheckedCustomExceptionThrower implements Thrower { @Override public void accept(Where where) { + out.println(now() + "Throwing in " + where); throw new UncheckedCustomException(where.name()); } @@ -438,9 +482,10 @@ } } - static class UncheckedIOExceptionThrower implements Thrower { + static final class UncheckedIOExceptionThrower implements Thrower { @Override public void accept(Where where) { + out.println(now() + "Throwing in " + where); throw new UncheckedIOException(new CustomIOException(where.name())); } @@ -456,7 +501,7 @@ } } - static class UncheckedCustomException extends RuntimeException { + static final class UncheckedCustomException extends RuntimeException { UncheckedCustomException(String message) { super(message); } @@ -465,7 +510,7 @@ } } - static class CustomIOException extends IOException { + static final class CustomIOException extends IOException { CustomIOException(String message) { super(message); } @@ -474,7 +519,7 @@ } } - static class ThrowingBodyHandler implements BodyHandler { + static final class ThrowingBodyHandler implements BodyHandler { final Consumer throwing; final BodyHandler bodyHandler; ThrowingBodyHandler(Consumer throwing, BodyHandler bodyHandler) { @@ -489,7 +534,7 @@ } } - static class ThrowingBodySubscriber implements BodySubscriber { + static final class ThrowingBodySubscriber implements BodySubscriber { private final BodySubscriber subscriber; volatile boolean onSubscribeCalled; final Consumer throwing; @@ -533,6 +578,11 @@ @Override public CompletionStage getBody() { throwing.accept(Where.GET_BODY); + try { + throwing.accept(Where.BODY_CF); + } catch (Throwable t) { + return CompletableFuture.failedFuture(t); + } return subscriber.getBody(); } } @@ -580,6 +630,7 @@ https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed/x"; https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk/x"; + serverCount.addAndGet(4); httpTestServer.start(); httpsTestServer.start(); http2TestServer.start(); @@ -588,6 +639,7 @@ @AfterTest public void teardown() throws Exception { + sharedClient = null; httpTestServer.stop(); httpsTestServer.stop(); http2TestServer.stop();