# HG changeset patch # User dfuchs # Date 1520434553 0 # Node ID a339fe1aab551727f8ea94a4caaa3ae2e6589507 # Parent df3f97c19c1d84c36eb767c6867d68f4e5a7c36d http-client-branch: fix issue with client-closed streams in test HTTP/2 server diff -r df3f97c19c1d -r a339fe1aab55 src/java.net.http/share/classes/jdk/internal/net/http/Stream.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Wed Mar 07 14:20:57 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Wed Mar 07 14:55:53 2018 +0000 @@ -121,7 +121,8 @@ volatile RequestSubscriber requestSubscriber; volatile int responseCode; volatile Response response; - volatile Throwable failed; // The exception with which this stream was canceled. + // The exception with which this stream was canceled. + private final AtomicReference errorRef = new AtomicReference<>(); final CompletableFuture requestBodyCF = new MinimalFuture<>(); volatile CompletableFuture responseBodyCF; @@ -155,6 +156,7 @@ // can't process anything yet return; + boolean onCompleteCalled = false; try { while (!inputQ.isEmpty()) { Http2Frame frame = inputQ.peek(); @@ -175,6 +177,7 @@ debug.log(Level.DEBUG, "incoming: onComplete"); sched.stop(); responseSubscriber.onComplete(); + onCompleteCalled = true; setEndStreamReceived(); return; } else if (userSubscription.tryDecrement()) { @@ -187,6 +190,7 @@ debug.log(Level.DEBUG, "incoming: onComplete"); sched.stop(); responseSubscriber.onComplete(); + onCompleteCalled = true; setEndStreamReceived(); return; } @@ -195,14 +199,21 @@ } } } catch (Throwable throwable) { - failed = throwable; + errorRef.compareAndSet(null, throwable); } - Throwable t = failed; + Throwable t = errorRef.get(); if (t != null) { sched.stop(); - responseSubscriber.onError(t); - close(); + try { + if (!onCompleteCalled) { + responseSubscriber.onError(t); + } + } catch (Throwable x) { + Log.logError("Subscriber::onError threw exception: {0}", (Object)t); + } finally { + cancelImpl(t); + } } } @@ -975,6 +986,7 @@ // This method sends a RST_STREAM frame void cancelImpl(Throwable e) { + errorRef.compareAndSet(null, e); debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e); if (Log.trace()) { Log.logTrace("cancelling stream {0}: {1}\n", streamid, e); @@ -982,7 +994,6 @@ boolean closing; if (closing = !closed) { // assigning closing to !closed synchronized (this) { - failed = e; if (closing = !closed) { // assigning closing to !closed closed=true; } @@ -994,10 +1005,10 @@ } completeResponseExceptionally(e); if (!requestBodyCF.isDone()) { - requestBodyCF.completeExceptionally(e); // we may be sending the body.. + requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body.. } if (responseBodyCF != null) { - responseBodyCF.completeExceptionally(e); + responseBodyCF.completeExceptionally(errorRef.get()); } try { // will send a RST_STREAM frame @@ -1173,7 +1184,7 @@ * @return true if this exchange was canceled. */ synchronized boolean isCanceled() { - return failed != null; + return errorRef.get() != null; } /** @@ -1181,7 +1192,7 @@ * @return the cause for which this exchange was canceled, if available. */ synchronized Throwable getCancelCause() { - return failed; + return errorRef.get(); } final String dbgString() { diff -r df3f97c19c1d -r a339fe1aab55 test/jdk/java/net/httpclient/MethodsTest.java --- a/test/jdk/java/net/httpclient/MethodsTest.java Wed Mar 07 14:20:57 2018 +0000 +++ b/test/jdk/java/net/httpclient/MethodsTest.java Wed Mar 07 14:55:53 2018 +0000 @@ -90,7 +90,7 @@ throw new RuntimeException("Unexpected IAE for header:" + name); } } - + public static void main(String[] args) throws Exception { bad("bad:method"); bad("Foo\n"); diff -r df3f97c19c1d -r a339fe1aab55 test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Mar 07 14:20:57 2018 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Mar 07 14:55:53 2018 +0000 @@ -598,7 +598,18 @@ // give to user Http2Handler handler = server.getHandlerFor(uri.getPath()); - handler.handle(exchange); + try { + handler.handle(exchange); + } catch (IOException closed) { + if (bos.closed) { + Queue q = streams.get(streamid); + if (q != null && (q.isClosed() || q.isClosing())) { + System.err.println("TestServer: Stream " + streamid + " closed: " + closed); + return; + } + } + throw closed; + } // everything happens in the exchange from here. Hopefully will // return though. diff -r df3f97c19c1d -r a339fe1aab55 test/jdk/java/net/httpclient/http2/server/Queue.java --- a/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Mar 07 14:20:57 2018 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Mar 07 14:55:53 2018 +0000 @@ -48,6 +48,14 @@ return q.size(); } + public synchronized boolean isClosed() { + return closed; + } + + public synchronized boolean isClosing() { + return closing; + } + public synchronized void put(T obj) throws IOException { Objects.requireNonNull(obj); if (closed || closing) { diff -r df3f97c19c1d -r a339fe1aab55 test/jdk/java/net/httpclient/security/Driver.java --- a/test/jdk/java/net/httpclient/security/Driver.java Wed Mar 07 14:20:57 2018 +0000 +++ b/test/jdk/java/net/httpclient/security/Driver.java Wed Mar 07 14:55:53 2018 +0000 @@ -59,7 +59,7 @@ */ public class Driver { // change the default value to "true" to get the subprocess traces. - final static boolean DEBUG = Boolean.parseBoolean(System.getProperty("test.debug", "false")); + final static boolean DEBUG = Boolean.parseBoolean(System.getProperty("test.debug", "true")); public static void main(String[] args) throws Throwable { System.out.println("Starting Driver");