--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 04 12:14:09 2018 -0400
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 04 17:28:03 2018 +0100
@@ -273,7 +273,7 @@
*/
private final WindowController windowController = new WindowController();
private final FramesController framesController = new FramesController();
- private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
+ private final Http2TubeSubscriber subscriber;
final ConnectionWindowUpdateSender windowUpdater;
private volatile Throwable cause;
private volatile Supplier<ByteBuffer> initial;
@@ -290,6 +290,7 @@
String key) {
this.connection = connection;
this.client2 = client2;
+ this.subscriber = new Http2TubeSubscriber(client2.client());
this.nextstreamid = nextstreamid;
this.key = key;
this.clientSettings = this.client2.getClientSettings();
@@ -643,7 +644,7 @@
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
- s.cancelImpl(t);
+ s.connectionClosing(t);
}
connection.close();
}
@@ -797,12 +798,22 @@
}
void resetStream(int streamid, int code) throws IOException {
- Log.logError(
- "Resetting stream {0,number,integer} with error code {1,number,integer}",
- streamid, code);
- ResetFrame frame = new ResetFrame(streamid, code);
- sendFrame(frame);
- closeStream(streamid);
+ try {
+ if (connection.channel().isOpen()) {
+ // no need to try & send a reset frame if the
+ // connection channel is already closed.
+ Log.logError(
+ "Resetting stream {0,number,integer} with error code {1,number,integer}",
+ streamid, code);
+ ResetFrame frame = new ResetFrame(streamid, code);
+ sendFrame(frame);
+ } else if (debug.on()) {
+ debug.log("Channel already closed, no need to reset stream %d",
+ streamid);
+ }
+ } finally {
+ closeStream(streamid);
+ }
}
void closeStream(int streamid) {
@@ -1148,14 +1159,19 @@
* A simple tube subscriber for reading from the connection flow.
*/
final class Http2TubeSubscriber implements TubeSubscriber {
- volatile Flow.Subscription subscription;
- volatile boolean completed;
- volatile boolean dropped;
- volatile Throwable error;
- final ConcurrentLinkedQueue<ByteBuffer> queue
+ private volatile Flow.Subscription subscription;
+ private volatile boolean completed;
+ private volatile boolean dropped;
+ private volatile Throwable error;
+ private final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
- final SequentialScheduler scheduler =
+ private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::processQueue);
+ private final HttpClientImpl client;
+
+ Http2TubeSubscriber(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ }
final void processQueue() {
try {
@@ -1179,6 +1195,12 @@
}
}
+ private final void runOrSchedule() {
+ if (client.isSelectorThread()) {
+ scheduler.runOrSchedule(client.theExecutor());
+ } else scheduler.runOrSchedule();
+ }
+
@Override
public void onSubscribe(Flow.Subscription subscription) {
// supports being called multiple time.
@@ -1202,7 +1224,7 @@
if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
+ " bytes in " + item.size() + " buffers");
queue.addAll(item);
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
@@ -1210,7 +1232,7 @@
if (debug.on()) debug.log(() -> "onError: " + throwable);
error = throwable;
completed = true;
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
@@ -1218,7 +1240,7 @@
if (debug.on()) debug.log("EOF");
error = new EOFException("EOF reached while reading");
completed = true;
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 04 12:14:09 2018 -0400
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 04 17:28:03 2018 +0100
@@ -25,9 +25,9 @@
package jdk.internal.net.http;
+import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.lang.System.Logger.Level;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -171,7 +171,7 @@
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame) {
inputQ.remove();
- handleReset((ResetFrame)frame);
+ handleReset((ResetFrame)frame, subscriber);
return;
}
DataFrame df = (DataFrame)frame;
@@ -424,25 +424,56 @@
} else if (closed) {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
} else {
- // put it in the input queue in order to read all
- // pending data frames first. Indeed, a server may send
- // RST_STREAM after sending END_STREAM, in which case we should
- // ignore it. However, we won't know if we have received END_STREAM
- // or not until all pending data frames are read.
- receiveResetFrame(frame);
- // RST_STREAM was pushed to the queue. It will be handled by
- // asyncReceive after all pending data frames have been
- // processed.
- Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+ Flow.Subscriber<?> subscriber =
+ responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+ if (response == null && subscriber == null) {
+ // we haven't receive the headers yet, and won't receive any!
+ // handle reset now.
+ handleReset(frame, subscriber);
+ } else {
+ // put it in the input queue in order to read all
+ // pending data frames first. Indeed, a server may send
+ // RST_STREAM after sending END_STREAM, in which case we should
+ // ignore it. However, we won't know if we have received END_STREAM
+ // or not until all pending data frames are read.
+ receiveResetFrame(frame);
+ // RST_STREAM was pushed to the queue. It will be handled by
+ // asyncReceive after all pending data frames have been
+ // processed.
+ Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
+ }
}
}
- void handleReset(ResetFrame frame) {
+ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
if (!closed) {
- close();
- int error = frame.getErrorCode();
- completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
+ synchronized (this) {
+ if (closed) {
+ if (debug.on()) debug.log("Stream already closed: ignoring RESET");
+ return;
+ }
+ closed = true;
+ }
+ try {
+ int error = frame.getErrorCode();
+ IOException e = new IOException("Received RST_STREAM: "
+ + ErrorFrame.stringForCode(error));
+ if (errorRef.compareAndSet(null, e)) {
+ if (subscriber != null) {
+ subscriber.onError(e);
+ }
+ }
+ completeResponseExceptionally(e);
+ if (!requestBodyCF.isDone()) {
+ requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
+ }
+ if (responseBodyCF != null) {
+ responseBodyCF.completeExceptionally(errorRef.get());
+ }
+ } finally {
+ connection.closeStream(streamid);
+ }
} else {
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
}
@@ -1033,6 +1064,15 @@
cancelImpl(cause);
}
+ void connectionClosing(Throwable cause) {
+ Flow.Subscriber<?> subscriber =
+ responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+ errorRef.compareAndSet(null, cause);
+ if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
+ sched.runOrSchedule();
+ } else cancelImpl(cause);
+ }
+
// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
errorRef.compareAndSet(null, e);
@@ -1062,7 +1102,13 @@
try {
// will send a RST_STREAM frame
if (streamid != 0) {
- connection.resetStream(streamid, ResetFrame.CANCEL);
+ e = Utils.getCompletionCause(e);
+ if (e instanceof EOFException) {
+ // read EOF: no need to try & send reset
+ connection.closeStream(streamid);
+ } else {
+ connection.resetStream(streamid, ResetFrame.CANCEL);
+ }
}
} catch (IOException ex) {
Log.logError(ex);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Fri May 04 12:14:09 2018 -0400
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Fri May 04 17:28:03 2018 +0100
@@ -87,13 +87,17 @@
final Logger debug =
Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
+ private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+ private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+ // When handshake is in progress trying to wrap may produce no bytes.
+ private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
+
final Executor exec;
final Reader reader;
final Writer writer;
final SSLEngine engine;
final String tubeName; // hack
final CompletableFuture<String> alpnCF; // completes on initial handshake
- final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF;
@@ -245,13 +249,16 @@
final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
private final class ReaderDownstreamPusher implements Runnable {
- @Override public void run() { processData(); }
+ @Override
+ public void run() {
+ processData();
+ }
}
Reader() {
super();
scheduler = SequentialScheduler.synchronizedScheduler(
- new ReaderDownstreamPusher());
+ new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
}
@@ -276,7 +283,7 @@
public void incoming(List<ByteBuffer> buffers, boolean complete) {
if (debugr.on())
debugr.log("Adding %d bytes to read buffer",
- Utils.remaining(buffers));
+ Utils.remaining(buffers));
addToReadBuf(buffers, complete);
scheduler.runOrSchedule(exec);
}
@@ -289,7 +296,7 @@
private void reallocReadBuf() {
int sz = readBuf.capacity();
- ByteBuffer newb = ByteBuffer.allocate(sz*2);
+ ByteBuffer newb = ByteBuffer.allocate(sz * 2);
readBuf.flip();
Utils.copy(readBuf, newb);
readBuf = newb;
@@ -300,7 +307,7 @@
if (readBuf.remaining() > TARGET_BUFSIZE) {
if (debugr.on())
debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
- readBuf.remaining());
+ readBuf.remaining());
return 0;
} else {
return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -309,6 +316,7 @@
// readBuf is kept ready for reading outside of this method
private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
+ assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
synchronized (readBufferLock) {
for (ByteBuffer buf : buffers) {
readBuf.compact();
@@ -344,14 +352,15 @@
// In this case we need to wait for more bytes than what
// we had before calling unwrap() again.
volatile int minBytesRequired;
+
// work function where it all happens
final void processData() {
try {
if (debugr.on())
debugr.log("processData:"
- + " readBuf remaining:" + readBuf.remaining()
- + ", state:" + states(handshakeState)
- + ", engine handshake status:" + engine.getHandshakeStatus());
+ + " readBuf remaining:" + readBuf.remaining()
+ + ", state:" + states(handshakeState)
+ + ", engine handshake status:" + engine.getHandshakeStatus());
int len;
boolean complete = false;
while (readBuf.remaining() > (len = minBytesRequired)) {
@@ -400,14 +409,13 @@
outgoing(Utils.EMPTY_BB_LIST, true);
return;
}
- if (result.handshaking() && !complete) {
+ if (result.handshaking()) {
+ handshaking = true;
if (debugr.on()) debugr.log("handshaking");
- if (doHandshake(result, READER)) {
- resumeActivity();
- }
- handshaking = true;
+ if (doHandshake(result, READER)) continue; // need unwrap
+ else break; // doHandshake will have triggered the write scheduler if necessary
} else {
- if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
+ if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
handshaking = false;
applicationBufferSize = engine.getSession().getApplicationBufferSize();
packetBufferSize = engine.getSession().getPacketBufferSize();
@@ -443,12 +451,19 @@
EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
ByteBuffer dst = getAppBuffer();
+ int len = src.remaining();
while (true) {
SSLEngineResult sslResult = engine.unwrap(src, dst);
switch (sslResult.getStatus()) {
case BUFFER_OVERFLOW:
- // may happen only if app size buffer was changed.
- // get it again if app buffer size changed
+ // may happen if app size buffer was changed, or if
+ // our 'adaptiveBufferSize' guess was too small for
+ // the current payload. In that case, update the
+ // value of applicationBufferSize, and allocate a
+ // buffer of that size, which we are sure will be
+ // big enough to decode whatever needs to be
+ // decoded. We will later update adaptiveBufferSize
+ // in OK: below.
int appSize = applicationBufferSize =
engine.getSession().getApplicationBufferSize();
ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
@@ -457,11 +472,26 @@
dst = b;
break;
case CLOSED:
+ assert dst.position() == 0;
return doClosure(new EngineResult(sslResult));
case BUFFER_UNDERFLOW:
// handled implicitly by compaction/reallocation of readBuf
+ assert dst.position() == 0;
return new EngineResult(sslResult);
case OK:
+ int size = dst.position();
+ if (debug.on()) {
+ debugr.log("Decoded " + size + " bytes out of " + len
+ + " into buffer of " + dst.capacity()
+ + " remaining to decode: " + src.remaining());
+ }
+ // if the record payload was bigger than what was originally
+ // allocated, then sets the adaptiveAppBufferSize to size
+ // and we will use that new size as a guess for the next app
+ // buffer.
+ if (size > adaptiveAppBufferSize) {
+ adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
+ }
dst.flip();
return new EngineResult(sslResult, dst);
}
@@ -662,8 +692,8 @@
}
cleanList(writeList); // tidy up the source list
sendResultBytes(result);
- if (handshaking && !completing) {
- if (needWrap()) {
+ if (handshaking) {
+ if (!completing && needWrap()) {
continue;
} else {
return;
@@ -687,11 +717,30 @@
}
}
+ // The SSLEngine insists on being given a buffer that is at least
+ // SSLSession.getPacketBufferSize() long (usually 16K). If given
+ // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
+ // has 6 bytes to wrap. Typical usage shows that for GET we
+ // usually produce an average of ~ 100 bytes.
+ // To avoid wasting space, and because allocating and zeroing
+ // 16K buffers for encoding 6 bytes is costly, we are reusing the
+ // same writeBuffer to interact with SSLEngine.wrap().
+ // If the SSLEngine produces less than writeBuffer.capacity() / 2,
+ // then we copy off the bytes to a smaller buffer that we send
+ // downstream. Otherwise, we send the writeBuffer downstream
+ // and will allocate a new one next time.
+ volatile ByteBuffer writeBuffer;
@SuppressWarnings("fallthrough")
EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
+ long len = Utils.remaining(src);
if (debugw.on())
- debugw.log("wrapping " + Utils.remaining(src) + " bytes");
- ByteBuffer dst = getNetBuffer();
+ debugw.log("wrapping " + len + " bytes");
+
+ ByteBuffer dst = writeBuffer;
+ if (dst == null) dst = writeBuffer = getNetBuffer();
+ assert dst.position() == 0 : "buffer position is " + dst.position();
+ assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
+
while (true) {
SSLEngineResult sslResult = engine.wrap(src, dst);
if (debugw.on()) debugw.log("SSLResult: " + sslResult);
@@ -702,7 +751,7 @@
if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
int netSize = packetBufferSize
= engine.getSession().getPacketBufferSize();
- ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
+ ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
dst.flip();
b.put(dst);
dst = b;
@@ -712,11 +761,27 @@
// fallthrough. There could be some remaining data in dst.
// CLOSED will be handled by the caller.
case OK:
- dst.flip();
- final ByteBuffer dest = dst;
+ final ByteBuffer dest;
+ if (dst.position() == 0) {
+ dest = NOTHING; // can happen if handshake is in progress
+ } else if (dst.position() < dst.capacity() / 2) {
+ // less than half the buffer was used.
+ // copy off the bytes to a smaller buffer, and keep
+ // the writeBuffer for next time.
+ dst.flip();
+ dest = Utils.copyAligned(dst);
+ dst.clear();
+ } else {
+ // more than half the buffer was used.
+ // just send that buffer downstream, and we will
+ // get a new writeBuffer next time it is needed.
+ dst.flip();
+ dest = dst;
+ writeBuffer = null;
+ }
if (debugw.on())
- debugw.log("OK => produced: %d, not wrapped: %d",
- dest.remaining(), Utils.remaining(src));
+ debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
+ dest.remaining(), dest.capacity(), Utils.remaining(src));
return new EngineResult(sslResult, dest);
case BUFFER_UNDERFLOW:
// Shouldn't happen. Doesn't returns when wrap()
@@ -800,7 +865,6 @@
private static final int HANDSHAKING = 1;
private static final int DOING_TASKS = 4; // bit added to above state
- private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
private static final int READER = 1;
private static final int WRITER = 2;
@@ -997,6 +1061,8 @@
}
}
+ // The maximum network buffer size negotiated during
+ // the handshake. Usually 16K.
volatile int packetBufferSize;
final ByteBuffer getNetBuffer() {
int netSize = packetBufferSize;
@@ -1006,13 +1072,32 @@
return ByteBuffer.allocate(netSize);
}
+ // The maximum application buffer size negotiated during
+ // the handshake. Usually close to 16K.
volatile int applicationBufferSize;
+ // Despite of the maximum applicationBufferSize negotiated
+ // above, TLS records usually have a much smaller payload.
+ // The adaptativeAppBufferSize records the max payload
+ // ever decoded, and we use that as a guess for how big
+ // a buffer we will need for the next payload.
+ // This avoids allocating and zeroing a 16K buffer for
+ // nothing...
+ volatile int adaptiveAppBufferSize;
final ByteBuffer getAppBuffer() {
int appSize = applicationBufferSize;
if (appSize <= 0) {
- applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
+ applicationBufferSize = appSize
+ = engine.getSession().getApplicationBufferSize();
}
- return ByteBuffer.allocate(appSize);
+ int size = adaptiveAppBufferSize;
+ if (size <= 0) {
+ size = 512; // start with 512 this is usually enough for handshaking / headers
+ } else if (size > appSize) {
+ size = appSize;
+ }
+ // will cause a BUFFER_OVERFLOW if not big enough, but
+ // that's OK.
+ return ByteBuffer.allocate(size);
}
final String dbgString() {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri May 04 12:14:09 2018 -0400
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri May 04 17:28:03 2018 +0100
@@ -534,6 +534,16 @@
return dst;
}
+ public static ByteBuffer copyAligned(ByteBuffer src) {
+ int len = src.remaining();
+ int size = ((len + 7) >> 3) << 3;
+ assert size >= len;
+ ByteBuffer dst = ByteBuffer.allocate(size);
+ dst.put(src);
+ dst.flip();
+ return dst;
+ }
+
public static String dump(Object... objects) {
return Arrays.toString(objects);
}