http-client-branch: SSLFlowDelegate::cleanList should not remove the final SENTINEL. HttpResponseInputStream.close should wakeup the reader, if any.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Tue Dec 05 10:07:50 2017 +0000
@@ -554,6 +554,7 @@
if (frame instanceof MalformedFrame) {
Log.logError(((MalformedFrame) frame).getMessage());
if (streamid == 0) {
+ framesDecoder.close("Malformed frame on stream 0");
protocolError(((MalformedFrame) frame).getErrorCode(),
((MalformedFrame) frame).getMessage());
} else {
@@ -568,6 +569,8 @@
} else {
if (frame instanceof SettingsFrame) {
// The stream identifier for a SETTINGS frame MUST be zero
+ framesDecoder.close(
+ "The stream identifier for a SETTINGS frame MUST be zero");
protocolError(GoAwayFrame.PROTOCOL_ERROR);
return;
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Dec 05 10:07:50 2017 +0000
@@ -447,10 +447,14 @@
subscription = null;
}
// s will be null if already completed
- if (s != null) {
- s.cancel();
+ try {
+ if (s != null) {
+ s.cancel();
+ }
+ } finally {
+ buffers.offer(LAST_LIST);
+ super.close();
}
- super.close();
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Tue Dec 05 10:07:50 2017 +0000
@@ -300,13 +300,13 @@
+ states(handshakeState)
+ ", " + engine.getHandshakeStatus());
int len;
- boolean completing = false;
+ boolean complete = false;
while ((len = readBuf.remaining()) > 0) {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
- completing = this.completing;
+ complete = this.completing;
result = unwrapBuffer(readBuf);
debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
}
@@ -325,12 +325,12 @@
return;
}
}
- if (completing && result.status() == Status.CLOSED) {
+ if (complete && result.status() == Status.CLOSED) {
debugr.log(Level.DEBUG, "Closed: completing");
outgoing(Utils.EMPTY_BB_LIST, true);
return;
}
- if (result.handshaking() && !completing) {
+ if (result.handshaking() && !complete) {
debugr.log(Level.DEBUG, "handshaking");
doHandshake(result, READER);
resumeActivity();
@@ -346,15 +346,15 @@
errorCommon(ex);
handleError(ex);
}
- if (handshaking && !completing)
+ if (handshaking && !complete)
return;
}
- if (!completing) {
+ if (!complete) {
synchronized (readBufferLock) {
- completing = this.completing && !readBuf.hasRemaining();
+ complete = this.completing && !readBuf.hasRemaining();
}
}
- if (completing) {
+ if (complete) {
debugr.log(Level.DEBUG, "completing");
// Complete the alpnCF, if not already complete, regardless of
// whether or not the ALPN is available, there will be no more
@@ -441,6 +441,8 @@
final List<ByteBuffer> writeList;
final System.Logger debugw =
Utils.getDebugLogger(this::dbgString, DEBUG);
+ volatile boolean completing;
+ boolean completed; // only accessed in processData
class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
@Override public void run() { processData(); }
@@ -458,6 +460,7 @@
assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
if (complete) {
debugw.log(Level.DEBUG, "adding SENTINEL");
+ completing = true;
writeList.add(SENTINEL);
} else {
writeList.addAll(buffers);
@@ -492,12 +495,7 @@
}
private boolean isCompleting() {
- synchronized(writeList) {
- int lastIndex = writeList.size() - 1;
- if (lastIndex < 0)
- return false;
- return writeList.get(lastIndex) == SENTINEL;
- }
+ return completing;
}
@Override
@@ -532,9 +530,11 @@
if (result.bytesProduced() <= 0)
return;
- completing = true;
- // There could still be some outgoing data in outbufs.
- writeList.add(SENTINEL);
+ if (!completing && !completed) {
+ completing = this.completing = true;
+ // There could still be some outgoing data in outbufs.
+ writeList.add(SENTINEL);
+ }
}
boolean handshaking = false;
@@ -565,7 +565,11 @@
EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
sendResultBytes(result);
*/
- outgoing(Utils.EMPTY_BB_LIST, true);
+ if (!completed) {
+ completed = true;
+ writeList.clear();
+ outgoing(Utils.EMPTY_BB_LIST, true);
+ }
return;
}
if (writeList.isEmpty() && needWrap()) {
@@ -615,7 +619,7 @@
Iterator<ByteBuffer> iter = l.iterator();
while (iter.hasNext()) {
ByteBuffer b = iter.next();
- if (!b.hasRemaining()) {
+ if (!b.hasRemaining() && b != SENTINEL) {
iter.remove();
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Tue Dec 05 10:07:50 2017 +0000
@@ -257,6 +257,7 @@
subscribed = subscribedDone;
}
if (subscribed) {
+ debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
delegate.onComplete();
} else {
debug.log(Level.DEBUG,
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Tue Dec 05 10:07:50 2017 +0000
@@ -211,8 +211,11 @@
Objects.requireNonNull(buffers);
if (complete) {
assert Utils.remaining(buffers) == 0;
- logger.log(Level.DEBUG, "completionAcknowledged");
- if (!upstreamCompleted && !closing())
+ boolean closing = closing();
+ logger.log(Level.DEBUG,
+ "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
+ upstreamCompleted, downstreamCompleted, closing);
+ if (!upstreamCompleted && !closing)
throw new IllegalStateException("upstream not completed");
completionAcknowledged = true;
} else {
@@ -417,6 +420,7 @@
return;
}
if (completionAcknowledged) {
+ logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()");
downstreamSubscriber.onComplete();
// Fix me subscriber.onComplete.run();
downstreamCompleted = true;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java Tue Dec 05 10:07:50 2017 +0000
@@ -75,6 +75,7 @@
private int frameType;
private int frameFlags;
private int frameStreamid;
+ private boolean closed;
/**
* Creates Frame Decoder
@@ -109,6 +110,12 @@
* decoded and the FrameProcessor is invoked.
*/
public void decode(ByteBuffer inBoundBuffer) throws IOException {
+ if (closed) {
+ DEBUG_LOGGER.log(Level.DEBUG, "closed: ignoring buffer (%s bytes)",
+ inBoundBuffer.remaining());
+ inBoundBuffer.position(inBoundBuffer.limit());
+ return;
+ }
int remaining = inBoundBuffer.remaining();
DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining);
if (remaining > 0) {
@@ -290,6 +297,20 @@
return res;
}
+ public void close(String msg) {
+ closed = true;
+ tailBuffers.clear();
+ int bytes = tailSize;
+ ByteBuffer b = currentBuffer;
+ if (b != null) {
+ bytes += b.remaining();
+ b.position(b.limit());
+ }
+ tailSize = 0;
+ currentBuffer = null;
+ DEBUG_LOGGER.log(Level.DEBUG, "closed %s, ignoring %d bytes", msg, bytes);
+ }
+
public void skipBytes(int bytecount) {
while (bytecount > 0) {
int remaining = currentBuffer.remaining();