src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 55942 8d4770c22b63
parent 55909 583695a0ed6a
child 55946 cfa4f84b7fcc
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Sat Dec 02 17:40:57 2017 +0000
@@ -238,10 +238,7 @@
         public void incoming(List<ByteBuffer> buffers, boolean complete) {
             debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
                         + " bytes to read buffer");
-            addToReadBuf(buffers);
-            if (complete) {
-                this.completing = true;
-            }
+            addToReadBuf(buffers, complete);
             scheduler.runOrSchedule();
         }
 
@@ -269,7 +266,7 @@
         }
 
         // readBuf is kept ready for reading outside of this method
-        private void addToReadBuf(List<ByteBuffer> buffers) {
+        private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
             synchronized (readBufferLock) {
                 for (ByteBuffer buf : buffers) {
                     readBuf.compact();
@@ -278,6 +275,9 @@
                     readBuf.put(buf);
                     readBuf.flip();
                 }
+                if (complete) {
+                    this.completing = complete;
+                }
             }
         }
 
@@ -297,19 +297,32 @@
             try {
                 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
                            + " bytes to unwrap "
-                           + states(handshakeState));
-
-                while (readBuf.hasRemaining()) {
+                           + states(handshakeState)
+                           + ", " + engine.getHandshakeStatus());
+                int len;
+                boolean completing = false;
+                while ((len = readBuf.remaining()) > 0) {
                     boolean handshaking = false;
                     try {
                         EngineResult result;
                         synchronized (readBufferLock) {
+                            completing = this.completing;
                             result = unwrapBuffer(readBuf);
                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
                         }
+                        if (result.bytesProduced() > 0) {
+                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
+                            count.addAndGet(result.bytesProduced());
+                            outgoing(result.destBuffer, false);
+                        }
                         if (result.status() == Status.BUFFER_UNDERFLOW) {
                             debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
-                            return;
+                            // not enough data in the read buffer...
+                            synchronized (readBufferLock) {
+                                // check if we have received some data
+                                if (readBuf.remaining() > len) continue;
+                                return;
+                            }
                         }
                         if (completing && result.status() == Status.CLOSED) {
                             debugr.log(Level.DEBUG, "Closed: completing");
@@ -328,11 +341,6 @@
                                 resumeActivity();
                             }
                         }
-                        if (result.bytesProduced() > 0) {
-                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
-                            count.addAndGet(result.bytesProduced());
-                            outgoing(result.destBuffer, false);
-                        }
                     } catch (IOException ex) {
                         errorCommon(ex);
                         handleError(ex);
@@ -340,6 +348,11 @@
                     if (handshaking && !completing)
                         return;
                 }
+                if (!completing) {
+                    synchronized (readBufferLock) {
+                        completing = this.completing && !readBuf.hasRemaining();
+                    }
+                }
                 if (completing) {
                     debugr.log(Level.DEBUG, "completing");
                     // Complete the alpnCF, if not already complete, regardless of
@@ -443,6 +456,7 @@
             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
             if (complete) {
+                debugw.log(Level.DEBUG, "adding SENTINEL");
                 writeList.add(SENTINEL);
             } else {
                 writeList.addAll(buffers);
@@ -507,7 +521,8 @@
 
             try {
                 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
-                while (Utils.remaining(writeList) > 0 || hsTriggered()) {
+                while (Utils.remaining(writeList) > 0 || hsTriggered()
+                        || needWrap()) {
                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                     EngineResult result = wrapBuffers(outbufs);
                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
@@ -538,6 +553,7 @@
                         if (writeList.isEmpty() && !result.needUnwrap()) {
                             writer.addData(HS_TRIGGER);
                         }
+                        if (needWrap()) continue;
                         return;
                     }
                 }
@@ -551,11 +567,18 @@
                     outgoing(Utils.EMPTY_BB_LIST, true);
                     return;
                 }
+                if (writeList.isEmpty() && needWrap()) {
+                    writer.addData(HS_TRIGGER);
+                }
             } catch (Throwable ex) {
                 handleError(ex);
             }
         }
 
+        private boolean needWrap() {
+            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
+        }
+
         private void sendResultBytes(EngineResult result) {
             if (result.bytesProduced() > 0) {
                 debugw.log(Level.DEBUG, "Sending %d bytes downstream",
@@ -678,13 +701,19 @@
     private void executeTasks(List<Runnable> tasks) {
         exec.execute(() -> {
             handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
-            try {
-                tasks.forEach((r) -> {
-                    r.run();
-                });
-            } catch (Throwable t) {
-                handleError(t);
-            }
+            List<Runnable> nextTasks = tasks;
+            do {
+                try {
+                    nextTasks.forEach((r) -> {
+                        r.run();
+                    });
+                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                        nextTasks = obtainTasks();
+                    } else break;
+                } catch (Throwable t) {
+                    handleError(t);
+                }
+            } while(true);
             handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
             writer.addData(HS_TRIGGER);
             resumeActivity();