src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:54:31 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:57:25 2018 +0000
@@ -204,7 +204,7 @@
* Sometime it might be necessary to complete the downstream subscriber
* before the upstream completes. For instance, when an SSL server
* sends a notify_close. In that case we should let the outgoing
- * complete before upstream us completed.
+ * complete before upstream is completed.
* @return true, may be overridden by subclasses.
*/
public boolean closing() {
@@ -217,18 +217,19 @@
assert Utils.remaining(buffers) == 0;
boolean closing = closing();
logger.log(Level.DEBUG,
- "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
- upstreamCompleted, downstreamCompleted, closing);
- if (!upstreamCompleted && !closing)
+ "completionAcknowledged upstreamCompleted:%s,"
+ + " downstreamCompleted:%s, closing:%s",
+ upstreamCompleted, downstreamCompleted, closing);
+ if (!upstreamCompleted && !closing) {
throw new IllegalStateException("upstream not completed");
+ }
completionAcknowledged = true;
} else {
logger.log(Level.DEBUG, () -> "Adding "
- + Utils.remaining(buffers)
- + " to outputQ queue");
+ + Utils.remaining(buffers) + " to outputQ queue");
outputQ.add(buffers);
}
- logger.log(Level.DEBUG, () -> "pushScheduler "
+ logger.log(Level.DEBUG, () -> "pushScheduler"
+ (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
pushScheduler.runOrSchedule();
}
@@ -281,7 +282,8 @@
Throwable error = errorRef.get();
if (error != null) {
synchronized(this) {
- if (downstreamCompleted) return;
+ if (downstreamCompleted)
+ return;
downstreamCompleted = true;
}
logger.log(Level.DEBUG,
@@ -319,9 +321,11 @@
void upstreamWindowUpdate() {
long downstreamQueueSize = outputQ.size();
- long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
- logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d",
- downstreamQueueSize, upstreamWindow.get());
+ long upstreamWindowSize = upstreamWindow.get();
+ long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
+ logger.log(Level.DEBUG, "upstreamWindowUpdate, "
+ + "downstreamQueueSize:%d, upstreamWindow:%d",
+ downstreamQueueSize, upstreamWindowSize);
if (n > 0)
upstreamRequest(n);
}