http-client-branch: fix intermittent problem with stream counting by decrementing counter before completing the subscriber CF
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Thu May 24 18:41:30 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Mon May 28 10:48:38 2018 +0100
@@ -837,23 +837,28 @@
streamid);
}
} finally {
+ decrementStreamsCount(streamid);
closeStream(streamid);
}
}
+ // reduce count of streams by 1 if stream still exists
+ synchronized void decrementStreamsCount(int streamid) {
+ Stream<?> s = streams.get(streamid);
+ if (s == null || !s.deRegister())
+ return;
+ if (streamid % 2 == 1) {
+ numReservedClientStreams--;
+ } else {
+ numReservedServerStreams--;
+ }
+ }
+
void closeStream(int streamid) {
if (debug.on()) debug.log("Closed stream %d", streamid);
boolean isClient = (streamid % 2) == 1;
Stream<?> s = streams.remove(streamid);
if (s != null) {
- synchronized (this) {
- if (isClient)
- numReservedClientStreams--;
- else
- numReservedServerStreams--;
- }
- assert numReservedClientStreams >= 0;
- assert numReservedServerStreams >= 0;
// decrement the reference count on the HttpClientImpl
// to allow the SelectorManager thread to exit if no
// other operation is pending and the facade is no
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Thu May 24 18:41:30 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Mon May 28 10:48:38 2018 +0100
@@ -39,6 +39,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.net.http.HttpClient;
@@ -133,6 +134,8 @@
private volatile boolean closed;
private volatile boolean endStreamSent;
+ final AtomicBoolean deRegistered = new AtomicBoolean(false);
+
// state flags
private boolean requestSent, responseReceived;
@@ -185,6 +188,7 @@
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
+ connection.decrementStreamsCount(streamid);
subscriber.onComplete();
onCompleteCalled = true;
setEndStreamReceived();
@@ -251,6 +255,10 @@
return true; // end of stream
}
+ boolean deRegister() {
+ return deRegistered.compareAndSet(false, true);
+ }
+
@Override
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
boolean returnConnectionToPool,
@@ -472,6 +480,7 @@
responseBodyCF.completeExceptionally(errorRef.get());
}
} finally {
+ connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
}
} else {
@@ -688,6 +697,7 @@
if (streamid > 0) {
if (debug.on()) debug.log("Released stream %d", streamid);
// remove this stream from the Http2Connection map.
+ connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
} else {
if (debug.on()) debug.log("Can't release stream %d", streamid);
@@ -1102,6 +1112,7 @@
try {
// will send a RST_STREAM frame
if (streamid != 0) {
+ connection.decrementStreamsCount(streamid);
e = Utils.getCompletionCause(e);
if (e instanceof EOFException) {
// read EOF: no need to try & send reset