http-client-branch: fix intermittent problem with stream counting by decrementing counter before completing the subscriber CF http-client-branch
authormichaelm
Mon, 28 May 2018 10:48:38 +0100
branchhttp-client-branch
changeset 56616 5d2446adafaf
parent 56604 8a808d85fc1a
child 56618 e4022357f852
http-client-branch: fix intermittent problem with stream counting by decrementing counter before completing the subscriber CF
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Thu May 24 18:41:30 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Mon May 28 10:48:38 2018 +0100
@@ -837,23 +837,28 @@
                           streamid);
             }
         } finally {
+            decrementStreamsCount(streamid);
             closeStream(streamid);
         }
     }
 
+    // reduce count of streams by 1 if stream still exists
+    synchronized void decrementStreamsCount(int streamid) {
+        Stream<?> s = streams.get(streamid);
+        if (s == null || !s.deRegister())
+            return;
+        if (streamid % 2 == 1) {
+            numReservedClientStreams--;
+        } else {
+            numReservedServerStreams--;
+        }
+    }
+
     void closeStream(int streamid) {
         if (debug.on()) debug.log("Closed stream %d", streamid);
         boolean isClient = (streamid % 2) == 1;
         Stream<?> s = streams.remove(streamid);
         if (s != null) {
-            synchronized (this) {
-                if (isClient)
-                    numReservedClientStreams--;
-                else
-                    numReservedServerStreams--;
-            }
-            assert numReservedClientStreams >= 0;
-            assert numReservedServerStreams >= 0;
             // decrement the reference count on the HttpClientImpl
             // to allow the SelectorManager thread to exit if no
             // other operation is pending and the facade is no
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Thu May 24 18:41:30 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Mon May 28 10:48:38 2018 +0100
@@ -39,6 +39,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.net.http.HttpClient;
@@ -133,6 +134,8 @@
     private volatile boolean closed;
     private volatile boolean endStreamSent;
 
+    final AtomicBoolean deRegistered = new AtomicBoolean(false);
+
     // state flags
     private boolean requestSent, responseReceived;
 
@@ -185,6 +188,7 @@
                     Log.logTrace("responseSubscriber.onComplete");
                     if (debug.on()) debug.log("incoming: onComplete");
                     sched.stop();
+                    connection.decrementStreamsCount(streamid);
                     subscriber.onComplete();
                     onCompleteCalled = true;
                     setEndStreamReceived();
@@ -251,6 +255,10 @@
         return true; // end of stream
     }
 
+    boolean deRegister() {
+        return deRegistered.compareAndSet(false, true);
+    }
+
     @Override
     CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
                                        boolean returnConnectionToPool,
@@ -472,6 +480,7 @@
                     responseBodyCF.completeExceptionally(errorRef.get());
                 }
             } finally {
+                connection.decrementStreamsCount(streamid);
                 connection.closeStream(streamid);
             }
         } else {
@@ -688,6 +697,7 @@
         if (streamid > 0) {
             if (debug.on()) debug.log("Released stream %d", streamid);
             // remove this stream from the Http2Connection map.
+            connection.decrementStreamsCount(streamid);
             connection.closeStream(streamid);
         } else {
             if (debug.on()) debug.log("Can't release stream %d", streamid);
@@ -1102,6 +1112,7 @@
         try {
             // will send a RST_STREAM frame
             if (streamid != 0) {
+                connection.decrementStreamsCount(streamid);
                 e = Utils.getCompletionCause(e);
                 if (e instanceof EOFException) {
                     // read EOF: no need to try & send reset