# HG changeset patch # User michaelm # Date 1529397133 -3600 # Node ID 38a290eb60eb48eeb01e80bf143209780a449fb3 # Parent 73a6534bce9414f3e3d14131e7fe82cf1f2f6720 http-client-branch: Fix SSLFlowDelegate race diff -r 73a6534bce94 -r 38a290eb60eb src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 19 09:13:58 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 19 09:32:13 2018 +0100 @@ -46,6 +46,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.function.IntBinaryOperator; /** * Implements SSL using two SubscriberWrappers. @@ -91,6 +92,7 @@ private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); // When handshake is in progress trying to wrap may produce no bytes. private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); + private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); final Executor exec; final Reader reader; @@ -150,7 +152,8 @@ // Writer to the downWriter. connect(downReader, downWriter); - //Monitor.add(this::monitor); + if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"))) + Monitor.add(this::monitor); } /** @@ -864,7 +867,12 @@ private static final int NOT_HANDSHAKING = 0; private static final int HANDSHAKING = 1; - private static final int DOING_TASKS = 4; // bit added to above state + // Bit flags + // a thread is currently executing tasks + private static final int DOING_TASKS = 4; + // a thread wants to execute tasks, while another thread is executing + private static final int REQUESTING_TASKS = 8; + private static final int TASK_BITS = 12; // Both bits private static final int READER = 1; private static final int WRITER = 2; @@ -872,7 +880,7 @@ private static String states(AtomicInteger state) { int s = state.get(); StringBuilder sb = new StringBuilder(); - int x = s & ~DOING_TASKS; + int x = s & ~TASK_BITS; switch (x) { case NOT_HANDSHAKING: sb.append(" NOT_HANDSHAKING "); @@ -885,6 +893,8 @@ } if ((s & DOING_TASKS) > 0) sb.append("|DOING_TASKS"); + if ((s & REQUESTING_TASKS) > 0) + sb.append("|REQUESTING_TASKS"); return sb.toString(); } @@ -897,18 +907,37 @@ final ConcurrentLinkedQueue stateList = debug.on() ? new ConcurrentLinkedQueue<>() : null; + // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS + // depending on previous value + private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { + if ((current & DOING_TASKS) == 0) + return DOING_TASKS | (current & HANDSHAKING); + else + return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); + }; + + // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set + // clears bits if not. + private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { + if ((current & REQUESTING_TASKS) != 0) + return DOING_TASKS | (current & HANDSHAKING); + // clear both bits + return (current & HANDSHAKING); + }; + private boolean doHandshake(EngineResult r, int caller) { - // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS - handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); + // unconditionally sets the HANDSHAKING bit, while preserving task bits + handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); if (stateList != null && debug.on()) { stateList.add(r.handshakeStatus().toString()); stateList.add(Integer.toString(caller)); } switch (r.handshakeStatus()) { case NEED_TASK: - int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); - if ((s & DOING_TASKS) > 0) // someone else was doing tasks + int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); + if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks return false; + } if (debug.on()) debug.log("obtaining and initiating task execution"); List tasks = obtainTasks(); @@ -942,20 +971,25 @@ } private void executeTasks(List tasks) { - if (tasks.isEmpty()) - return; exec.execute(() -> { try { List nextTasks = tasks; + if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); do { nextTasks.forEach(Runnable::run); if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { nextTasks = obtainTasks(); } else { + int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); + if ((s & DOING_TASKS) != 0) { + if (debug.on()) debug.log("re-running tasks (B)"); + nextTasks = obtainTasks(); + continue; + } break; } } while (true); - handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); + if (debug.on()) debug.log("finished task execution"); resumeActivity(); } catch (Throwable t) { handleError(t);