http-client-branch: fix issue with client-closed streams in test HTTP/2 server http-client-branch
authordfuchs
Wed, 07 Mar 2018 14:55:53 +0000
branchhttp-client-branch
changeset 56261 a339fe1aab55
parent 56260 df3f97c19c1d
child 56262 d818a6a8295a
http-client-branch: fix issue with client-closed streams in test HTTP/2 server
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
test/jdk/java/net/httpclient/MethodsTest.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
test/jdk/java/net/httpclient/http2/server/Queue.java
test/jdk/java/net/httpclient/security/Driver.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Wed Mar 07 14:20:57 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Wed Mar 07 14:55:53 2018 +0000
@@ -121,7 +121,8 @@
     volatile RequestSubscriber requestSubscriber;
     volatile int responseCode;
     volatile Response response;
-    volatile Throwable failed; // The exception with which this stream was canceled.
+    // The exception with which this stream was canceled.
+    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
     final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
     volatile CompletableFuture<T> responseBodyCF;
 
@@ -155,6 +156,7 @@
             // can't process anything yet
             return;
 
+        boolean onCompleteCalled = false;
         try {
             while (!inputQ.isEmpty()) {
                 Http2Frame frame = inputQ.peek();
@@ -175,6 +177,7 @@
                     debug.log(Level.DEBUG, "incoming: onComplete");
                     sched.stop();
                     responseSubscriber.onComplete();
+                    onCompleteCalled = true;
                     setEndStreamReceived();
                     return;
                 } else if (userSubscription.tryDecrement()) {
@@ -187,6 +190,7 @@
                         debug.log(Level.DEBUG, "incoming: onComplete");
                         sched.stop();
                         responseSubscriber.onComplete();
+                        onCompleteCalled = true;
                         setEndStreamReceived();
                         return;
                     }
@@ -195,14 +199,21 @@
                 }
             }
         } catch (Throwable throwable) {
-            failed = throwable;
+            errorRef.compareAndSet(null, throwable);
         }
 
-        Throwable t = failed;
+        Throwable t = errorRef.get();
         if (t != null) {
             sched.stop();
-            responseSubscriber.onError(t);
-            close();
+            try {
+                if (!onCompleteCalled) {
+                    responseSubscriber.onError(t);
+                }
+            } catch (Throwable x) {
+                Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
+            } finally {
+                cancelImpl(t);
+            }
         }
     }
 
@@ -975,6 +986,7 @@
 
     // This method sends a RST_STREAM frame
     void cancelImpl(Throwable e) {
+        errorRef.compareAndSet(null, e);
         debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
         if (Log.trace()) {
             Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
@@ -982,7 +994,6 @@
         boolean closing;
         if (closing = !closed) { // assigning closing to !closed
             synchronized (this) {
-                failed = e;
                 if (closing = !closed) { // assigning closing to !closed
                     closed=true;
                 }
@@ -994,10 +1005,10 @@
         }
         completeResponseExceptionally(e);
         if (!requestBodyCF.isDone()) {
-            requestBodyCF.completeExceptionally(e); // we may be sending the body..
+            requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
         }
         if (responseBodyCF != null) {
-            responseBodyCF.completeExceptionally(e);
+            responseBodyCF.completeExceptionally(errorRef.get());
         }
         try {
             // will send a RST_STREAM frame
@@ -1173,7 +1184,7 @@
      * @return true if this exchange was canceled.
      */
     synchronized boolean isCanceled() {
-        return failed != null;
+        return errorRef.get() != null;
     }
 
     /**
@@ -1181,7 +1192,7 @@
      * @return the cause for which this exchange was canceled, if available.
      */
     synchronized Throwable getCancelCause() {
-        return failed;
+        return errorRef.get();
     }
 
     final String dbgString() {
--- a/test/jdk/java/net/httpclient/MethodsTest.java	Wed Mar 07 14:20:57 2018 +0000
+++ b/test/jdk/java/net/httpclient/MethodsTest.java	Wed Mar 07 14:55:53 2018 +0000
@@ -90,7 +90,7 @@
             throw new RuntimeException("Unexpected IAE for header:" + name);
         }
     }
-    
+
     public static void main(String[] args) throws Exception {
         bad("bad:method");
         bad("Foo\n");
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Mar 07 14:20:57 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Mar 07 14:55:53 2018 +0000
@@ -598,7 +598,18 @@
 
             // give to user
             Http2Handler handler = server.getHandlerFor(uri.getPath());
-            handler.handle(exchange);
+            try {
+                handler.handle(exchange);
+            } catch (IOException closed) {
+                if (bos.closed) {
+                    Queue q = streams.get(streamid);
+                    if (q != null && (q.isClosed() || q.isClosing())) {
+                        System.err.println("TestServer: Stream " + streamid + " closed: " + closed);
+                        return;
+                    }
+                }
+                throw closed;
+            }
 
             // everything happens in the exchange from here. Hopefully will
             // return though.
--- a/test/jdk/java/net/httpclient/http2/server/Queue.java	Wed Mar 07 14:20:57 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java	Wed Mar 07 14:55:53 2018 +0000
@@ -48,6 +48,14 @@
         return q.size();
     }
 
+    public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    public synchronized boolean isClosing() {
+        return closing;
+    }
+
     public synchronized void put(T obj) throws IOException {
         Objects.requireNonNull(obj);
         if (closed || closing) {
--- a/test/jdk/java/net/httpclient/security/Driver.java	Wed Mar 07 14:20:57 2018 +0000
+++ b/test/jdk/java/net/httpclient/security/Driver.java	Wed Mar 07 14:55:53 2018 +0000
@@ -59,7 +59,7 @@
  */
 public class Driver {
     // change the default value to "true" to get the subprocess traces.
-    final static boolean DEBUG = Boolean.parseBoolean(System.getProperty("test.debug", "false"));
+    final static boolean DEBUG = Boolean.parseBoolean(System.getProperty("test.debug", "true"));
 
     public static void main(String[] args) throws Throwable {
         System.out.println("Starting Driver");