--- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Tue Jun 26 13:16:40 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java Tue Jun 26 13:19:01 2018 +0100
@@ -316,7 +316,7 @@
proxyResponse.version, true);
return MinimalFuture.completedFuture(syntheticResponse);
} else if (t != null) {
- if (debug.on()) debug.log("checkFor407: no response - %s", t);
+ if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
return MinimalFuture.failedFuture(t);
} else {
if (debug.on()) debug.log("checkFor407: all clear");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue Jun 26 13:16:40 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue Jun 26 13:19:01 2018 +0100
@@ -673,7 +673,11 @@
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
- s.connectionClosing(t);
+ try {
+ s.connectionClosing(t);
+ } catch (Throwable e) {
+ Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
+ }
}
connection.close();
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Tue Jun 26 13:16:40 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Tue Jun 26 13:19:01 2018 +0100
@@ -500,8 +500,8 @@
{
int amount = frame.getUpdate();
if (amount <= 0) {
- Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
- streamid, streamid, amount);
+ Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
+ streamid, amount);
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
} else {
assert streamid != 0;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java Tue Jun 26 13:16:40 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java Tue Jun 26 13:19:01 2018 +0100
@@ -66,8 +66,9 @@
abstract int getStreamId();
void update(int delta) {
- if (debug.on()) debug.log("update: %d", delta);
- if (received.addAndGet(delta) > limit) {
+ int rcv = received.addAndGet(delta);
+ if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
+ if (rcv > limit) {
synchronized (this) {
int tosend = received.get();
if( tosend > limit) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 26 13:16:40 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 26 13:19:01 2018 +0100
@@ -33,6 +33,9 @@
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -93,6 +96,8 @@
// When handshake is in progress trying to wrap may produce no bytes.
private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
+ private static final boolean isMonitored =
+ monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
final Executor exec;
final Reader reader;
@@ -100,6 +105,7 @@
final SSLEngine engine;
final String tubeName; // hack
final CompletableFuture<String> alpnCF; // completes on initial handshake
+ final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF;
@@ -152,8 +158,7 @@
// Writer to the downWriter.
connect(downReader, downWriter);
- if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
- Monitor.add(this::monitor);
+ if (isMonitored) Monitor.add(monitor);
}
/**
@@ -202,6 +207,7 @@
public String monitor() {
StringBuilder sb = new StringBuilder();
sb.append("SSL: id ").append(id);
+ sb.append(" ").append(dbgString());
sb.append(" HS state: " + states(handshakeState));
sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
if (stateList != null) {
@@ -293,8 +299,10 @@
@Override
public String toString() {
- return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
- + " count: " + count.toString();
+ return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
+ + ", count: " + count.toString() + ", scheduler: "
+ + (scheduler.isStopped() ? "stopped" : "running")
+ + ", status: " + lastUnwrapStatus;
}
private void reallocReadBuf() {
@@ -335,6 +343,7 @@
}
if (complete) {
this.completing = complete;
+ minBytesRequired = 0;
}
}
}
@@ -395,13 +404,23 @@
// not enough data in the read buffer...
// no need to try to unwrap again unless we get more bytes
// than minBytesRequired = len in the read buffer.
- minBytesRequired = len;
synchronized (readBufferLock) {
+ minBytesRequired = len;
// more bytes could already have been added...
assert readBuf.remaining() >= len;
// check if we have received some data, and if so
// we can just re-spin the loop
if (readBuf.remaining() > len) continue;
+ else if (this.completing) {
+ if (debug.on()) {
+ debugr.log("BUFFER_UNDERFLOW with EOF," +
+ " %d bytes non decrypted.", len);
+ }
+ // The channel won't send us any more data, and
+ // we are in underflow: we need to fail.
+ throw new IOException("BUFFER_UNDERFLOW with EOF, "
+ + len + " bytes non decrypted.");
+ }
}
// request more data and return.
requestMore();
@@ -452,12 +471,13 @@
}
}
+ private volatile Status lastUnwrapStatus;
EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
ByteBuffer dst = getAppBuffer();
int len = src.remaining();
while (true) {
SSLEngineResult sslResult = engine.unwrap(src, dst);
- switch (sslResult.getStatus()) {
+ switch (lastUnwrapStatus = sslResult.getStatus()) {
case BUFFER_OVERFLOW:
// may happen if app size buffer was changed, or if
// our 'adaptiveBufferSize' guess was too small for
@@ -507,7 +527,9 @@
}
public static class Monitor extends Thread {
- final List<Monitorable> list;
+ final List<WeakReference<Monitorable>> list;
+ final List<FinalMonitorable> finalList;
+ final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
static Monitor themon;
static {
@@ -515,19 +537,61 @@
themon.start(); // uncomment to enable Monitor
}
+ // An instance used to temporarily store the
+ // last observable state of a monitorable object.
+ // When Monitor.remove(o) is called, we replace
+ // 'o' with a FinalMonitorable whose reference
+ // will be enqueued after the last observable state
+ // has been printed.
+ final class FinalMonitorable implements Monitorable {
+ final String finalState;
+ FinalMonitorable(Monitorable o) {
+ finalState = o.getInfo();
+ finalList.add(this);
+ }
+ @Override
+ public String getInfo() {
+ finalList.remove(this);
+ return finalState;
+ }
+ }
+
Monitor() {
super("Monitor");
setDaemon(true);
list = Collections.synchronizedList(new LinkedList<>());
+ finalList = new ArrayList<>(); // access is synchronized on list above
}
void addTarget(Monitorable o) {
- list.add(o);
+ list.add(new WeakReference<>(o, queue));
+ }
+ void removeTarget(Monitorable o) {
+ // It can take a long time for GC to clean up references.
+ // Calling Monitor.remove() early helps removing noise from the
+ // logs/
+ synchronized (list) {
+ Iterator<WeakReference<Monitorable>> it = list.iterator();
+ while (it.hasNext()) {
+ Monitorable m = it.next().get();
+ if (m == null) it.remove();
+ if (o == m) {
+ it.remove();
+ break;
+ }
+ }
+ FinalMonitorable m = new FinalMonitorable(o);
+ addTarget(m);
+ Reference.reachabilityFence(m);
+ }
}
public static void add(Monitorable o) {
themon.addTarget(o);
}
+ public static void remove(Monitorable o) {
+ themon.removeTarget(o);
+ }
@Override
public void run() {
@@ -536,7 +600,14 @@
while (true) {
Thread.sleep(20 * 1000);
synchronized (list) {
- for (Monitorable o : list) {
+ Reference<? extends Monitorable> expired;
+ while ((expired = queue.poll()) != null) list.remove(expired);
+ for (WeakReference<Monitorable> ref : list) {
+ Monitorable o = ref.get();
+ if (o == null) continue;
+ if (o instanceof FinalMonitorable) {
+ ref.enqueue();
+ }
System.out.println(o.getInfo());
System.out.println("-------------------------");
}
@@ -733,6 +804,7 @@
// downstream. Otherwise, we send the writeBuffer downstream
// and will allocate a new one next time.
volatile ByteBuffer writeBuffer;
+ private volatile Status lastWrappedStatus;
@SuppressWarnings("fallthrough")
EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
long len = Utils.remaining(src);
@@ -747,7 +819,7 @@
while (true) {
SSLEngineResult sslResult = engine.wrap(src, dst);
if (debugw.on()) debugw.log("SSLResult: " + sslResult);
- switch (sslResult.getStatus()) {
+ switch (lastWrappedStatus = sslResult.getStatus()) {
case BUFFER_OVERFLOW:
// Shouldn't happen. We allocated buffer with packet size
// get it again if net buffer size was changed
@@ -815,8 +887,10 @@
@Override
public String toString() {
- return "WRITER: " + super.toString() +
- " writeList size " + Integer.toString(writeList.size());
+ return "WRITER: " + super.toString()
+ + ", writeList size: " + Integer.toString(writeList.size())
+ + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
+ + ", status: " + lastWrappedStatus;
//" writeList: " + writeList.toString();
}
}
@@ -839,6 +913,7 @@
stopped = true;
reader.stop();
writer.stop();
+ if (isMonitored) Monitor.remove(monitor);
}
private Void stopOnError(Throwable currentlyUnused) {