http-client-branch: SSLFlowDelegate::cleanList should not remove the final SENTINEL. HttpResponseInputStream.close should wakeup the reader, if any. http-client-branch
authordfuchs
Tue, 05 Dec 2017 10:07:50 +0000
branchhttp-client-branch
changeset 55950 5e1707e5a254
parent 55948 33ffdf2f703e
child 55953 10b34c929b4f
http-client-branch: SSLFlowDelegate::cleanList should not remove the final SENTINEL. HttpResponseInputStream.close should wakeup the reader, if any.
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Tue Dec 05 10:07:50 2017 +0000
@@ -554,6 +554,7 @@
         if (frame instanceof MalformedFrame) {
             Log.logError(((MalformedFrame) frame).getMessage());
             if (streamid == 0) {
+                framesDecoder.close("Malformed frame on stream 0");
                 protocolError(((MalformedFrame) frame).getErrorCode(),
                         ((MalformedFrame) frame).getMessage());
             } else {
@@ -568,6 +569,8 @@
         } else {
             if (frame instanceof SettingsFrame) {
                 // The stream identifier for a SETTINGS frame MUST be zero
+                framesDecoder.close(
+                        "The stream identifier for a SETTINGS frame MUST be zero");
                 protocolError(GoAwayFrame.PROTOCOL_ERROR);
                 return;
             }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Tue Dec 05 10:07:50 2017 +0000
@@ -447,10 +447,14 @@
                 subscription = null;
             }
             // s will be null if already completed
-            if (s != null) {
-                 s.cancel();
+            try {
+                if (s != null) {
+                    s.cancel();
+                }
+            } finally {
+                buffers.offer(LAST_LIST);
+                super.close();
             }
-            super.close();
         }
 
     }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Tue Dec 05 10:07:50 2017 +0000
@@ -300,13 +300,13 @@
                            + states(handshakeState)
                            + ", " + engine.getHandshakeStatus());
                 int len;
-                boolean completing = false;
+                boolean complete = false;
                 while ((len = readBuf.remaining()) > 0) {
                     boolean handshaking = false;
                     try {
                         EngineResult result;
                         synchronized (readBufferLock) {
-                            completing = this.completing;
+                            complete = this.completing;
                             result = unwrapBuffer(readBuf);
                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
                         }
@@ -325,12 +325,12 @@
                                 return;
                             }
                         }
