--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed Jun 20 17:15:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed Jun 20 09:05:57 2018 -0700
@@ -46,6 +46,7 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.IntBinaryOperator;
/**
* Implements SSL using two SubscriberWrappers.
@@ -87,13 +88,18 @@
final Logger debug =
Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
+ private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+ private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+ // When handshake is in progress trying to wrap may produce no bytes.
+ private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
+ private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
+
final Executor exec;
final Reader reader;
final Writer writer;
final SSLEngine engine;
final String tubeName; // hack
final CompletableFuture<String> alpnCF; // completes on initial handshake
- final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF;
@@ -146,7 +152,8 @@
// Writer to the downWriter.
connect(downReader, downWriter);
- //Monitor.add(this::monitor);
+ if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
+ Monitor.add(this::monitor);
}
/**
@@ -245,13 +252,16 @@
final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
private final class ReaderDownstreamPusher implements Runnable {
- @Override public void run() { processData(); }
+ @Override
+ public void run() {
+ processData();
+ }
}
Reader() {
super();
scheduler = SequentialScheduler.synchronizedScheduler(
- new ReaderDownstreamPusher());
+ new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
}
@@ -276,7 +286,7 @@
public void incoming(List<ByteBuffer> buffers, boolean complete) {
if (debugr.on())
debugr.log("Adding %d bytes to read buffer",
- Utils.remaining(buffers));
+ Utils.remaining(buffers));
addToReadBuf(buffers, complete);
scheduler.runOrSchedule(exec);
}
@@ -289,7 +299,7 @@
private void reallocReadBuf() {
int sz = readBuf.capacity();
- ByteBuffer newb = ByteBuffer.allocate(sz*2);
+ ByteBuffer newb = ByteBuffer.allocate(sz * 2);
readBuf.flip();
Utils.copy(readBuf, newb);
readBuf = newb;
@@ -300,7 +310,7 @@
if (readBuf.remaining() > TARGET_BUFSIZE) {
if (debugr.on())
debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
- readBuf.remaining());
+ readBuf.remaining());
return 0;
} else {
return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -309,6 +319,7 @@
// readBuf is kept ready for reading outside of this method
private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
+ assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
synchronized (readBufferLock) {
for (ByteBuffer buf : buffers) {
readBuf.compact();
@@ -344,14 +355,15 @@
// In this case we need to wait for more bytes than what
// we had before calling unwrap() again.
volatile int minBytesRequired;
+
// work function where it all happens
final void processData() {
try {
if (debugr.on())
debugr.log("processData:"
- + " readBuf remaining:" + readBuf.remaining()
- + ", state:" + states(handshakeState)
- + ", engine handshake status:" + engine.getHandshakeStatus());
+ + " readBuf remaining:" + readBuf.remaining()
+ + ", state:" + states(handshakeState)
+ + ", engine handshake status:" + engine.getHandshakeStatus());
int len;
boolean complete = false;
while (readBuf.remaining() > (len = minBytesRequired)) {
@@ -400,14 +412,13 @@
outgoing(Utils.EMPTY_BB_LIST, true);
return;
}
- if (result.handshaking() && !complete) {
+ if (result.handshaking()) {
+ handshaking = true;
if (debugr.on()) debugr.log("handshaking");
- if (doHandshake(result, READER)) {
- resumeActivity();
- }
- handshaking = true;
+ if (doHandshake(result, READER)) continue; // need unwrap
+ else break; // doHandshake will have triggered the write scheduler if necessary
} else {
- if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
+ if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
handshaking = false;
applicationBufferSize = engine.getSession().getApplicationBufferSize();
packetBufferSize = engine.getSession().getPacketBufferSize();
@@ -443,12 +454,19 @@
EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
ByteBuffer dst = getAppBuffer();
+ int len = src.remaining();
while (true) {
SSLEngineResult sslResult = engine.unwrap(src, dst);
switch (sslResult.getStatus()) {
case BUFFER_OVERFLOW:
- // may happen only if app size buffer was changed.
- // get it again if app buffer size changed
+ // may happen if app size buffer was changed, or if
+ // our 'adaptiveBufferSize' guess was too small for
+ // the current payload. In that case, update the
+ // value of applicationBufferSize, and allocate a
+ // buffer of that size, which we are sure will be
+ // big enough to decode whatever needs to be
+ // decoded. We will later update adaptiveBufferSize
+ // in OK: below.
int appSize = applicationBufferSize =
engine.getSession().getApplicationBufferSize();
ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
@@ -457,11 +475,26 @@
dst = b;
break;
case CLOSED:
+ assert dst.position() == 0;
return doClosure(new EngineResult(sslResult));
case BUFFER_UNDERFLOW:
// handled implicitly by compaction/reallocation of readBuf
+ assert dst.position() == 0;
return new EngineResult(sslResult);
case OK:
+ int size = dst.position();
+ if (debug.on()) {
+ debugr.log("Decoded " + size + " bytes out of " + len
+ + " into buffer of " + dst.capacity()
+ + " remaining to decode: " + src.remaining());
+ }
+ // if the record payload was bigger than what was originally
+ // allocated, then sets the adaptiveAppBufferSize to size
+ // and we will use that new size as a guess for the next app
+ // buffer.
+ if (size > adaptiveAppBufferSize) {
+ adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
+ }
dst.flip();
return new EngineResult(sslResult, dst);
}
@@ -662,8 +695,8 @@
}
cleanList(writeList); // tidy up the source list
sendResultBytes(result);
- if (handshaking && !completing) {
- if (needWrap()) {
+ if (handshaking) {
+ if (!completing && needWrap()) {
continue;
} else {
return;
@@ -687,11 +720,30 @@
}
}
+ // The SSLEngine insists on being given a buffer that is at least
+ // SSLSession.getPacketBufferSize() long (usually 16K). If given
+ // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
+ // has 6 bytes to wrap. Typical usage shows that for GET we
+ // usually produce an average of ~ 100 bytes.
+ // To avoid wasting space, and because allocating and zeroing
+ // 16K buffers for encoding 6 bytes is costly, we are reusing the
+ // same writeBuffer to interact with SSLEngine.wrap().
+ // If the SSLEngine produces less than writeBuffer.capacity() / 2,
+ // then we copy off the bytes to a smaller buffer that we send
+ // downstream. Otherwise, we send the writeBuffer downstream
+ // and will allocate a new one next time.
+ volatile ByteBuffer writeBuffer;
@SuppressWarnings("fallthrough")
EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
+ long len = Utils.remaining(src);
if (debugw.on())
- debugw.log("wrapping " + Utils.remaining(src) + " bytes");
- ByteBuffer dst = getNetBuffer();
+ debugw.log("wrapping " + len + " bytes");
+
+ ByteBuffer dst = writeBuffer;
+ if (dst == null) dst = writeBuffer = getNetBuffer();
+ assert dst.position() == 0 : "buffer position is " + dst.position();
+ assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
+
while (true) {
SSLEngineResult sslResult = engine.wrap(src, dst);
if (debugw.on()) debugw.log("SSLResult: " + sslResult);
@@ -702,7 +754,7 @@
if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
int netSize = packetBufferSize
= engine.getSession().getPacketBufferSize();
- ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
+ ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
dst.flip();
b.put(dst);
dst = b;
@@ -712,11 +764,27 @@
// fallthrough. There could be some remaining data in dst.
// CLOSED will be handled by the caller.
case OK:
- dst.flip();
- final ByteBuffer dest = dst;
+ final ByteBuffer dest;
+ if (dst.position() == 0) {
+ dest = NOTHING; // can happen if handshake is in progress
+ } else if (dst.position() < dst.capacity() / 2) {
+ // less than half the buffer was used.
+ // copy off the bytes to a smaller buffer, and keep
+ // the writeBuffer for next time.
+ dst.flip();
+ dest = Utils.copyAligned(dst);
+ dst.clear();
+ } else {
+ // more than half the buffer was used.
+ // just send that buffer downstream, and we will
+ // get a new writeBuffer next time it is needed.
+ dst.flip();
+ dest = dst;
+ writeBuffer = null;
+ }
if (debugw.on())
- debugw.log("OK => produced: %d, not wrapped: %d",
- dest.remaining(), Utils.remaining(src));
+ debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
+ dest.remaining(), dest.capacity(), Utils.remaining(src));
return new EngineResult(sslResult, dest);
case BUFFER_UNDERFLOW:
// Shouldn't happen. Doesn't returns when wrap()
@@ -799,8 +867,12 @@
private static final int NOT_HANDSHAKING = 0;
private static final int HANDSHAKING = 1;
- private static final int DOING_TASKS = 4; // bit added to above state
- private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+ // Bit flags
+ // a thread is currently executing tasks
+ private static final int DOING_TASKS = 4;
+ // a thread wants to execute tasks, while another thread is executing
+ private static final int REQUESTING_TASKS = 8;
+ private static final int TASK_BITS = 12; // Both bits
private static final int READER = 1;
private static final int WRITER = 2;
@@ -808,7 +880,7 @@
private static String states(AtomicInteger state) {
int s = state.get();
StringBuilder sb = new StringBuilder();
- int x = s & ~DOING_TASKS;
+ int x = s & ~TASK_BITS;
switch (x) {
case NOT_HANDSHAKING:
sb.append(" NOT_HANDSHAKING ");
@@ -821,6 +893,8 @@
}
if ((s & DOING_TASKS) > 0)
sb.append("|DOING_TASKS");
+ if ((s & REQUESTING_TASKS) > 0)
+ sb.append("|REQUESTING_TASKS");
return sb.toString();
}
@@ -833,18 +907,37 @@
final ConcurrentLinkedQueue<String> stateList =
debug.on() ? new ConcurrentLinkedQueue<>() : null;
+ // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
+ // depending on previous value
+ private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
+ if ((current & DOING_TASKS) == 0)
+ return DOING_TASKS | (current & HANDSHAKING);
+ else
+ return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
+ };
+
+ // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set
+ // clears bits if not.
+ private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
+ if ((current & REQUESTING_TASKS) != 0)
+ return DOING_TASKS | (current & HANDSHAKING);
+ // clear both bits
+ return (current & HANDSHAKING);
+ };
+
private boolean doHandshake(EngineResult r, int caller) {
- // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
- handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+ // unconditionally sets the HANDSHAKING bit, while preserving task bits
+ handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
if (stateList != null && debug.on()) {
stateList.add(r.handshakeStatus().toString());
stateList.add(Integer.toString(caller));
}
switch (r.handshakeStatus()) {
case NEED_TASK:
- int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
- if ((s & DOING_TASKS) > 0) // someone else was doing tasks
+ int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
+ if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
return false;
+ }
if (debug.on()) debug.log("obtaining and initiating task execution");
List<Runnable> tasks = obtainTasks();
@@ -878,20 +971,25 @@
}
private void executeTasks(List<Runnable> tasks) {
- if (tasks.isEmpty())
- return;
exec.execute(() -> {
try {
List<Runnable> nextTasks = tasks;
+ if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
do {
nextTasks.forEach(Runnable::run);
if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
nextTasks = obtainTasks();
} else {
+ int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
+ if ((s & DOING_TASKS) != 0) {
+ if (debug.on()) debug.log("re-running tasks (B)");
+ nextTasks = obtainTasks();
+ continue;
+ }
break;
}
} while (true);
- handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
+ if (debug.on()) debug.log("finished task execution");
resumeActivity();
} catch (Throwable t) {
handleError(t);
@@ -997,6 +1095,8 @@
}
}
+ // The maximum network buffer size negotiated during
+ // the handshake. Usually 16K.
volatile int packetBufferSize;
final ByteBuffer getNetBuffer() {
int netSize = packetBufferSize;
@@ -1006,13 +1106,32 @@
return ByteBuffer.allocate(netSize);
}
+ // The maximum application buffer size negotiated during
+ // the handshake. Usually close to 16K.
volatile int applicationBufferSize;
+ // Despite of the maximum applicationBufferSize negotiated
+ // above, TLS records usually have a much smaller payload.
+ // The adaptativeAppBufferSize records the max payload
+ // ever decoded, and we use that as a guess for how big
+ // a buffer we will need for the next payload.
+ // This avoids allocating and zeroing a 16K buffer for
+ // nothing...
+ volatile int adaptiveAppBufferSize;
final ByteBuffer getAppBuffer() {
int appSize = applicationBufferSize;
if (appSize <= 0) {
- applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
+ applicationBufferSize = appSize
+ = engine.getSession().getApplicationBufferSize();
}
- return ByteBuffer.allocate(appSize);
+ int size = adaptiveAppBufferSize;
+ if (size <= 0) {
+ size = 512; // start with 512 this is usually enough for handshaking / headers
+ } else if (size > appSize) {
+ size = appSize;
+ }
+ // will cause a BUFFER_OVERFLOW if not big enough, but
+ // that's OK.
+ return ByteBuffer.allocate(size);
}
final String dbgString() {