diff -r 8e1ed2a15845 -r 4690a2871b44 src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed May 02 10:47:16 2018 +0200 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed May 02 02:36:17 2018 -0700 @@ -33,7 +33,6 @@ import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLException; import java.io.IOException; -import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -46,6 +45,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; /** * Implements SSL using two SubscriberWrappers. @@ -97,6 +97,7 @@ volatile boolean close_notify_received; final CompletableFuture readerCF; final CompletableFuture writerCF; + final Consumer recycler; static AtomicInteger scount = new AtomicInteger(1); final int id; @@ -110,8 +111,23 @@ Subscriber> downReader, Subscriber> downWriter) { + this(engine, exec, null, downReader, downWriter); + } + + /** + * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each + * Flow.Subscriber requires an associated {@link CompletableFuture} + * for errors that need to be signaled from downstream to upstream. + */ + public SSLFlowDelegate(SSLEngine engine, + Executor exec, + Consumer recycler, + Subscriber> downReader, + Subscriber> downWriter) + { this.id = scount.getAndIncrement(); this.tubeName = String.valueOf(downWriter); + this.recycler = recycler; this.reader = new Reader(); this.writer = new Writer(); this.engine = engine; @@ -181,9 +197,11 @@ sb.append("SSL: id ").append(id); sb.append(" HS state: " + states(handshakeState)); sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); - sb.append(" LL : "); - for (String s: stateList) { - sb.append(s).append(" "); + if (stateList != null) { + sb.append(" LL : "); + for (String s : stateList) { + sb.append(s).append(" "); + } } sb.append("\r\n"); sb.append("Reader:: ").append(reader.toString()); @@ -213,15 +231,20 @@ * Upstream subscription strategy is to try and keep no more than * TARGET_BUFSIZE bytes in readBuf */ - class Reader extends SubscriberWrapper { + final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { + // Maximum record size is 16k. + // Because SocketTube can feeds us up to 3 16K buffers, + // then setting this size to 16K means that the readBuf + // can store up to 64K-1 (16K-1 + 3*16K) + static final int TARGET_BUFSIZE = 16 * 1024; + final SequentialScheduler scheduler; - static final int TARGET_BUFSIZE = 16 * 1024; volatile ByteBuffer readBuf; volatile boolean completing; final Object readBufferLock = new Object(); final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); - class ReaderDownstreamPusher implements Runnable { + private final class ReaderDownstreamPusher implements Runnable { @Override public void run() { processData(); } } @@ -233,6 +256,11 @@ readBuf.limit(0); // keep in read mode } + @Override + public boolean supportsRecycling() { + return recycler != null; + } + protected SchedulingAction enterScheduling() { return enterReadScheduling(); } @@ -250,7 +278,7 @@ debugr.log("Adding %d bytes to read buffer", Utils.remaining(buffers)); addToReadBuf(buffers, complete); - scheduler.runOrSchedule(); + scheduler.runOrSchedule(exec); } @Override @@ -270,6 +298,9 @@ @Override protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { if (readBuf.remaining() > TARGET_BUFSIZE) { + if (debugr.on()) + debugr.log("readBuf has more than TARGET_BUFSIZE: %d", + readBuf.remaining()); return 0; } else { return super.upstreamWindowUpdate(currentWindow, downstreamQsize); @@ -285,6 +316,11 @@ reallocReadBuf(); readBuf.put(buf); readBuf.flip(); + // should be safe to call inside lock + // since the only implementation + // offers the buffer to an unbounded queue. + // WARNING: do not touch buf after this point! + if (recycler != null) recycler.accept(buf); } if (complete) { this.completing = complete; @@ -293,7 +329,7 @@ } void schedule() { - scheduler.runOrSchedule(); + scheduler.runOrSchedule(exec); } void stop() { @@ -303,8 +339,13 @@ AtomicInteger count = new AtomicInteger(0); + // minimum number of bytes required to call unwrap. + // Usually this is 0, unless there was a buffer underflow. + // In this case we need to wait for more bytes than what + // we had before calling unwrap() again. + volatile int minBytesRequired; // work function where it all happens - void processData() { + final void processData() { try { if (debugr.on()) debugr.log("processData:" @@ -313,15 +354,23 @@ + ", engine handshake status:" + engine.getHandshakeStatus()); int len; boolean complete = false; - while ((len = readBuf.remaining()) > 0) { + while (readBuf.remaining() > (len = minBytesRequired)) { boolean handshaking = false; try { EngineResult result; synchronized (readBufferLock) { complete = this.completing; + if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining()); + // Unless there is a BUFFER_UNDERFLOW, we should try to + // unwrap any number of bytes. Set minBytesRequired to 0: + // we only need to do that if minBytesRequired is not already 0. + len = len > 0 ? minBytesRequired = 0 : len; result = unwrapBuffer(readBuf); - if (debugr.on()) - debugr.log("Unwrapped: %s", result.result); + len = readBuf.remaining(); + if (debugr.on()) { + debugr.log("Unwrapped: result: %s", result.result); + debugr.log("Unwrapped: consumed: %s", result.bytesConsumed()); + } } if (result.bytesProduced() > 0) { if (debugr.on()) @@ -332,12 +381,19 @@ if (result.status() == Status.BUFFER_UNDERFLOW) { if (debugr.on()) debugr.log("BUFFER_UNDERFLOW"); // not enough data in the read buffer... - requestMore(); + // no need to try to unwrap again unless we get more bytes + // than minBytesRequired = len in the read buffer. + minBytesRequired = len; synchronized (readBufferLock) { - // check if we have received some data + // more bytes could already have been added... + assert readBuf.remaining() >= len; + // check if we have received some data, and if so + // we can just re-spin the loop if (readBuf.remaining() > len) continue; - return; } + // request more data and return. + requestMore(); + return; } if (complete && result.status() == Status.CLOSED) { if (debugr.on()) debugr.log("Closed: completing"); @@ -352,8 +408,10 @@ handshaking = true; } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) { + handshaking = false; + applicationBufferSize = engine.getSession().getApplicationBufferSize(); + packetBufferSize = engine.getSession().getPacketBufferSize(); setALPN(); - handshaking = false; resumeActivity(); } } @@ -391,7 +449,8 @@ case BUFFER_OVERFLOW: // may happen only if app size buffer was changed. // get it again if app buffer size changed - int appSize = engine.getSession().getApplicationBufferSize(); + int appSize = applicationBufferSize = + engine.getSession().getApplicationBufferSize(); ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); dst.flip(); b.put(dst); @@ -489,7 +548,7 @@ @Override protected void incoming(List buffers, boolean complete) { - assert complete ? buffers == Utils.EMPTY_BB_LIST : true; + assert complete ? buffers == Utils.EMPTY_BB_LIST : true; assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; if (complete) { if (debugw.on()) debugw.log("adding SENTINEL"); @@ -549,6 +608,15 @@ } } + void triggerWrite() { + synchronized (writeList) { + if (writeList.isEmpty()) { + writeList.add(HS_TRIGGER); + } + } + scheduler.runOrSchedule(); + } + private void processData() { boolean completing = isCompleting(); @@ -586,6 +654,8 @@ handshaking = true; } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { + applicationBufferSize = engine.getSession().getApplicationBufferSize(); + packetBufferSize = engine.getSession().getPacketBufferSize(); setALPN(); resumeActivity(); } @@ -630,8 +700,9 @@ // Shouldn't happen. We allocated buffer with packet size // get it again if net buffer size was changed if (debugw.on()) debugw.log("BUFFER_OVERFLOW"); - int appSize = engine.getSession().getApplicationBufferSize(); - ByteBuffer b = ByteBuffer.allocate(appSize + dst.position()); + int netSize = packetBufferSize + = engine.getSession().getPacketBufferSize(); + ByteBuffer b = ByteBuffer.allocate(netSize + dst.position()); dst.flip(); b.put(dst); dst = b; @@ -759,13 +830,16 @@ } final AtomicInteger handshakeState; - final ConcurrentLinkedQueue stateList = new ConcurrentLinkedQueue<>(); + final ConcurrentLinkedQueue stateList = + debug.on() ? new ConcurrentLinkedQueue<>() : null; private boolean doHandshake(EngineResult r, int caller) { // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); - stateList.add(r.handshakeStatus().toString()); - stateList.add(Integer.toString(caller)); + if (stateList != null && debug.on()) { + stateList.add(r.handshakeStatus().toString()); + stateList.add(Integer.toString(caller)); + } switch (r.handshakeStatus()) { case NEED_TASK: int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); @@ -778,7 +852,7 @@ return false; // executeTasks will resume activity case NEED_WRAP: if (caller == READER) { - writer.addData(HS_TRIGGER); + writer.triggerWrite(); return false; } break; @@ -818,7 +892,6 @@ } } while (true); handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); - //writer.addData(HS_TRIGGER); resumeActivity(); } catch (Throwable t) { handleError(t); @@ -839,7 +912,17 @@ if (engine.isInboundDone() && !engine.isOutboundDone()) { if (debug.on()) debug.log("doClosure: close_notify received"); close_notify_received = true; - doHandshake(r, READER); + if (!writer.scheduler.isStopped()) { + doHandshake(r, READER); + } else { + // We have received closed notify, but we + // won't be able to send the acknowledgement. + // Nothing more will come from the socket either, + // so mark the reader as completed. + synchronized (reader.readBufferLock) { + reader.completing = true; + } + } } } return r; @@ -914,12 +997,22 @@ } } - public ByteBuffer getNetBuffer() { - return ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); + volatile int packetBufferSize; + final ByteBuffer getNetBuffer() { + int netSize = packetBufferSize; + if (netSize <= 0) { + packetBufferSize = netSize = engine.getSession().getPacketBufferSize(); + } + return ByteBuffer.allocate(netSize); } - private ByteBuffer getAppBuffer() { - return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()); + volatile int applicationBufferSize; + final ByteBuffer getAppBuffer() { + int appSize = applicationBufferSize; + if (appSize <= 0) { + applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize(); + } + return ByteBuffer.allocate(appSize); } final String dbgString() {