-                        if (completing && result.status() == Status.CLOSED) {
+                        if (complete && result.status() == Status.CLOSED) {
                             debugr.log(Level.DEBUG, "Closed: completing");
                             outgoing(Utils.EMPTY_BB_LIST, true);
                             return;
                         }
-                        if (result.handshaking() && !completing) {
+                        if (result.handshaking() && !complete) {
                             debugr.log(Level.DEBUG, "handshaking");
                             doHandshake(result, READER);
                             resumeActivity();
@@ -346,15 +346,15 @@
                         errorCommon(ex);
                         handleError(ex);
                     }
-                    if (handshaking && !completing)
+                    if (handshaking && !complete)
                         return;
                 }
-                if (!completing) {
+                if (!complete) {
                     synchronized (readBufferLock) {
-                        completing = this.completing && !readBuf.hasRemaining();
+                        complete = this.completing && !readBuf.hasRemaining();
                     }
                 }
-                if (completing) {
+                if (complete) {
                     debugr.log(Level.DEBUG, "completing");
                     // Complete the alpnCF, if not already complete, regardless of
                     // whether or not the ALPN is available, there will be no more
@@ -441,6 +441,8 @@
         final List<ByteBuffer> writeList;
         final System.Logger debugw =
             Utils.getDebugLogger(this::dbgString, DEBUG);
+        volatile boolean completing;
+        boolean completed; // only accessed in processData
 
         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
             @Override public void run() { processData(); }
@@ -458,6 +460,7 @@
             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
             if (complete) {
                 debugw.log(Level.DEBUG, "adding SENTINEL");
+                completing = true;
                 writeList.add(SENTINEL);
             } else {
                 writeList.addAll(buffers);
@@ -492,12 +495,7 @@
         }
 
         private boolean isCompleting() {
-            synchronized(writeList) {
-                int lastIndex = writeList.size() - 1;
-                if (lastIndex < 0)
-                    return false;
-                return writeList.get(lastIndex) == SENTINEL;
-            }
+            return completing;
         }
 
         @Override
@@ -532,9 +530,11 @@
                         if (result.bytesProduced() <= 0)
                             return;
 
-                        completing = true;
-                        // There could still be some outgoing data in outbufs.
-                        writeList.add(SENTINEL);
+                        if (!completing && !completed) {
+                            completing = this.completing = true;
+                            // There could still be some outgoing data in outbufs.
+                            writeList.add(SENTINEL);
+                        }
                     }
 
                     boolean handshaking = false;
@@ -565,7 +565,11 @@
                     EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
                     sendResultBytes(result);
                     */
-                    outgoing(Utils.EMPTY_BB_LIST, true);
+                    if (!completed) {
+                        completed = true;
+                        writeList.clear();
+                        outgoing(Utils.EMPTY_BB_LIST, true);
+                    }
                     return;
                 }
                 if (writeList.isEmpty() && needWrap()) {
@@ -615,7 +619,7 @@
             Iterator<ByteBuffer> iter = l.iterator();
             while (iter.hasNext()) {
                 ByteBuffer b = iter.next();
-                if (!b.hasRemaining()) {
+                if (!b.hasRemaining() && b != SENTINEL) {
                     iter.remove();
                 }
             }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Tue Dec 05 10:07:50 2017 +0000
@@ -257,6 +257,7 @@
                 subscribed = subscribedDone;
             }
             if (subscribed) {
+                debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
                 delegate.onComplete();
             } else {
                 debug.log(Level.DEBUG,
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Tue Dec 05 10:07:50 2017 +0000
@@ -211,8 +211,11 @@
         Objects.requireNonNull(buffers);
         if (complete) {
             assert Utils.remaining(buffers) == 0;
-            logger.log(Level.DEBUG, "completionAcknowledged");
-            if (!upstreamCompleted && !closing())
+            boolean closing = closing();
+            logger.log(Level.DEBUG,
+                    "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
+                    upstreamCompleted, downstreamCompleted, closing);
+            if (!upstreamCompleted && !closing)
                 throw new IllegalStateException("upstream not completed");
             completionAcknowledged = true;
         } else {
@@ -417,6 +420,7 @@
             return;
         }
         if (completionAcknowledged) {
+            logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()");
             downstreamSubscriber.onComplete();
             // Fix me subscriber.onComplete.run();
             downstreamCompleted = true;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java	Mon Dec 04 16:54:26 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java	Tue Dec 05 10:07:50 2017 +0000
@@ -75,6 +75,7 @@
     private int frameType;
     private int frameFlags;
     private int frameStreamid;
+    private boolean closed;
 
     /**
      * Creates Frame Decoder
@@ -109,6 +110,12 @@
      * decoded and the FrameProcessor is invoked.
      */
     public void decode(ByteBuffer inBoundBuffer) throws IOException {
+        if (closed) {
+            DEBUG_LOGGER.log(Level.DEBUG, "closed: ignoring buffer (%s bytes)",
+                    inBoundBuffer.remaining());
+            inBoundBuffer.position(inBoundBuffer.limit());
+            return;
+        }
         int remaining = inBoundBuffer.remaining();
         DEBUG_LOGGER.log(Level.DEBUG, "decodes: %d", remaining);
         if (remaining > 0) {
@@ -290,6 +297,20 @@
         return res;
     }
 
+    public void close(String msg) {
+        closed = true;
+        tailBuffers.clear();
+        int bytes = tailSize;
+        ByteBuffer b = currentBuffer;
+        if (b != null) {
+            bytes += b.remaining();
+            b.position(b.limit());
+        }
+        tailSize = 0;
+        currentBuffer = null;
+        DEBUG_LOGGER.log(Level.DEBUG, "closed %s, ignoring %d bytes", msg, bytes);
+    }
+
     public void skipBytes(int bytecount) {
         while (bytecount > 0) {
             int remaining = currentBuffer.remaining();