--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Sat Mar 03 09:54:31 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Sat Mar 03 09:57:25 2018 +0000
@@ -307,10 +307,10 @@
// work function where it all happens
void processData() {
try {
- debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
- + " bytes to unwrap "
- + states(handshakeState)
- + ", " + engine.getHandshakeStatus());
+ debugr.log(Level.DEBUG, () -> "processData:"
+ + " readBuf remaining:" + readBuf.remaining()
+ + ", state:" + states(handshakeState)
+ + ", engine handshake status:" + engine.getHandshakeStatus());
int len;
boolean complete = false;
while ((len = readBuf.remaining()) > 0) {
@@ -344,8 +344,9 @@
}
if (result.handshaking() && !complete) {
debugr.log(Level.DEBUG, "handshaking");
- doHandshake(result, READER);
- resumeActivity();
+ if (doHandshake(result, READER)) {
+ resumeActivity();
+ }
handshaking = true;
} else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
@@ -507,8 +508,8 @@
}
protected void onSubscribe() {
- doHandshake(EngineResult.INIT, INIT);
- resumeActivity();
+ debugw.log(Level.DEBUG, "onSubscribe initiating handshaking");
+ addData(HS_TRIGGER); // initiates handshaking
}
void schedule() {
@@ -550,14 +551,20 @@
boolean completing = isCompleting();
try {
- debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
- while (Utils.remaining(writeList) > 0 || hsTriggered()
- || needWrap()) {
+ debugw.log(Level.DEBUG, () -> "processData, writeList remaining:"
+ + Utils.remaining(writeList) + ", hsTriggered:"
+ + hsTriggered() + ", needWrap:" + needWrap());
+
+ while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
if (result.status() == Status.CLOSED) {
+ if (!upstreamCompleted) {
+ upstreamCompleted = true;
+ upstreamSubscription.cancel();
+ }
if (result.bytesProduced() <= 0)
return;
@@ -571,7 +578,7 @@
boolean handshaking = false;
if (result.handshaking()) {
debugw.log(Level.DEBUG, "handshaking");
- doHandshake(result, WRITER);
+ doHandshake(result, WRITER); // ok to ignore return
handshaking = true;
} else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
@@ -582,20 +589,14 @@
cleanList(writeList); // tidy up the source list
sendResultBytes(result);
if (handshaking && !completing) {
- if (writeList.isEmpty() && !result.needUnwrap()) {
- writer.addData(HS_TRIGGER);
+ if (needWrap()) {
+ continue;
+ } else {
+ return;
}
- if (needWrap()) continue;
- return;
}
}
if (completing && Utils.remaining(writeList) == 0) {
- /*
- System.out.println("WRITER DOO 3");
- engine.closeOutbound();
- EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
- sendResultBytes(result);
- */
if (!completed) {
completed = true;
writeList.clear();
@@ -685,18 +686,19 @@
writer.stop();
}
- private void normalStop() {
- stopOnError(null);
- }
+ boolean stopped;
- boolean stopped = false;
-
- synchronized private Void stopOnError(Throwable t) {
+ private synchronized void normalStop() {
if (stopped)
- return null;
+ return;
stopped = true;
reader.stop();
writer.stop();
+ }
+
+ private Void stopOnError(Throwable currentlyUnused) {
+ // maybe log, etc
+ normalStop();
return null;
}
@@ -719,7 +721,7 @@
*/
private static final int NOT_HANDSHAKING = 0;
private static final int HANDSHAKING = 1;
- private static final int INIT = 2;
+
private static final int DOING_TASKS = 4; // bit added to above state
private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
@@ -737,9 +739,6 @@
case HANDSHAKING:
sb.append(" HANDSHAKING ");
break;
- case INIT:
- sb.append(" INIT ");
- break;
default:
throw new InternalError();
}
@@ -756,28 +755,37 @@
final AtomicInteger handshakeState;
final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
- private void doHandshake(EngineResult r, int caller) {
- int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+ private boolean doHandshake(EngineResult r, int caller) {
+ // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
+ handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
stateList.add(r.handshakeStatus().toString());
stateList.add(Integer.toString(caller));
switch (r.handshakeStatus()) {
case NEED_TASK:
+ int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
if ((s & DOING_TASKS) > 0) // someone else was doing tasks
- return;
+ return false;
+
+ debug.log(Level.DEBUG, "obtaining and initiating task execution");
List<Runnable> tasks = obtainTasks();
executeTasks(tasks);
- break;
+ return false; // executeTasks will resume activity
case NEED_WRAP:
- writer.addData(HS_TRIGGER);
+ if (caller == READER) {
+ writer.addData(HS_TRIGGER);
+ return false;
+ }
break;
case NEED_UNWRAP:
case NEED_UNWRAP_AGAIN:
// do nothing else
+ // receiving-side data will trigger unwrap
break;
default:
throw new InternalError("Unexpected handshake status:"
+ r.handshakeStatus());
}
+ return true;
}
private List<Runnable> obtainTasks() {
@@ -790,9 +798,10 @@
}
private void executeTasks(List<Runnable> tasks) {
+ if (tasks.isEmpty())
+ return;
exec.execute(() -> {
try {
- handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
List<Runnable> nextTasks = tasks;
do {
nextTasks.forEach(Runnable::run);
@@ -803,7 +812,7 @@
}
} while (true);
handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
- writer.addData(HS_TRIGGER);
+ //writer.addData(HS_TRIGGER);
resumeActivity();
} catch (Throwable t) {
handleError(t);
@@ -869,11 +878,6 @@
this.destBuffer = destBuffer;
}
- // Special result used to trigger handshaking in constructor
- static EngineResult INIT =
- new EngineResult(
- new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
-
boolean handshaking() {
HandshakeStatus s = result.getHandshakeStatus();
return s != HandshakeStatus.FINISHED
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:54:31 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:57:25 2018 +0000
@@ -204,7 +204,7 @@
* Sometime it might be necessary to complete the downstream subscriber
* before the upstream completes. For instance, when an SSL server
* sends a notify_close. In that case we should let the outgoing
- * complete before upstream us completed.
+ * complete before upstream is completed.
* @return true, may be overridden by subclasses.
*/
public boolean closing() {
@@ -217,18 +217,19 @@
assert Utils.remaining(buffers) == 0;
boolean closing = closing();
logger.log(Level.DEBUG,
- "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
- upstreamCompleted, downstreamCompleted, closing);
- if (!upstreamCompleted && !closing)
+ "completionAcknowledged upstreamCompleted:%s,"
+ + " downstreamCompleted:%s, closing:%s",
+ upstreamCompleted, downstreamCompleted, closing);
+ if (!upstreamCompleted && !closing) {
throw new IllegalStateException("upstream not completed");
+ }
completionAcknowledged = true;
} else {
logger.log(Level.DEBUG, () -> "Adding "
- + Utils.remaining(buffers)
- + " to outputQ queue");
+ + Utils.remaining(buffers) + " to outputQ queue");
outputQ.add(buffers);
}
- logger.log(Level.DEBUG, () -> "pushScheduler "
+ logger.log(Level.DEBUG, () -> "pushScheduler"
+ (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
pushScheduler.runOrSchedule();
}
@@ -281,7 +282,8 @@
Throwable error = errorRef.get();
if (error != null) {
synchronized(this) {
- if (downstreamCompleted) return;
+ if (downstreamCompleted)
+ return;
downstreamCompleted = true;
}
logger.log(Level.DEBUG,
@@ -319,9 +321,11 @@
void upstreamWindowUpdate() {
long downstreamQueueSize = outputQ.size();
- long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
- logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d",
- downstreamQueueSize, upstreamWindow.get());
+ long upstreamWindowSize = upstreamWindow.get();
+ long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
+ logger.log(Level.DEBUG, "upstreamWindowUpdate, "
+ + "downstreamQueueSize:%d, upstreamWindow:%d",
+ downstreamQueueSize, upstreamWindowSize);
if (n > 0)
upstreamRequest(n);
}