http-client-branch: connection shutdown should no cancel streams immediately if data is pending in the stream queue.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 04 17:02:19 2018 +0100
@@ -273,7 +273,7 @@
*/
private final WindowController windowController = new WindowController();
private final FramesController framesController = new FramesController();
- private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
+ private final Http2TubeSubscriber subscriber;
final ConnectionWindowUpdateSender windowUpdater;
private volatile Throwable cause;
private volatile Supplier<ByteBuffer> initial;
@@ -290,6 +290,7 @@
String key) {
this.connection = connection;
this.client2 = client2;
+ this.subscriber = new Http2TubeSubscriber(client2.client());
this.nextstreamid = nextstreamid;
this.key = key;
this.clientSettings = this.client2.getClientSettings();
@@ -643,7 +644,7 @@
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
- s.cancelImpl(t);
+ s.connectionClosing(t);
}
connection.close();
}
@@ -1158,14 +1159,19 @@
* A simple tube subscriber for reading from the connection flow.
*/
final class Http2TubeSubscriber implements TubeSubscriber {
- volatile Flow.Subscription subscription;
- volatile boolean completed;
- volatile boolean dropped;
- volatile Throwable error;
- final ConcurrentLinkedQueue<ByteBuffer> queue
+ private volatile Flow.Subscription subscription;
+ private volatile boolean completed;
+ private volatile boolean dropped;
+ private volatile Throwable error;
+ private final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
- final SequentialScheduler scheduler =
+ private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::processQueue);
+ private final HttpClientImpl client;
+
+ Http2TubeSubscriber(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ }
final void processQueue() {
try {
@@ -1189,6 +1195,12 @@
}
}
+ private final void runOrSchedule() {
+ if (client.isSelectorThread()) {
+ scheduler.runOrSchedule(client.theExecutor());
+ } else scheduler.runOrSchedule();
+ }
+
@Override
public void onSubscribe(Flow.Subscription subscription) {
// supports being called multiple time.
@@ -1212,7 +1224,7 @@
if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
+ " bytes in " + item.size() + " buffers");
queue.addAll(item);
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
@@ -1220,7 +1232,7 @@
if (debug.on()) debug.log(() -> "onError: " + throwable);
error = throwable;
completed = true;
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
@@ -1228,7 +1240,7 @@
if (debug.on()) debug.log("EOF");
error = new EOFException("EOF reached while reading");
completed = true;
- scheduler.runOrSchedule(client().theExecutor());
+ runOrSchedule();
}
@Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Fri May 04 17:02:19 2018 +0100
@@ -1064,6 +1064,15 @@
cancelImpl(cause);
}
+ void connectionClosing(Throwable cause) {
+ Flow.Subscriber<?> subscriber =
+ responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+ errorRef.compareAndSet(null, cause);
+ if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
+ sched.runOrSchedule();
+ } else cancelImpl(cause);
+ }
+
// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
errorRef.compareAndSet(null, e);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Fri May 04 17:02:19 2018 +0100
@@ -480,6 +480,11 @@
return new EngineResult(sslResult);
case OK:
int size = dst.position();
+ if (debug.on()) {
+ debugr.log("Decoded " + size + " bytes out of " + len
+ + " into buffer of " + dst.capacity()
+ + " remaining to decode: " + src.remaining());
+ }
// if the record payload was bigger than what was originally
// allocated, then sets the adaptiveAppBufferSize to size
// and we will use that new size as a guess for the next app
@@ -764,7 +769,7 @@
// copy off the bytes to a smaller buffer, and keep
// the writeBuffer for next time.
dst.flip();
- dest = Utils.copy(dst);
+ dest = Utils.copyAligned(dst);
dst.clear();
} else {
// more than half the buffer was used.
@@ -775,8 +780,8 @@
writeBuffer = null;
}
if (debugw.on())
- debugw.log("OK => produced: %d, not wrapped: %d",
- dest.remaining(), Utils.remaining(src));
+ debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
+ dest.remaining(), dest.capacity(), Utils.remaining(src));
return new EngineResult(sslResult, dest);
case BUFFER_UNDERFLOW:
// Shouldn't happen. Doesn't returns when wrap()
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Fri May 04 17:02:19 2018 +0100
@@ -534,6 +534,16 @@
return dst;
}
+ public static ByteBuffer copyAligned(ByteBuffer src) {
+ int len = src.remaining();
+ int size = ((len + 7) >> 3) << 3;
+ assert size >= len;
+ ByteBuffer dst = ByteBuffer.allocate(size);
+ dst.put(src);
+ dst.flip();
+ return dst;
+ }
+
public static String dump(Object... objects) {
return Arrays.toString(objects);
}