--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Sat Dec 02 17:40:57 2017 +0000
@@ -238,10 +238,7 @@
public void incoming(List<ByteBuffer> buffers, boolean complete) {
debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
+ " bytes to read buffer");
- addToReadBuf(buffers);
- if (complete) {
- this.completing = true;
- }
+ addToReadBuf(buffers, complete);
scheduler.runOrSchedule();
}
@@ -269,7 +266,7 @@
}
// readBuf is kept ready for reading outside of this method
- private void addToReadBuf(List<ByteBuffer> buffers) {
+ private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
synchronized (readBufferLock) {
for (ByteBuffer buf : buffers) {
readBuf.compact();
@@ -278,6 +275,9 @@
readBuf.put(buf);
readBuf.flip();
}
+ if (complete) {
+ this.completing = complete;
+ }
}
}
@@ -297,19 +297,32 @@
try {
debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
+ " bytes to unwrap "
- + states(handshakeState));
-
- while (readBuf.hasRemaining()) {
+ + states(handshakeState)
+ + ", " + engine.getHandshakeStatus());
+ int len;
+ boolean completing = false;
+ while ((len = readBuf.remaining()) > 0) {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
+ completing = this.completing;
result = unwrapBuffer(readBuf);
debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
}
+ if (result.bytesProduced() > 0) {
+ debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
+ count.addAndGet(result.bytesProduced());
+ outgoing(result.destBuffer, false);
+ }
if (result.status() == Status.BUFFER_UNDERFLOW) {
debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
- return;
+ // not enough data in the read buffer...
+ synchronized (readBufferLock) {
+ // check if we have received some data
+ if (readBuf.remaining() > len) continue;
+ return;
+ }
}
if (completing && result.status() == Status.CLOSED) {
debugr.log(Level.DEBUG, "Closed: completing");
@@ -328,11 +341,6 @@
resumeActivity();
}
}
- if (result.bytesProduced() > 0) {
- debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
- count.addAndGet(result.bytesProduced());
- outgoing(result.destBuffer, false);
- }
} catch (IOException ex) {
errorCommon(ex);
handleError(ex);
@@ -340,6 +348,11 @@
if (handshaking && !completing)
return;
}
+ if (!completing) {
+ synchronized (readBufferLock) {
+ completing = this.completing && !readBuf.hasRemaining();
+ }
+ }
if (completing) {
debugr.log(Level.DEBUG, "completing");
// Complete the alpnCF, if not already complete, regardless of
@@ -443,6 +456,7 @@
assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
if (complete) {
+ debugw.log(Level.DEBUG, "adding SENTINEL");
writeList.add(SENTINEL);
} else {
writeList.addAll(buffers);
@@ -507,7 +521,8 @@
try {
debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
- while (Utils.remaining(writeList) > 0 || hsTriggered()) {
+ while (Utils.remaining(writeList) > 0 || hsTriggered()
+ || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
@@ -538,6 +553,7 @@
if (writeList.isEmpty() && !result.needUnwrap()) {
writer.addData(HS_TRIGGER);
}
+ if (needWrap()) continue;
return;
}
}
@@ -551,11 +567,18 @@
outgoing(Utils.EMPTY_BB_LIST, true);
return;
}
+ if (writeList.isEmpty() && needWrap()) {
+ writer.addData(HS_TRIGGER);
+ }
} catch (Throwable ex) {
handleError(ex);
}
}
+ private boolean needWrap() {
+ return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
+ }
+
private void sendResultBytes(EngineResult result) {
if (result.bytesProduced() > 0) {
debugw.log(Level.DEBUG, "Sending %d bytes downstream",
@@ -678,13 +701,19 @@
private void executeTasks(List<Runnable> tasks) {
exec.execute(() -> {
handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
- try {
- tasks.forEach((r) -> {
- r.run();
- });
- } catch (Throwable t) {
- handleError(t);
- }
+ List<Runnable> nextTasks = tasks;
+ do {
+ try {
+ nextTasks.forEach((r) -> {
+ r.run();
+ });
+ if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+ nextTasks = obtainTasks();
+ } else break;
+ } catch (Throwable t) {
+ handleError(t);
+ }
+ } while(true);
handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
writer.addData(HS_TRIGGER);
resumeActivity();