http-client-branch: some SSLFlowDelegate fixes from Daniel and Chris http-client-branch
authorchegar
Sat, 03 Mar 2018 09:57:25 +0000
branchhttp-client-branch
changeset 56234 fc391230cf7b
parent 56233 1753108d07b9
child 56235 6218673d7fa0
http-client-branch: some SSLFlowDelegate fixes from Daniel and Chris
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Sat Mar 03 09:54:31 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Sat Mar 03 09:57:25 2018 +0000
@@ -307,10 +307,10 @@
         // work function where it all happens
         void processData() {
             try {
-                debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
-                           + " bytes to unwrap "
-                           + states(handshakeState)
-                           + ", " + engine.getHandshakeStatus());
+                debugr.log(Level.DEBUG, () -> "processData:"
+                           + " readBuf remaining:" + readBuf.remaining()
+                           + ", state:" + states(handshakeState)
+                           + ", engine handshake status:" + engine.getHandshakeStatus());
                 int len;
                 boolean complete = false;
                 while ((len = readBuf.remaining()) > 0) {
@@ -344,8 +344,9 @@
                         }
                         if (result.handshaking() && !complete) {
                             debugr.log(Level.DEBUG, "handshaking");
-                            doHandshake(result, READER);
-                            resumeActivity();
+                            if (doHandshake(result, READER)) {
+                                resumeActivity();
+                            }
                             handshaking = true;
                         } else {
                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
@@ -507,8 +508,8 @@
         }
 
         protected void onSubscribe() {
-            doHandshake(EngineResult.INIT, INIT);
-            resumeActivity();
+            debugw.log(Level.DEBUG, "onSubscribe initiating handshaking");
+            addData(HS_TRIGGER);  // initiates handshaking
         }
 
         void schedule() {
@@ -550,14 +551,20 @@
             boolean completing = isCompleting();
 
             try {
-                debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
-                while (Utils.remaining(writeList) > 0 || hsTriggered()
-                        || needWrap()) {
+                debugw.log(Level.DEBUG, () -> "processData, writeList remaining:"
+                        + Utils.remaining(writeList) + ", hsTriggered:"
+                        + hsTriggered() + ", needWrap:" + needWrap());
+
+                while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                     EngineResult result = wrapBuffers(outbufs);
                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
 
                     if (result.status() == Status.CLOSED) {
+                        if (!upstreamCompleted) {
+                            upstreamCompleted = true;
+                            upstreamSubscription.cancel();
+                        }
                         if (result.bytesProduced() <= 0)
                             return;
 
@@ -571,7 +578,7 @@
                     boolean handshaking = false;
                     if (result.handshaking()) {
                         debugw.log(Level.DEBUG, "handshaking");
-                        doHandshake(result, WRITER);
+                        doHandshake(result, WRITER);  // ok to ignore return
                         handshaking = true;
                     } else {
                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
@@ -582,20 +589,14 @@
                     cleanList(writeList); // tidy up the source list
                     sendResultBytes(result);
                     if (handshaking && !completing) {
-                        if (writeList.isEmpty() && !result.needUnwrap()) {
-                            writer.addData(HS_TRIGGER);
+                        if (needWrap()) {
+                            continue;
+                        } else {
+                            return;
                         }
-                        if (needWrap()) continue;
-                        return;
                     }
                 }
                 if (completing && Utils.remaining(writeList) == 0) {
-                    /*
-                    System.out.println("WRITER DOO 3");
-                    engine.closeOutbound();
-                    EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
-                    sendResultBytes(result);
-                    */
                     if (!completed) {
                         completed = true;
                         writeList.clear();
@@ -685,18 +686,19 @@
         writer.stop();
     }
 
-    private void normalStop() {
-        stopOnError(null);
-    }
+    boolean stopped;
 
-    boolean stopped = false;
-
-    synchronized private Void stopOnError(Throwable t) {
+    private synchronized void normalStop() {
         if (stopped)
-            return null;
+            return;
         stopped = true;
         reader.stop();
         writer.stop();
+    }
+
+    private Void stopOnError(Throwable currentlyUnused) {
+        // maybe log, etc
+        normalStop();
         return null;
     }
 
@@ -719,7 +721,7 @@
      */
     private static final int NOT_HANDSHAKING = 0;
     private static final int HANDSHAKING = 1;
-    private static final int INIT = 2;
+
     private static final int DOING_TASKS = 4; // bit added to above state
     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
 
@@ -737,9 +739,6 @@
             case HANDSHAKING:
                 sb.append(" HANDSHAKING ");
                 break;
-            case INIT:
-                sb.append(" INIT ");
-                break;
             default:
                 throw new InternalError();
         }
@@ -756,28 +755,37 @@
     final AtomicInteger handshakeState;
     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
 
-    private void doHandshake(EngineResult r, int caller) {
-        int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+    private boolean doHandshake(EngineResult r, int caller) {
+        // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
+        handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
         stateList.add(r.handshakeStatus().toString());
         stateList.add(Integer.toString(caller));
         switch (r.handshakeStatus()) {
             case NEED_TASK:
+                int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
-                    return;
+                    return false;
+
+                debug.log(Level.DEBUG, "obtaining and initiating task execution");
                 List<Runnable> tasks = obtainTasks();
                 executeTasks(tasks);
-                break;
+                return false;  // executeTasks will resume activity
             case NEED_WRAP:
-                writer.addData(HS_TRIGGER);
+                if (caller == READER) {
+                    writer.addData(HS_TRIGGER);
+                    return false;
+                }
                 break;
             case NEED_UNWRAP:
             case NEED_UNWRAP_AGAIN:
                 // do nothing else
+                // receiving-side data will trigger unwrap
                 break;
             default:
                 throw new InternalError("Unexpected handshake status:"
                                         + r.handshakeStatus());
         }
+        return true;
     }
 
     private List<Runnable> obtainTasks() {
@@ -790,9 +798,10 @@
     }
 
     private void executeTasks(List<Runnable> tasks) {
+        if (tasks.isEmpty())
+            return;
         exec.execute(() -> {
             try {
-                handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
                 List<Runnable> nextTasks = tasks;
                 do {
                     nextTasks.forEach(Runnable::run);
@@ -803,7 +812,7 @@
                     }
                 } while (true);
                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
-                writer.addData(HS_TRIGGER);
+                //writer.addData(HS_TRIGGER);
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);
@@ -869,11 +878,6 @@
             this.destBuffer = destBuffer;
         }
 
-        // Special result used to trigger handshaking in constructor
-        static EngineResult INIT =
-            new EngineResult(
-                new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
-
         boolean handshaking() {
             HandshakeStatus s = result.getHandshakeStatus();
             return s != HandshakeStatus.FINISHED
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Sat Mar 03 09:54:31 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Sat Mar 03 09:57:25 2018 +0000
@@ -204,7 +204,7 @@
      * Sometime it might be necessary to complete the downstream subscriber
      * before the upstream completes. For instance, when an SSL server
      * sends a notify_close. In that case we should let the outgoing
-     * complete before upstream us completed.
+     * complete before upstream is completed.
      * @return true, may be overridden by subclasses.
      */
     public boolean closing() {
@@ -217,18 +217,19 @@
             assert Utils.remaining(buffers) == 0;
             boolean closing = closing();
             logger.log(Level.DEBUG,
-                    "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
-                    upstreamCompleted, downstreamCompleted, closing);
-            if (!upstreamCompleted && !closing)
+                       "completionAcknowledged upstreamCompleted:%s,"
+                       + " downstreamCompleted:%s, closing:%s",
+                       upstreamCompleted, downstreamCompleted, closing);
+            if (!upstreamCompleted && !closing) {
                 throw new IllegalStateException("upstream not completed");
+            }
             completionAcknowledged = true;
         } else {
             logger.log(Level.DEBUG, () -> "Adding "
-                                   + Utils.remaining(buffers)
-                                   + " to outputQ queue");
+                    + Utils.remaining(buffers) + " to outputQ queue");
             outputQ.add(buffers);
         }
-        logger.log(Level.DEBUG, () -> "pushScheduler "
+        logger.log(Level.DEBUG, () -> "pushScheduler"
                    + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
         pushScheduler.runOrSchedule();
     }
@@ -281,7 +282,8 @@
             Throwable error = errorRef.get();
             if (error != null) {
                 synchronized(this) {
-                    if (downstreamCompleted) return;
+                    if (downstreamCompleted)
+                        return;
                     downstreamCompleted = true;
                 }
                 logger.log(Level.DEBUG,
@@ -319,9 +321,11 @@
 
     void upstreamWindowUpdate() {
         long downstreamQueueSize = outputQ.size();
-        long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
-        logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d",
-                downstreamQueueSize, upstreamWindow.get());
+        long upstreamWindowSize = upstreamWindow.get();
+        long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
+        logger.log(Level.DEBUG, "upstreamWindowUpdate, "
+                        + "downstreamQueueSize:%d, upstreamWindow:%d",
+                        downstreamQueueSize, upstreamWindowSize);
         if (n > 0)
             upstreamRequest(n);
     }