http-client-branch: Fix SSLFlowDelegate race http-client-branch
authormichaelm
Tue, 19 Jun 2018 09:32:13 +0100
branchhttp-client-branch
changeset 56772 38a290eb60eb
parent 56771 73a6534bce94
child 56773 02ab5f60fbcd
http-client-branch: Fix SSLFlowDelegate race
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Jun 19 09:13:58 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Jun 19 09:32:13 2018 +0100
@@ -46,6 +46,7 @@
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.function.IntBinaryOperator;
 
 /**
  * Implements SSL using two SubscriberWrappers.
@@ -91,6 +92,7 @@
     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
     // When handshake is in progress trying to wrap may produce no bytes.
     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
+    private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
 
     final Executor exec;
     final Reader reader;
@@ -150,7 +152,8 @@
         // Writer to the downWriter.
         connect(downReader, downWriter);
 
-        //Monitor.add(this::monitor);
+        if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
+            Monitor.add(this::monitor);
     }
 
     /**
@@ -864,7 +867,12 @@
     private static final int NOT_HANDSHAKING = 0;
     private static final int HANDSHAKING = 1;
 
-    private static final int DOING_TASKS = 4; // bit added to above state
+    // Bit flags
+    // a thread is currently executing tasks
+    private static final int DOING_TASKS = 4;
+    // a thread wants to execute tasks, while another thread is executing
+    private static final int REQUESTING_TASKS = 8;
+    private static final int TASK_BITS = 12; // Both bits
 
     private static final int READER = 1;
     private static final int WRITER = 2;
@@ -872,7 +880,7 @@
     private static String states(AtomicInteger state) {
         int s = state.get();
         StringBuilder sb = new StringBuilder();
-        int x = s & ~DOING_TASKS;
+        int x = s & ~TASK_BITS;
         switch (x) {
             case NOT_HANDSHAKING:
                 sb.append(" NOT_HANDSHAKING ");
@@ -885,6 +893,8 @@
         }
         if ((s & DOING_TASKS) > 0)
             sb.append("|DOING_TASKS");
+        if ((s & REQUESTING_TASKS) > 0)
+            sb.append("|REQUESTING_TASKS");
         return sb.toString();
     }
 
@@ -897,18 +907,37 @@
     final ConcurrentLinkedQueue<String> stateList =
             debug.on() ? new ConcurrentLinkedQueue<>() : null;
 
+    // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
+    // depending on previous value
+    private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
+        if ((current & DOING_TASKS) == 0)
+            return DOING_TASKS | (current & HANDSHAKING);
+        else
+            return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
+    };
+
+    // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
+    // clears bits if not.
+    private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
+        if ((current & REQUESTING_TASKS) != 0)
+            return DOING_TASKS | (current & HANDSHAKING);
+        // clear both bits
+        return (current & HANDSHAKING);
+    };
+
     private boolean doHandshake(EngineResult r, int caller) {
-        // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
-        handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+        // unconditionally sets the HANDSHAKING bit, while preserving task bits
+        handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
         if (stateList != null && debug.on()) {
             stateList.add(r.handshakeStatus().toString());
             stateList.add(Integer.toString(caller));
         }
         switch (r.handshakeStatus()) {
             case NEED_TASK:
-                int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
-                if ((s & DOING_TASKS) > 0) // someone else was doing tasks
+                int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
+                if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
                     return false;
+                }
 
                 if (debug.on()) debug.log("obtaining and initiating task execution");
                 List<Runnable> tasks = obtainTasks();
@@ -942,20 +971,25 @@
     }
 
     private void executeTasks(List<Runnable> tasks) {
-        if (tasks.isEmpty())
-            return;
         exec.execute(() -> {
             try {
                 List<Runnable> nextTasks = tasks;
+                if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
                 do {
                     nextTasks.forEach(Runnable::run);
                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                         nextTasks = obtainTasks();
                     } else {
+                        int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
+                        if ((s & DOING_TASKS) != 0) {
+                            if (debug.on()) debug.log("re-running tasks (B)");
+                            nextTasks = obtainTasks();
+                            continue;
+                        }
                         break;
                     }
                 } while (true);
-                handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
+                if (debug.on()) debug.log("finished task execution");
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);