--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 19 09:13:58 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 19 09:32:13 2018 +0100
@@ -46,6 +46,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.IntBinaryOperator;
/**
* Implements SSL using two SubscriberWrappers.
@@ -91,6 +92,7 @@
private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
// When handshake is in progress trying to wrap may produce no bytes.
private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
+ private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
final Executor exec;
final Reader reader;
@@ -150,7 +152,8 @@
// Writer to the downWriter.
connect(downReader, downWriter);
- //Monitor.add(this::monitor);
+ if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
+ Monitor.add(this::monitor);
}
/**
@@ -864,7 +867,12 @@
private static final int NOT_HANDSHAKING = 0;
private static final int HANDSHAKING = 1;
- private static final int DOING_TASKS = 4; // bit added to above state
+ // Bit flags
+ // a thread is currently executing tasks
+ private static final int DOING_TASKS = 4;
+ // a thread wants to execute tasks, while another thread is executing
+ private static final int REQUESTING_TASKS = 8;
+ private static final int TASK_BITS = 12; // Both bits
private static final int READER = 1;
private static final int WRITER = 2;
@@ -872,7 +880,7 @@
private static String states(AtomicInteger state) {
int s = state.get();
StringBuilder sb = new StringBuilder();
- int x = s & ~DOING_TASKS;
+ int x = s & ~TASK_BITS;
switch (x) {
case NOT_HANDSHAKING:
sb.append(" NOT_HANDSHAKING ");
@@ -885,6 +893,8 @@
}
if ((s & DOING_TASKS) > 0)
sb.append("|DOING_TASKS");
+ if ((s & REQUESTING_TASKS) > 0)
+ sb.append("|REQUESTING_TASKS");
return sb.toString();
}
@@ -897,18 +907,37 @@
final ConcurrentLinkedQueue<String> stateList =
debug.on() ? new ConcurrentLinkedQueue<>() : null;
+ // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
+ // depending on previous value
+ private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
+ if ((current & DOING_TASKS) == 0)
+ return DOING_TASKS | (current & HANDSHAKING);
+ else
+ return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
+ };
+
+ // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
+ // clears bits if not.
+ private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
+ if ((current & REQUESTING_TASKS) != 0)
+ return DOING_TASKS | (current & HANDSHAKING);
+ // clear both bits
+ return (current & HANDSHAKING);
+ };
+
private boolean doHandshake(EngineResult r, int caller) {
- // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
- handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+ // unconditionally sets the HANDSHAKING bit, while preserving task bits
+ handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
if (stateList != null && debug.on()) {
stateList.add(r.handshakeStatus().toString());
stateList.add(Integer.toString(caller));
}
switch (r.handshakeStatus()) {
case NEED_TASK:
- int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
- if ((s & DOING_TASKS) > 0) // someone else was doing tasks
+ int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
+ if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
return false;
+ }
if (debug.on()) debug.log("obtaining and initiating task execution");
List<Runnable> tasks = obtainTasks();
@@ -942,20 +971,25 @@
}
private void executeTasks(List<Runnable> tasks) {
- if (tasks.isEmpty())
- return;
exec.execute(() -> {
try {
List<Runnable> nextTasks = tasks;
+ if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
do {
nextTasks.forEach(Runnable::run);
if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
nextTasks = obtainTasks();
} else {
+ int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
+ if ((s & DOING_TASKS) != 0) {
+ if (debug.on()) debug.log("re-running tasks (B)");
+ nextTasks = obtainTasks();
+ continue;
+ }
break;
}
} while (true);
- handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
+ if (debug.on()) debug.log("finished task execution");
resumeActivity();
} catch (Throwable t) {
handleError(t);