--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed May 02 10:47:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed May 02 02:36:17 2018 -0700
@@ -33,7 +33,6 @@
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -46,6 +45,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
/**
* Implements SSL using two SubscriberWrappers.
@@ -97,6 +97,7 @@
volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF;
+ final Consumer<ByteBuffer> recycler;
static AtomicInteger scount = new AtomicInteger(1);
final int id;
@@ -110,8 +111,23 @@
Subscriber<? super List<ByteBuffer>> downReader,
Subscriber<? super List<ByteBuffer>> downWriter)
{
+ this(engine, exec, null, downReader, downWriter);
+ }
+
+ /**
+ * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+ * Flow.Subscriber requires an associated {@link CompletableFuture}
+ * for errors that need to be signaled from downstream to upstream.
+ */
+ public SSLFlowDelegate(SSLEngine engine,
+ Executor exec,
+ Consumer<ByteBuffer> recycler,
+ Subscriber<? super List<ByteBuffer>> downReader,
+ Subscriber<? super List<ByteBuffer>> downWriter)
+ {
this.id = scount.getAndIncrement();
this.tubeName = String.valueOf(downWriter);
+ this.recycler = recycler;
this.reader = new Reader();
this.writer = new Writer();
this.engine = engine;
@@ -181,9 +197,11 @@
sb.append("SSL: id ").append(id);
sb.append(" HS state: " + states(handshakeState));
sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
- sb.append(" LL : ");
- for (String s: stateList) {
- sb.append(s).append(" ");
+ if (stateList != null) {
+ sb.append(" LL : ");
+ for (String s : stateList) {
+ sb.append(s).append(" ");
+ }
}
sb.append("\r\n");
sb.append("Reader:: ").append(reader.toString());
@@ -213,15 +231,20 @@
* Upstream subscription strategy is to try and keep no more than
* TARGET_BUFSIZE bytes in readBuf
*/
- class Reader extends SubscriberWrapper {
+ final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
+ // Maximum record size is 16k.
+ // Because SocketTube can feeds us up to 3 16K buffers,
+ // then setting this size to 16K means that the readBuf
+ // can store up to 64K-1 (16K-1 + 3*16K)
+ static final int TARGET_BUFSIZE = 16 * 1024;
+
final SequentialScheduler scheduler;
- static final int TARGET_BUFSIZE = 16 * 1024;
volatile ByteBuffer readBuf;
volatile boolean completing;
final Object readBufferLock = new Object();
final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
- class ReaderDownstreamPusher implements Runnable {
+ private final class ReaderDownstreamPusher implements Runnable {
@Override public void run() { processData(); }
}
@@ -233,6 +256,11 @@
readBuf.limit(0); // keep in read mode
}
+ @Override
+ public boolean supportsRecycling() {
+ return recycler != null;
+ }
+
protected SchedulingAction enterScheduling() {
return enterReadScheduling();
}
@@ -250,7 +278,7 @@
debugr.log("Adding %d bytes to read buffer",
Utils.remaining(buffers));
addToReadBuf(buffers, complete);
- scheduler.runOrSchedule();
+ scheduler.runOrSchedule(exec);
}
@Override
@@ -270,6 +298,9 @@
@Override
protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
if (readBuf.remaining() > TARGET_BUFSIZE) {
+ if (debugr.on())
+ debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
+ readBuf.remaining());
return 0;
} else {
return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -285,6 +316,11 @@
reallocReadBuf();
readBuf.put(buf);
readBuf.flip();
+ // should be safe to call inside lock
+ // since the only implementation
+ // offers the buffer to an unbounded queue.
+ // WARNING: do not touch buf after this point!
+ if (recycler != null) recycler.accept(buf);
}
if (complete) {
this.completing = complete;
@@ -293,7 +329,7 @@
}
void schedule() {
- scheduler.runOrSchedule();
+ scheduler.runOrSchedule(exec);
}
void stop() {
@@ -303,8 +339,13 @@
AtomicInteger count = new AtomicInteger(0);
+ // minimum number of bytes required to call unwrap.
+ // Usually this is 0, unless there was a buffer underflow.
+ // In this case we need to wait for more bytes than what
+ // we had before calling unwrap() again.
+ volatile int minBytesRequired;
// work function where it all happens
- void processData() {
+ final void processData() {
try {
if (debugr.on())
debugr.log("processData:"
@@ -313,15 +354,23 @@
+ ", engine handshake status:" + engine.getHandshakeStatus());
int len;
boolean complete = false;
- while ((len = readBuf.remaining()) > 0) {
+ while (readBuf.remaining() > (len = minBytesRequired)) {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
complete = this.completing;
+ if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
+ // Unless there is a BUFFER_UNDERFLOW, we should try to
+ // unwrap any number of bytes. Set minBytesRequired to 0:
+ // we only need to do that if minBytesRequired is not already 0.
+ len = len > 0 ? minBytesRequired = 0 : len;
result = unwrapBuffer(readBuf);
- if (debugr.on())
- debugr.log("Unwrapped: %s", result.result);
+ len = readBuf.remaining();
+ if (debugr.on()) {
+ debugr.log("Unwrapped: result: %s", result.result);
+ debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
+ }
}
if (result.bytesProduced() > 0) {
if (debugr.on())
@@ -332,12 +381,19 @@
if (result.status() == Status.BUFFER_UNDERFLOW) {
if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
// not enough data in the read buffer...
- requestMore();
+ // no need to try to unwrap again unless we get more bytes
+ // than minBytesRequired = len in the read buffer.
+ minBytesRequired = len;
synchronized (readBufferLock) {
- // check if we have received some data
+ // more bytes could already have been added...
+ assert readBuf.remaining() >= len;
+ // check if we have received some data, and if so
+ // we can just re-spin the loop
if (readBuf.remaining() > len) continue;
- return;
}
+ // request more data and return.
+ requestMore();
+ return;
}
if (complete && result.status() == Status.CLOSED) {
if (debugr.on()) debugr.log("Closed: completing");
@@ -352,8 +408,10 @@
handshaking = true;
} else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
+ handshaking = false;
+ applicationBufferSize = engine.getSession().getApplicationBufferSize();
+ packetBufferSize = engine.getSession().getPacketBufferSize();
setALPN();
- handshaking = false;
resumeActivity();
}
}
@@ -391,7 +449,8 @@
case BUFFER_OVERFLOW:
// may happen only if app size buffer was changed.
// get it again if app buffer size changed
- int appSize = engine.getSession().getApplicationBufferSize();
+ int appSize = applicationBufferSize =
+ engine.getSession().getApplicationBufferSize();
ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
dst.flip();
b.put(dst);
@@ -489,7 +548,7 @@
@Override
protected void incoming(List<ByteBuffer> buffers, boolean complete) {
- assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
+ assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
if (complete) {
if (debugw.on()) debugw.log("adding SENTINEL");
@@ -549,6 +608,15 @@
}
}
+ void triggerWrite() {
+ synchronized (writeList) {
+ if (writeList.isEmpty()) {
+ writeList.add(HS_TRIGGER);
+ }
+ }
+ scheduler.runOrSchedule();
+ }
+
private void processData() {
boolean completing = isCompleting();
@@ -586,6 +654,8 @@
handshaking = true;
} else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+ applicationBufferSize = engine.getSession().getApplicationBufferSize();
+ packetBufferSize = engine.getSession().getPacketBufferSize();
setALPN();
resumeActivity();
}
@@ -630,8 +700,9 @@
// Shouldn't happen. We allocated buffer with packet size
// get it again if net buffer size was changed
if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
- int appSize = engine.getSession().getApplicationBufferSize();
- ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
+ int netSize = packetBufferSize
+ = engine.getSession().getPacketBufferSize();
+ ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
dst.flip();
b.put(dst);
dst = b;
@@ -759,13 +830,16 @@
}
final AtomicInteger handshakeState;
- final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
+ final ConcurrentLinkedQueue<String> stateList =
+ debug.on() ? new ConcurrentLinkedQueue<>() : null;
private boolean doHandshake(EngineResult r, int caller) {
// unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
- stateList.add(r.handshakeStatus().toString());
- stateList.add(Integer.toString(caller));
+ if (stateList != null && debug.on()) {
+ stateList.add(r.handshakeStatus().toString());
+ stateList.add(Integer.toString(caller));
+ }
switch (r.handshakeStatus()) {
case NEED_TASK:
int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
@@ -778,7 +852,7 @@
return false; // executeTasks will resume activity
case NEED_WRAP:
if (caller == READER) {
- writer.addData(HS_TRIGGER);
+ writer.triggerWrite();
return false;
}
break;
@@ -818,7 +892,6 @@
}
} while (true);
handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
- //writer.addData(HS_TRIGGER);
resumeActivity();
} catch (Throwable t) {
handleError(t);
@@ -839,7 +912,17 @@
if (engine.isInboundDone() && !engine.isOutboundDone()) {
if (debug.on()) debug.log("doClosure: close_notify received");
close_notify_received = true;
- doHandshake(r, READER);
+ if (!writer.scheduler.isStopped()) {
+ doHandshake(r, READER);
+ } else {
+ // We have received closed notify, but we
+ // won't be able to send the acknowledgement.
+ // Nothing more will come from the socket either,
+ // so mark the reader as completed.
+ synchronized (reader.readBufferLock) {
+ reader.completing = true;
+ }
+ }
}
}
return r;
@@ -914,12 +997,22 @@
}
}
- public ByteBuffer getNetBuffer() {
- return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+ volatile int packetBufferSize;
+ final ByteBuffer getNetBuffer() {
+ int netSize = packetBufferSize;
+ if (netSize <= 0) {
+ packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
+ }
+ return ByteBuffer.allocate(netSize);
}
- private ByteBuffer getAppBuffer() {
- return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
+ volatile int applicationBufferSize;
+ final ByteBuffer getAppBuffer() {
+ int appSize = applicationBufferSize;
+ if (appSize <= 0) {
+ applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
+ }
+ return ByteBuffer.allocate(appSize);
}
final String dbgString() {