http-client-branch: Add a test for BodySubscribers throwing in getBody() or returning exceptionally completed CFs
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Thu Feb 22 17:33:21 2018 +0000
@@ -477,9 +477,9 @@
if (dp.throwable != null) {
state = State.ERROR;
exec.execute(() -> {
- connection.close();
headersSentCF.completeExceptionally(dp.throwable);
bodySentCF.completeExceptionally(dp.throwable);
+ connection.close();
});
return dp;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Thu Feb 22 17:33:21 2018 +0000
@@ -29,9 +29,8 @@
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.net.http.HttpHeaders;
@@ -67,8 +66,9 @@
static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
private volatile State readProgress = State.INITIAL;
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
- final System.Logger debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
-
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final static AtomicLong responseCount = new AtomicLong();
+ final long id = responseCount.incrementAndGet();
Http1Response(HttpConnection conn,
Http1Exchange<T> exchange,
@@ -82,6 +82,56 @@
bodyReader = new BodyReader(this::advance);
}
+ String dbgTag;
+ private String dbgString() {
+ String dbg = dbgTag;
+ if (dbg == null) {
+ String cdbg = connection.dbgTag;
+ if (cdbg != null) {
+ dbgTag = dbg = "Http1Response(id=" + id + ", " + cdbg + ")";
+ } else {
+ dbg = "Http1Response(id=" + id + ")";
+ }
+ }
+ return dbg;
+ }
+
+ // The ClientRefCountTracker is used to track the state
+ // of a pending operation. Altough there usually is a single
+ // point where the operation starts, it may terminate at
+ // different places.
+ private final class ClientRefCountTracker {
+ final HttpClientImpl client = connection.client();
+ // state & 0x01 != 0 => acquire called
+ // state & 0x02 != 0 => tryRelease called
+ byte state;
+
+ public synchronized void acquire() {
+ if (state == 0) {
+ // increment the reference count on the HttpClientImpl
+ // to prevent the SelectorManager thread from exiting
+ // until our operation is complete.
+ debug.log(Level.DEBUG, "incrementing ref count for %s", client);
+ client.reference();
+ state = 0x01;
+ } else {
+ assert (state & 0x01) == 0 : "reference count already incremented";
+ }
+ }
+
+ public synchronized void tryRelease() {
+ if (state == 0x01) {
+ // decrement the reference count on the HttpClientImpl
+ // to allow the SelectorManager thread to exit if no
+ // other operation is pending and the facade is no
+ // longer referenced.
+ debug.log(Level.DEBUG, "decrementing ref count for %s", client);
+ client.unreference();
+ state |= 0x02;
+ }
+ }
+ }
+
public CompletableFuture<Response> readHeadersAsync(Executor executor) {
debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
+ asyncReceiver.remaining() +") " + readProgress);
@@ -157,6 +207,7 @@
}
}
+
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
boolean return2Cache,
Executor executor) {
@@ -173,10 +224,10 @@
// if we reach here, we must reset the headersReader state.
asyncReceiver.unsubscribe(headersReader);
headersReader.reset();
+ ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
executor.execute(() -> {
try {
- HttpClientImpl client = connection.client();
content = new ResponseContent(
connection, clen, headers, pusher,
this::onFinished
@@ -189,7 +240,7 @@
// increment the reference count on the HttpClientImpl
// to prevent the SelectorManager thread from exiting until
// the body is fully read.
- client.reference();
+ refCountTracker.acquire();
bodyReader.start(content.getBodyParser(
(t) -> {
try {
@@ -200,11 +251,6 @@
cf.completeExceptionally(t);
}
} finally {
- // decrement the reference count on the HttpClientImpl
- // to allow the SelectorManager thread to exit if no
- // other operation is pending and the facade is no
- // longer referenced.
- client.unreference();
bodyReader.onComplete(t);
}
}));
@@ -216,7 +262,7 @@
CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) -> {
t = Utils.getCompletionCause(t);
try {
- if (t != null) {
+ if (t == null) {
debug.log(Level.DEBUG, () ->
"Finished reading body: " + s);
assert s == State.READING_BODY;
@@ -228,6 +274,10 @@
} catch (Throwable x) {
// not supposed to happen
asyncReceiver.onReadError(x);
+ } finally {
+ // we're done: release the ref count for
+ // the current operation.
+ refCountTracker.tryRelease();
}
});
connection.addTrailingOperation(trailingOp);
@@ -243,14 +293,31 @@
}
}
});
- p.getBody().whenComplete((U u, Throwable t) -> {
- if (t == null)
- cf.complete(u);
- else
- cf.completeExceptionally(t);
+ try {
+ p.getBody().whenComplete((U u, Throwable t) -> {
+ if (t == null)
+ cf.complete(u);
+ else
+ cf.completeExceptionally(t);
+ });
+ } catch (Throwable t) {
+ cf.completeExceptionally(t);
+ asyncReceiver.setRetryOnError(false);
+ asyncReceiver.onReadError(t);
+ }
+
+ return cf.whenComplete((s,t) -> {
+ if (t != null) {
+ // If an exception occurred, release the
+ // ref count for the current operation, as
+ // it may never be triggered otherwise
+ // (BodySubscriber ofInputStream)
+ // If there was no exception then the
+ // ref count will be/have been released when
+ // the last byte of the response is/was received
+ refCountTracker.tryRelease();
+ }
});
-
- return cf;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Feb 22 17:33:21 2018 +0000
@@ -520,9 +520,17 @@
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
- if (key != null) {
+ if (key != null && key.isValid()) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
- if (sa != null) sa.register(e);
+ sa.register(e);
+ } else if (e.interestOps() != 0){
+ // We don't care about paused events.
+ // These are actually handled by
+ // SelectorAttachment::resetInterestOps later on.
+ // But if we reach here when trying to resume an
+ // event then it's better to fail fast.
+ debug.log(Level.DEBUG, "No key for channel");
+ e.abort(new IOException("No key for channel"));
}
} else {
register(e);
@@ -563,11 +571,13 @@
public void run() {
List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
List<AsyncEvent> readyList = new ArrayList<>();
+ List<Runnable> resetList = new ArrayList<>();
try {
while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
assert errorList.isEmpty();
assert readyList.isEmpty();
+ assert resetList.isEmpty();
for (AsyncEvent event : registrations) {
if (event instanceof AsyncTriggerEvent) {
readyList.add(event);
@@ -673,9 +683,10 @@
owner.purgeTimeoutsAndReturnNextDeadline();
continue;
}
- Set<SelectionKey> keys = selector.selectedKeys();
+ Set<SelectionKey> keys = selector.selectedKeys();
assert errorList.isEmpty();
+
for (SelectionKey key : keys) {
SelectorAttachment sa = (SelectorAttachment) key.attachment();
if (!key.isValid()) {
@@ -697,17 +708,24 @@
continue;
}
sa.events(eventsOccurred).forEach(readyList::add);
- sa.resetInterestOps(eventsOccurred);
+ resetList.add(() -> sa.resetInterestOps(eventsOccurred));
}
+
selector.selectNow(); // complete cancellation
selector.selectedKeys().clear();
- for (AsyncEvent event : readyList) {
- handleEvent(event, null); // will be delegated to executor
- }
+ // handle selected events
+ readyList.forEach((e) -> handleEvent(e, null));
readyList.clear();
+
+ // handle errors (closed channels etc...)
errorList.forEach((p) -> handleEvent(p.first, p.second));
errorList.clear();
+
+ // reset interest ops for selected channels
+ resetList.forEach(r -> r.run());
+ resetList.clear();
+
}
} catch (Throwable e) {
//e.printStackTrace();
@@ -746,6 +764,20 @@
}
}
+ final String debugInterestOps(SelectableChannel channel) {
+ try {
+ SelectionKey key = channel.keyFor(selmgr.selector);
+ if (key == null) return "channel not registered with selector";
+ String keyInterestOps = key.isValid()
+ ? "key.interestOps=" + key.interestOps() : "invalid key";
+ return String.format("channel registered with selector, %s, sa.interestOps=%s",
+ keyInterestOps,
+ ((SelectorAttachment)key.attachment()).interestOps);
+ } catch (Throwable t) {
+ return String.valueOf(t);
+ }
+ }
+
/**
* Tracks multiple user level registrations associated with one NIO
* registration (SelectionKey). In this implementation, registrations
@@ -772,6 +804,9 @@
void register(AsyncEvent e) throws ClosedChannelException {
int newOps = e.interestOps();
+ // re register interest if we are not already interested
+ // in the event. If the event is paused, then the pause will
+ // be taken into account later when resetInterestOps is called.
boolean reRegister = (interestOps & newOps) != newOps;
interestOps |= newOps;
pending.add(e);
@@ -779,9 +814,11 @@
// first time registration happens here also
try {
chan.register(selector, interestOps, this);
- } catch (CancelledKeyException x) {
+ } catch (Throwable x) {
abortPending(x);
}
+ } else if (!chan.isOpen()) {
+ abortPending(new ClosedChannelException());
}
}
@@ -818,11 +855,20 @@
this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
- if (newOps == 0 && pending.isEmpty()) {
+ if (newOps == 0 && key != null && pending.isEmpty()) {
key.cancel();
} else {
try {
+ if (key == null || !key.isValid()) {
+ throw new CancelledKeyException();
+ }
key.interestOps(newOps);
+ // double check after
+ if (!chan.isOpen()) {
+ abortPending(new ClosedChannelException());
+ return;
+ }
+ assert key.interestOps() == newOps;
} catch (CancelledKeyException x) {
// channel may have been closed
debug.log(Level.DEBUG, "key cancelled for " + chan);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Thu Feb 22 17:33:21 2018 +0000
@@ -179,13 +179,10 @@
HttpClientImpl client,
HttpRequestImpl request,
Version version) {
+ // The default proxy selector may select a proxy whose address is
+ // unresolved. We must resolve the address before connecting to it.
+ InetSocketAddress proxy = Utils.resolveAddress(request.proxy());
HttpConnection c = null;
- InetSocketAddress proxy = request.proxy();
- if (proxy != null && proxy.isUnresolved()) {
- // The default proxy selector may select a proxy whose address is
- // unresolved. We must resolve the address before connecting to it.
- proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
- }
boolean secure = request.secure();
ConnectionPool pool = client.connectionPool();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Feb 22 17:33:21 2018 +0000
@@ -51,7 +51,7 @@
private final Object reading = new Object();
protected final SocketChannel chan;
- private final FlowTube tube;
+ private final SocketTube tube; // need SocketTube to call signalClosed().
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
@@ -84,7 +84,8 @@
boolean finished = chan.finishConnect();
assert finished : "Expected channel to be connected";
debug.log(Level.DEBUG,
- "ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress());
+ "ConnectEvent: connect finished: %s Local addr: %s",
+ finished, chan.getLocalAddress());
connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
@@ -107,7 +108,8 @@
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
boolean finished = false;
- PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
+ PrivilegedExceptionAction<Boolean> pa =
+ () -> chan.connect(Utils.resolveAddress(address));
try {
finished = AccessController.doPrivileged(pa);
} catch (PrivilegedActionException e) {
@@ -179,14 +181,19 @@
* Closes this connection
*/
@Override
- public synchronized void close() {
- if (closed) {
- return;
+ public void close() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
}
- closed = true;
try {
Log.logTrace("Closing: " + toString());
+ debug.log(Level.DEBUG, () -> "Closing channel: "
+ + client().debugInterestOps(chan));
chan.close();
+ tube.signalClosed();
} catch (IOException e) {
Log.logTrace("Closing resulted in " + e);
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Feb 22 17:33:21 2018 +0000
@@ -154,6 +154,14 @@
// Events //
// ======================================================================//
+ void signalClosed() {
+ // Ensure that the subscriber will be terminated
+ // and that future subscribers will be notified
+ // when the connection is closed.
+ readPublisher.subscriptionImpl.signalError(
+ new IOException("connection closed locally"));
+ }
+
/**
* A restartable task used to process tasks in sequence.
*/
@@ -498,6 +506,7 @@
}
void signalError(Throwable error) {
+ debug.log(Level.DEBUG, () -> "error signalled " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu Feb 22 17:33:21 2018 +0000
@@ -230,16 +230,21 @@
boolean returnConnectionToPool,
Executor executor)
{
- Log.logTrace("Reading body on stream {0}", streamid);
- BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
- CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
+ try {
+ Log.logTrace("Reading body on stream {0}", streamid);
+ BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
+ CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
- PushGroup<?> pg = exchange.getPushGroup();
- if (pg != null) {
- // if an error occurs make sure it is recorded in the PushGroup
- cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ PushGroup<?> pg = exchange.getPushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t, e) -> pg.pushError(e));
+ }
+ return cf;
+ } catch (Throwable t) {
+ // may be thrown by handler.apply
+ return MinimalFuture.failedFuture(t);
}
- return cf;
}
@Override
@@ -268,12 +273,16 @@
// We want to allow the subscriber's getBody() method to block so it
// can work with InputStreams. So, we offload execution.
executor.execute(() -> {
- bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
- if (t == null)
- responseBodyCF.complete(body);
- else
- responseBodyCF.completeExceptionally(t);
- });
+ try {
+ bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+ if (t == null)
+ responseBodyCF.complete(body);
+ else
+ responseBodyCF.completeExceptionally(t);
+ });
+ } catch(Throwable t) {
+ cancelImpl(t);
+ }
});
if (isCanceled()) {
@@ -281,11 +290,11 @@
responseBodyCF.completeExceptionally(t);
} else {
bodySubscriber.onSubscribe(userSubscription);
+ // Set the responseSubscriber field now that onSubscribe has been called.
+ // This effectively allows the scheduler to start invoking the callbacks.
+ responseSubscriber = bodySubscriber;
+ sched.runOrSchedule(); // in case data waiting already to be processed
}
- // Set the responseSubscriber field now that onSubscribe has been called.
- // This effectively allows the scheduler to start invoking the callbacks.
- responseSubscriber = bodySubscriber;
- sched.runOrSchedule(); // in case data waiting already to be processed
return responseBodyCF;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Feb 22 14:58:11 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Feb 22 17:33:21 2018 +0000
@@ -805,4 +805,13 @@
public static boolean isHostnameVerificationDisabled() {
return isHostnameVerificationDisabled;
}
+
+ public static InetSocketAddress resolveAddress(InetSocketAddress address) {
+ if (address != null && address.isUnresolved()) {
+ // The default proxy selector may select a proxy whose address is
+ // unresolved. We must resolve the address before connecting to it.
+ address = new InetSocketAddress(address.getHostString(), address.getPort());
+ }
+ return address;
+ }
}
--- a/test/jdk/java/net/httpclient/DigestEchoClient.java Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/DigestEchoClient.java Thu Feb 22 17:33:21 2018 +0000
@@ -501,10 +501,11 @@
boolean async, boolean expectContinue)
throws Exception
{
- out.println(format("*** testDigest: client: %s, server: %s, async: %s, useSSL: %s, " +
- "authScheme: %s, authType: %s, expectContinue: %s ***",
- clientVersion, serverVersion, async, useSSL,
- authScheme, authType, expectContinue));
+ String test = format("testDigest: client: %s, server: %s, async: %s, useSSL: %s, " +
+ "authScheme: %s, authType: %s, expectContinue: %s",
+ clientVersion, serverVersion, async, useSSL,
+ authScheme, authType, expectContinue);
+ out.println("*** " + test + " ***");
DigestEchoServer server = EchoServers.of(serverVersion,
useSSL ? "https" : "http", authType, authScheme);
@@ -554,7 +555,8 @@
resp = client.send(request, asLines());
}
System.out.println(resp);
- assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407;
+ assert challenge != null || resp.statusCode() == 401 || resp.statusCode() == 407
+ : "challenge=" + challenge + ", resp=" + resp + ", test=[" + test + "]";
if (resp.statusCode() == 401 || resp.statusCode() == 407) {
// This assert may need to be relaxed if our server happened to
// decide to close the tunnel connection, in which case we would
--- a/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java Thu Feb 22 17:33:21 2018 +0000
@@ -192,8 +192,7 @@
HttpResponse<String> response = client.send(request, asString(UTF_8));
fail("Unexpected response: " + response);
} catch (IOException expected) {
- assertTrue(expected.getMessage().contains("Too few bytes returned"),
- "Exception message:[" + expected.toString() + "]");
+ assertMessage(expected, "Too few bytes returned");
}
}
@@ -210,8 +209,14 @@
HttpResponse<String> response = client.send(request, asString(UTF_8));
fail("Unexpected response: " + response);
} catch (IOException expected) {
- assertTrue(expected.getMessage().contains("Too many bytes in request body"),
- "Exception message:[" + expected.toString() + "]");
+ assertMessage(expected, "Too many bytes in request body");
+ }
+ }
+
+ private void assertMessage(Throwable t, String contains) {
+ if (!t.getMessage().contains(contains)) {
+ String error = "Exception message:[" + t.toString() + "] doesn't contain [" + contains + "]";
+ throw new AssertionError(error, t);
}
}
--- a/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 22 17:33:21 2018 +0000
@@ -68,6 +68,9 @@
*/
public interface HttpServerAdapters {
+ static final boolean PRINTSTACK =
+ Boolean.getBoolean("jdk.internal.httpclient.debug");
+
static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) {
try {
baos.write(ba);
@@ -231,6 +234,7 @@
return this.getClass().getSimpleName() + ": " + exchange.toString();
}
}
+
private static final class Http2TestExchangeImpl extends HttpTestExchange {
private final Http2TestExchange exchange;
Http2TestExchangeImpl(Http2TestExchange exch) {
@@ -287,13 +291,24 @@
void handle(HttpTestExchange t) throws IOException;
default HttpHandler toHttpHandler() {
- return (t) -> handle(HttpTestExchange.of(t));
+ return (t) -> doHandle(HttpTestExchange.of(t));
}
default Http2Handler toHttp2Handler() {
- return (t) -> handle(HttpTestExchange.of(t));
+ return (t) -> doHandle(HttpTestExchange.of(t));
+ }
+ private void doHandle(HttpTestExchange t) throws IOException {
+ try {
+ handle(t);
+ } catch (Throwable x) {
+ System.out.println("WARNING: exception caught in HttpTestHandler::handle " + x);
+ System.err.println("WARNING: exception caught in HttpTestHandler::handle " + x);
+ if (PRINTSTACK) x.printStackTrace(System.out);
+ throw x;
+ }
}
}
+
public static class HttpTestEchoHandler implements HttpTestHandler {
@Override
public void handle(HttpTestExchange t) throws IOException {
@@ -332,8 +347,15 @@
this.chain = chain;
}
@Override
- public void doFilter(HttpTestExchange exchange) throws IOException{
- exchange.doFilter(chain);
+ public void doFilter(HttpTestExchange exchange) throws IOException {
+ try {
+ exchange.doFilter(chain);
+ } catch (Throwable t) {
+ System.out.println("WARNING: exception caught in Http1Chain::doFilter" + t);
+ System.err.println("WARNING: exception caught in Http1Chain::doFilter" + t);
+ if (PRINTSTACK) t.printStackTrace();
+ throw t;
+ }
}
}
@@ -346,10 +368,17 @@
}
@Override
public void doFilter(HttpTestExchange exchange) throws IOException {
- if (iter.hasNext()) {
- iter.next().doFilter(exchange, this);
- } else {
- handler.handle(exchange);
+ try {
+ if (iter.hasNext()) {
+ iter.next().doFilter(exchange, this);
+ } else {
+ handler.handle(exchange);
+ }
+ } catch (Throwable t) {
+ System.out.println("WARNING: exception caught in Http2Chain::doFilter" + t);
+ System.err.println("WARNING: exception caught in Http2Chain::doFilter" + t);
+ if (PRINTSTACK) t.printStackTrace();
+ throw t;
}
}
}
@@ -420,6 +449,8 @@
public void stop() { impl.stop(0); }
@Override
public HttpTestContext addHandler(HttpTestHandler handler, String path) {
+ System.out.println("Http1TestServer[" + getAddress()
+ + "]::addHandler " + handler + ", " + path);
return new Http1TestContext(impl.createContext(path, handler.toHttpHandler()));
}
@Override
@@ -466,7 +497,8 @@
}
@Override
public HttpTestContext addHandler(HttpTestHandler handler, String path) {
- System.out.println("Http2TestServerImpl::addHandler " + handler + ", " + path);
+ System.out.println("Http2TestServerImpl[" + getAddress()
+ + "]::addHandler " + handler + ", " + path);
Http2TestContext context = new Http2TestContext(handler, path);
impl.addHandler(context.toHttp2Handler(), path);
return context;
--- a/test/jdk/java/net/httpclient/ShortRequestBody.java Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/ShortRequestBody.java Thu Feb 22 17:33:21 2018 +0000
@@ -168,7 +168,8 @@
HttpResponse<Void> resp = cf.get(30, TimeUnit.SECONDS);
err.println("Response code: " + resp.statusCode());
- check(resp.statusCode() == 200, "Expected 200, got ", resp.statusCode());
+ check(resp.statusCode() == 200, null,
+ "Expected 200, got ", resp.statusCode());
}
static void failureNonBlocking(Supplier<HttpClient> clientSupplier,
@@ -190,11 +191,11 @@
} catch (ExecutionException expected) {
err.println("Caught expected: " + expected);
Throwable t = expected.getCause();
- check(t instanceof IOException,
- "Expected cause IOException, but got: ", expected.getCause());
+ check(t instanceof IOException, t,
+ "Expected cause IOException, but got: ", t);
String msg = t.getMessage();
check(msg.contains("Too many") || msg.contains("Too few"),
- "Expected Too many|Too few, got: ", t);
+ t, "Expected Too many|Too few, got: ", t);
}
}
@@ -215,7 +216,7 @@
err.println("Caught expected: " + expected);
String msg = expected.getMessage();
check(msg.contains("Too many") || msg.contains("Too few"),
- "Expected Too many|Too few, got: ", expected);
+ expected,"Expected Too many|Too few, got: ", expected);
}
}
@@ -318,13 +319,13 @@
catch (IOException x) { throw new UncheckedIOException(x); }
}
- static boolean check(boolean cond, Object... failedArgs) {
+ static boolean check(boolean cond, Throwable t, Object... failedArgs) {
if (cond)
return true;
// We are going to fail...
StringBuilder sb = new StringBuilder();
for (Object o : failedArgs)
sb.append(o);
- throw new RuntimeException(sb.toString());
+ throw new RuntimeException(sb.toString(), t);
}
}
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java Thu Feb 22 14:58:11 2018 +0000
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Thu Feb 22 17:33:21 2018 +0000
@@ -31,7 +31,7 @@
* java.net.http/jdk.internal.net.http.common
* java.net.http/jdk.internal.net.http.frame
* java.net.http/jdk.internal.net.http.hpack
- * @run testng/othervm ThrowingSubscribers
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true ThrowingSubscribers
*/
import com.sun.net.httpserver.HttpServer;
@@ -62,6 +62,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -97,11 +98,23 @@
String https2URI_fixed;
String https2URI_chunk;
- static final int ITERATION_COUNT = 2;
+ static final int ITERATION_COUNT = 1;
// a shared executor helps reduce the amount of threads created by the test
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
static volatile boolean tasksFailed;
+ static final AtomicLong serverCount = new AtomicLong();
+ static final AtomicLong clientCount = new AtomicLong();
+ static final long start = System.nanoTime();
+ public static String now() {
+ long now = System.nanoTime() - start;
+ long secs = now / 1000_000_000;
+ long mill = (now % 1000_000_000) / 1000_000;
+ long nan = now % 1000_000;
+ return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
+ }
+
+ private volatile HttpClient sharedClient;
static class TestExecutor implements Executor {
final AtomicLong tasks = new AtomicLong();
@@ -118,8 +131,8 @@
command.run();
} catch (Throwable t) {
tasksFailed = true;
- System.out.printf("Task %s failed: %s%n", id, t);
- System.err.printf("Task %s failed: %s%n", id, t);
+ System.out.printf(now() + "Task %s failed: %s%n", id, t);
+ System.err.printf(now() + "Task %s failed: %s%n", id, t);
FAILURES.putIfAbsent("Task " + id, t);
throw t;
}
@@ -129,14 +142,21 @@
@AfterClass
static final void printFailedTests() {
- if (FAILURES.isEmpty()) return;
- out.println("Failed tests: ");
- FAILURES.entrySet().forEach((e) -> {
+ out.println("\n=========================");
+ try {
+ out.printf("%n%sCreated %d servers and %d clients%n",
+ now(), serverCount.get(), clientCount.get());
+ if (FAILURES.isEmpty()) return;
+ out.println("Failed tests: ");
+ FAILURES.entrySet().forEach((e) -> {
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
e.getValue().printStackTrace();
- });
- if (tasksFailed) {
- throw new RuntimeException("Some tasks failed");
+ });
+ if (tasksFailed) {
+ System.out.println("WARNING: Some tasks failed");
+ }
+ } finally {
+ out.println("\n=========================\n");
}
}
@@ -157,13 +177,16 @@
public Object[][] noThrows() {
String[] uris = uris();
Object[][] result = new Object[uris.length * 2][];
+ //Object[][] result = new Object[uris.length][];
int i = 0;
for (boolean sameClient : List.of(false, true)) {
+ //if (!sameClient) continue;
for (String uri: uris()) {
result[i++] = new Object[] {uri, sameClient};
}
}
assert i == uris.length * 2;
+ // assert i == uris.length ;
return result;
}
@@ -171,34 +194,53 @@
public Object[][] variants() {
String[] uris = uris();
Object[][] result = new Object[uris.length * 2 * 2][];
+ //Object[][] result = new Object[(uris.length/2) * 2 * 2][];
int i = 0;
for (Thrower thrower : List.of(
- new UncheckedCustomExceptionThrower(),
- new UncheckedIOExceptionThrower())) {
+ new UncheckedIOExceptionThrower(),
+ new UncheckedCustomExceptionThrower())) {
for (boolean sameClient : List.of(false, true)) {
for (String uri : uris()) {
+ // if (uri.contains("http2") || uri.contains("https2")) continue;
+ // if (!sameClient) continue;
result[i++] = new Object[]{uri, sameClient, thrower};
}
}
}
assert i == uris.length * 2 * 2;
+ //assert Stream.of(result).filter(o -> o != null).count() == result.length;
return result;
}
- HttpClient newHttpClient() {
+ private HttpClient makeNewClient() {
+ clientCount.incrementAndGet();
return HttpClient.newBuilder()
- .executor(executor)
- .sslContext(sslContext)
- .build();
+ .executor(executor)
+ .sslContext(sslContext)
+ .build();
+ }
+
+ HttpClient newHttpClient(boolean share) {
+ if (!share) return makeNewClient();
+ HttpClient shared = sharedClient;
+ if (shared != null) return shared;
+ synchronized (this) {
+ shared = sharedClient;
+ if (shared == null) {
+ shared = sharedClient = makeNewClient();
+ }
+ return shared;
+ }
}
@Test(dataProvider = "noThrows")
public void testNoThrows(String uri, boolean sameClient)
throws Exception {
HttpClient client = null;
+ out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
- client = newHttpClient();
+ client = newHttpClient(sameClient);
HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
.build();
@@ -255,7 +297,7 @@
{
String test = format("testThrowingAsStringAsync(%s, %b, %s)",
uri, sameClient, thrower);
- testThrowing(uri, sameClient, BodyHandler::asString,
+ testThrowing(test, uri, sameClient, BodyHandler::asString,
this::shouldHaveThrown, thrower, true);
}
@@ -288,7 +330,7 @@
Finisher finisher, Thrower thrower, boolean async)
throws Exception
{
- out.printf("%n%s%n", name);
+ out.printf("%n%s%s%n", now(), name);
try {
testThrowing(uri, sameClient, handlers, finisher, thrower, async);
} catch (Error | Exception x) {
@@ -307,9 +349,8 @@
for (Where where : Where.values()) {
if (where == Where.ON_SUBSCRIBE) continue;
if (where == Where.ON_ERROR) continue;
- if (where == Where.GET_BODY) continue; // doesn't work with HTTP/2
if (!sameClient || client == null)
- client = newHttpClient();
+ client = newHttpClient(sameClient);
HttpRequest req = HttpRequest.
newBuilder(URI.create(uri))
@@ -324,14 +365,14 @@
} catch (Error | Exception x) {
Throwable cause = findCause(x, thrower);
if (cause == null) throw x;
- System.out.println("Got expected exception: " + cause);
+ System.out.println(now() + "Got expected exception: " + cause);
}
} else {
try {
response = client.send(req, handler);
} catch (Error | Exception t) {
if (thrower.test(t)) {
- System.out.println("Got expected exception: " + t);
+ System.out.println(now() + "Got expected exception: " + t);
} else throw t;
}
}
@@ -341,7 +382,8 @@
}
}
- enum Where {BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY;
+ enum Where {
+ BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;
public Consumer<Where> select(Consumer<Where> consumer) {
return new Consumer<Where>() {
@Override
@@ -362,15 +404,16 @@
U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
}
- <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
+ final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
throw new RuntimeException("Expected exception not thrown in " + w);
}
- List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
+ final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
switch(w) {
case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
case GET_BODY: return shouldHaveThrown(w, resp, thrower);
+ case BODY_CF: return shouldHaveThrown(w, resp, thrower);
default: break;
}
List<String> result = null;
@@ -379,7 +422,7 @@
} catch (Error | Exception x) {
Throwable cause = findCause(x, thrower);
if (cause != null) {
- out.println("Got expected exception in " + w + ": " + cause);
+ out.println(now() + "Got expected exception in " + w + ": " + cause);
return result;
}
throw x;
@@ -387,7 +430,7 @@
throw new RuntimeException("Expected exception not thrown in " + w);
}
- List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
+ final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
Thrower thrower)
throws IOException
{
@@ -395,6 +438,7 @@
case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
case ON_SUBSCRIBE: return shouldHaveThrown(w, resp, thrower);
case GET_BODY: return shouldHaveThrown(w, resp, thrower);
+ case BODY_CF: return shouldHaveThrown(w, resp, thrower);
default: break;
}
List<String> result = null;
@@ -403,10 +447,9 @@
try {
result = r.lines().collect(Collectors.toList());
} catch (Error | Exception x) {
- Throwable cause =
- findCause(x, thrower);
+ Throwable cause = findCause(x, thrower);
if (cause != null) {
- out.println("Got expected exception in " + w + ": " + x);
+ out.println(now() + "Got expected exception in " + w + ": " + cause);
return result;
}
throw x;
@@ -421,9 +464,10 @@
return x;
}
- static class UncheckedCustomExceptionThrower implements Thrower {
+ static final class UncheckedCustomExceptionThrower implements Thrower {
@Override
public void accept(Where where) {
+ out.println(now() + "Throwing in " + where);
throw new UncheckedCustomException(where.name());
}
@@ -438,9 +482,10 @@
}
}
- static class UncheckedIOExceptionThrower implements Thrower {
+ static final class UncheckedIOExceptionThrower implements Thrower {
@Override
public void accept(Where where) {
+ out.println(now() + "Throwing in " + where);
throw new UncheckedIOException(new CustomIOException(where.name()));
}
@@ -456,7 +501,7 @@
}
}
- static class UncheckedCustomException extends RuntimeException {
+ static final class UncheckedCustomException extends RuntimeException {
UncheckedCustomException(String message) {
super(message);
}
@@ -465,7 +510,7 @@
}
}
- static class CustomIOException extends IOException {
+ static final class CustomIOException extends IOException {
CustomIOException(String message) {
super(message);
}
@@ -474,7 +519,7 @@
}
}
- static class ThrowingBodyHandler<T> implements BodyHandler<T> {
+ static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
final Consumer<Where> throwing;
final BodyHandler<T> bodyHandler;
ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
@@ -489,7 +534,7 @@
}
}
- static class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
+ static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
private final BodySubscriber<T> subscriber;
volatile boolean onSubscribeCalled;
final Consumer<Where> throwing;
@@ -533,6 +578,11 @@
@Override
public CompletionStage<T> getBody() {
throwing.accept(Where.GET_BODY);
+ try {
+ throwing.accept(Where.BODY_CF);
+ } catch (Throwable t) {
+ return CompletableFuture.failedFuture(t);
+ }
return subscriber.getBody();
}
}
@@ -580,6 +630,7 @@
https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed/x";
https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk/x";
+ serverCount.addAndGet(4);
httpTestServer.start();
httpsTestServer.start();
http2TestServer.start();
@@ -588,6 +639,7 @@
@AfterTest
public void teardown() throws Exception {
+ sharedClient = null;
httpTestServer.stop();
httpsTestServer.stop();
http2TestServer.stop();