--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 27 21:24:56 2018 +0100
@@ -386,8 +386,11 @@
// we have a flow List<ByteBuffer> upstream.
Http1AsyncDelegateSubscription subscription =
new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
- pending.onSubscribe(subscription);
- this.delegate = delegate = pending;
+ try {
+ pending.onSubscribe(subscription);
+ } finally {
+ this.delegate = delegate = pending;
+ }
final Object captured = delegate;
if (debug.on())
debug.log("delegate is now " + captured
@@ -485,10 +488,11 @@
error = ex;
}
}
- final Throwable t = (recorded == null ? ex : recorded);
- if (debug.on())
- debug.log("recorded " + t + "\n\t delegate: " + delegate
- + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+
+ final Throwable t = (recorded == null ? ex : recorded);
+ if (debug.on())
+ debug.log("recorded " + t + "\n\t delegate: " + delegate
+ + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
if (Log.errors()) {
Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Jun 27 21:24:56 2018 +0100
@@ -257,6 +257,14 @@
.thenCompose(unused -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
+ asyncReceiver.whenFinished.whenComplete((r,t) -> {
+ if (t != null) {
+ if (debug.on())
+ debug.log("asyncReceiver finished (failed=%s)", (Object)t);
+ if (!headersSentCF.isDone())
+ headersSentCF.completeAsync(() -> this, executor);
+ }
+ });
connectFlows(connection);
if (debug.on()) debug.log("requestAction.headers");
@@ -282,7 +290,8 @@
private void cancelIfFailed(Flow.Subscription s) {
asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
- if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
+ if (debug.on())
+ debug.log("asyncReceiver finished (failed=%s)", (Object)t);
if (t != null) {
s.cancel();
// Don't complete exceptionally here as 't'
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Wed Jun 27 21:24:56 2018 +0100
@@ -742,6 +742,9 @@
}
if (!(frame instanceof ResetFrame)) {
+ if (frame instanceof DataFrame) {
+ dropDataFrame((DataFrame)frame);
+ }
if (isServerInitiatedStream(streamid)) {
if (streamid < nextPushStream) {
// trailing data on a cancelled push promise stream,
@@ -780,6 +783,27 @@
}
}
+ final void dropDataFrame(DataFrame df) {
+ if (closed) return;
+ if (debug.on()) {
+ debug.log("Dropping data frame for stream %d (%d payload bytes)",
+ df.streamid(), df.payloadLength());
+ }
+ ensureWindowUpdated(df);
+ }
+
+ final void ensureWindowUpdated(DataFrame df) {
+ try {
+ if (closed) return;
+ int length = df.payloadLength();
+ if (length > 0) {
+ windowUpdater.update(length);
+ }
+ } catch(Throwable t) {
+ Log.logError("Unexpected exception while updating window: {0}", (Object)t);
+ }
+ }
+
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
throws IOException
{
@@ -988,7 +1012,6 @@
connection.channel().getLocalAddress(),
connection.address());
SettingsFrame sf = new SettingsFrame(clientSettings);
- int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
@@ -1001,9 +1024,20 @@
Log.logTrace("Settings Frame sent");
// send a Window update for the receive buffer we are using
- // minus the initial 64 K specified in protocol
- final int len = windowUpdater.initialWindowSize - initialWindowSize;
- if (len > 0) {
+ // minus the initial 64 K -1 specified in protocol:
+ // RFC 7540, Section 6.9.2:
+ // "[...] the connection flow-control window is set to the default
+ // initial window size until a WINDOW_UPDATE frame is received."
+ //
+ // Note that the default initial window size, not to be confused
+ // with the initial window size, is defined by RFC 7540 as
+ // 64K -1.
+ final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
+ if (len != 0) {
+ if (Log.channel()) {
+ Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
+ len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
+ }
windowUpdater.sendWindowUpdate(len);
}
// there will be an ACK to the windows update - which should
@@ -1136,6 +1170,7 @@
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
+ assert stream.streamid == 0;
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java Wed Jun 27 21:24:56 2018 +0100
@@ -185,6 +185,7 @@
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
if (size == 0 && finished) {
inputQ.remove();
+ connection.ensureWindowUpdated(df); // must update connection window
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
@@ -197,7 +198,12 @@
inputQ.remove();
Log.logTrace("responseSubscriber.onNext {0}", size);
if (debug.on()) debug.log("incoming: onNext(%d)", size);
- subscriber.onNext(dsts);
+ try {
+ subscriber.onNext(dsts);
+ } catch (Throwable t) {
+ connection.dropDataFrame(df); // must update connection window
+ throw t;
+ }
if (consumed(df)) {
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
@@ -215,6 +221,8 @@
}
} catch (Throwable throwable) {
errorRef.compareAndSet(null, throwable);
+ } finally {
+ if (sched.isStopped()) drainInputQueue();
}
Throwable t = errorRef.get();
@@ -223,20 +231,35 @@
try {
if (!onCompleteCalled) {
if (debug.on())
- debug.log("calling subscriber.onError: %s", (Object)t);
+ debug.log("calling subscriber.onError: %s", (Object) t);
subscriber.onError(t);
} else {
if (debug.on())
- debug.log("already completed: dropping error %s", (Object)t);
+ debug.log("already completed: dropping error %s", (Object) t);
}
} catch (Throwable x) {
- Log.logError("Subscriber::onError threw exception: {0}", (Object)t);
+ Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
} finally {
cancelImpl(t);
+ drainInputQueue();
}
}
}
+ // must only be called from the scheduler schedule() loop.
+ // ensure that all received data frames are accounted for
+ // in the connection window flow control if the scheduler
+ // is stopped before all the data is consumed.
+ private void drainInputQueue() {
+ Http2Frame frame;
+ while ((frame = inputQ.poll()) != null) {
+ if (frame instanceof DataFrame) {
+ connection.dropDataFrame((DataFrame)frame);
+ }
+ }
+ }
+
+
// Callback invoked after the Response BodySubscriber has consumed the
// buffers contained in a DataFrame.
// Returns true if END_STREAM is reached, false otherwise.
@@ -245,15 +268,19 @@
// The entire DATA frame payload is included in flow control,
// including the Pad Length and Padding fields if present
int len = df.payloadLength();
+ boolean endStream = df.getFlag(DataFrame.END_STREAM);
+ if (len == 0) return endStream;
+
connection.windowUpdater.update(len);
- if (!df.getFlag(DataFrame.END_STREAM)) {
+ if (!endStream) {
// Don't send window update on a stream which is
// closed or half closed.
windowUpdater.update(len);
- return false; // more data coming
}
- return true; // end of stream
+
+ // true: end of stream; false: more data coming
+ return endStream;
}
boolean deRegister() {
@@ -1126,7 +1153,7 @@
connection.resetStream(streamid, ResetFrame.CANCEL);
}
}
- } catch (IOException ex) {
+ } catch (Throwable ex) {
Log.logError(ex);
}
}
@@ -1289,6 +1316,18 @@
int getStreamId() {
return streamid;
}
+
+ @Override
+ String dbgString() {
+ String dbg = dbgString;
+ if (dbg != null) return dbg;
+ if (streamid == 0) {
+ return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
+ } else {
+ dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
+ return dbgString = dbg;
+ }
+ }
}
/**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java Wed Jun 27 21:24:56 2018 +0100
@@ -25,6 +25,7 @@
package jdk.internal.net.http;
+import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.frame.SettingsFrame;
import jdk.internal.net.http.frame.WindowUpdateFrame;
@@ -84,8 +85,18 @@
connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
}
+ volatile String dbgString;
String dbgString() {
- return "WindowUpdateSender(stream: " + getStreamId() + ")";
+ String dbg = dbgString;
+ if (dbg != null) return dbg;
+ FlowTube tube = connection.connection.getConnectionFlow();
+ if (tube == null) {
+ return "WindowUpdateSender(stream: " + getStreamId() + ")";
+ } else {
+ int streamId = getStreamId();
+ dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamId + ")";
+ return streamId == 0 ? dbg : (dbgString = dbg);
+ }
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed Jun 27 21:24:56 2018 +0100
@@ -448,6 +448,7 @@
} catch (IOException ex) {
errorCommon(ex);
handleError(ex);
+ return;
}
if (handshaking && !complete)
return;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Wed Jun 27 21:24:56 2018 +0100
@@ -485,14 +485,22 @@
return !(hs == NOT_HANDSHAKING || hs == FINISHED);
}
- private boolean handshakeFailed() {
+ private String handshakeFailed() {
// sslDelegate can be null if we reach here
// during the initial handshake, as that happens
// within the SSLFlowDelegate constructor.
// In that case we will want to raise an exception.
- return handshaking()
+ if (handshaking()
&& (sslDelegate == null
- || !sslDelegate.closeNotifyReceived());
+ || !sslDelegate.closeNotifyReceived())) {
+ return "Remote host terminated the handshake";
+ }
+ // The initial handshake may not have been started yet.
+ // In which case - if we are completed before the initial handshake
+ // is started, we consider this a handshake failure as well.
+ if ("SSL_NULL_WITH_NULL_NULL".equals(engine.getSession().getCipherSuite()))
+ return "Remote host closed the channel";
+ return null;
}
@Override
@@ -503,14 +511,15 @@
subscriberImpl = subscribed;
}
- if (handshakeFailed()) {
+ String handshakeFailed = handshakeFailed();
+ if (handshakeFailed != null) {
if (debug.on())
- debug.log("handshake: %s, inbound done: %s outbound done: %s",
+ debug.log("handshake: %s, inbound done: %s, outbound done: %s: %s",
engine.getHandshakeStatus(),
engine.isInboundDone(),
- engine.isOutboundDone());
- onErrorImpl(new SSLHandshakeException(
- "Remote host terminated the handshake"));
+ engine.isOutboundDone(),
+ handshakeFailed);
+ onErrorImpl(new SSLHandshakeException(handshakeFailed));
} else if (subscriberImpl != null) {
onCompleteReceived = finished = true;
subscriberImpl.onComplete();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java Tue Jun 26 13:19:01 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java Wed Jun 27 21:24:56 2018 +0100
@@ -161,14 +161,19 @@
}
}
+ public static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * K -1;
+ public static final int DEFAULT_HEADER_TABLE_SIZE = 4 * K;
+ public static final int DEFAULT_MAX_CONCURRENT_STREAMS = 100;
+ public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
+
public static SettingsFrame getDefaultSettings() {
SettingsFrame f = new SettingsFrame();
// TODO: check these values
f.setParameter(ENABLE_PUSH, 1);
- f.setParameter(HEADER_TABLE_SIZE, 4 * K);
- f.setParameter(MAX_CONCURRENT_STREAMS, 100);
- f.setParameter(INITIAL_WINDOW_SIZE, 64 * K - 1);
- f.setParameter(MAX_FRAME_SIZE, 16 * K);
+ f.setParameter(HEADER_TABLE_SIZE, DEFAULT_HEADER_TABLE_SIZE);
+ f.setParameter(MAX_CONCURRENT_STREAMS, DEFAULT_MAX_CONCURRENT_STREAMS);
+ f.setParameter(INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
+ f.setParameter(MAX_FRAME_SIZE, DEFAULT_MAX_FRAME_SIZE);
return f;
}
}
--- a/test/jdk/java/net/httpclient/CancelledResponse.java Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/CancelledResponse.java Wed Jun 27 21:24:56 2018 +0100
@@ -339,9 +339,11 @@
Thread.sleep(10);
}
out.println("sent " + s);
- } catch (SSLException | SocketException x) {
- // if SSL then we might get a "Broken Pipe", otherwise
- // a "Socket closed".
+ } catch (SSLException | SocketException | RuntimeException x) {
+ // if SSL then we might get a "Broken Pipe", or a
+ // RuntimeException wrapping an InvalidAlgorithmParameterException
+ // (probably if the channel is closed during the handshake),
+ // otherwise we get a "Socket closed".
boolean expected = cancelled.get();
if (sent > 0 && expected) {
System.out.println("Connection closed by peer as expected: " + x);
@@ -349,6 +351,7 @@
} else {
System.out.println("Unexpected exception (sent="
+ sent + ", cancelled=" + expected + "): " + x);
+ if (x instanceof RuntimeException) throw (RuntimeException) x;
throw new RuntimeException(x);
}
} catch (IOException | InterruptedException e) {
--- a/test/jdk/java/net/httpclient/MockServer.java Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/MockServer.java Wed Jun 27 21:24:56 2018 +0100
@@ -284,7 +284,7 @@
}
try {
socket.close();
- } catch (IOException e) {}
+ } catch (Throwable e) {}
synchronized (removals) {
removals.add(this);
}
@@ -339,7 +339,7 @@
closed = true;
try {
ss.close();
- } catch (IOException e) {
+ } catch (Throwable e) {
e.printStackTrace();
}
for (Connection c : sockets) {
--- a/test/jdk/java/net/httpclient/ShortResponseBody.java Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/ShortResponseBody.java Wed Jun 27 21:24:56 2018 +0100
@@ -264,19 +264,47 @@
// can be used to prolong request body publication
static final class InfiniteInputStream extends InputStream {
+ int count = 0;
+ int k16 = 0;
@Override
public int read() throws IOException {
+ if (++count == 1) {
+ System.out.println("Start sending 1 byte");
+ }
+ if (count > 16 * 1024) {
+ k16++;
+ System.out.println("... 16K sent.");
+ count = count % (16 * 1024);
+ }
+ if (k16 > 128) {
+ System.out.println("WARNING: InfiniteInputStream: " +
+ "more than 128 16k buffers generated: returning EOF");
+ return -1;
+ }
return 1;
}
@Override
public int read(byte[] buf, int offset, int length) {
//int count = offset;
- //length = Math.max(0, Math.min(buf.length - offset, length));
+ length = Math.max(0, Math.min(buf.length - offset, length));
//for (; count < length; count++)
// buf[offset++] = 0x01;
//return count;
- return Math.max(0, Math.min(buf.length - offset, length));
+ if (count == 0) {
+ System.out.println("Start sending " + length);
+ } else if (count > 16 * 1024) {
+ k16++;
+ System.out.println("... 16K sent.");
+ count = count % (16 * 1024);
+ }
+ if (k16 > 128) {
+ System.out.println("WARNING: InfiniteInputStream: " +
+ "more than 128 16k buffers generated: returning EOF");
+ return -1;
+ }
+ count += length;
+ return length;
}
}
@@ -493,10 +521,13 @@
out.print(requestMethod + " ");
URI uriPath = readRequestPath(is);
out.println(uriPath);
- readRequestHeaders(is);
+ String headers = readRequestHeaders(is);
String query = uriPath.getRawQuery();
- assert query != null;
+ if (query == null) {
+ out.println("Request headers: [" + headers + "]");
+ }
+ assert query != null : "null query for uriPath: " + uriPath;
String qv = query.split("=")[1];
int len;
if (qv.equals("all")) {
@@ -542,9 +573,11 @@
}
// Read until the end of a HTTP request headers
- static void readRequestHeaders(InputStream is) throws IOException {
+ static String readRequestHeaders(InputStream is) throws IOException {
int requestEndCount = 0, r;
+ StringBuilder sb = new StringBuilder();
while ((r = is.read()) != -1) {
+ sb.append((char) r);
if (r == requestEnd[requestEndCount]) {
requestEndCount++;
if (requestEndCount == 4) {
@@ -554,6 +587,7 @@
requestEndCount = 0;
}
}
+ return sb.toString();
}
}
--- a/test/jdk/java/net/httpclient/SplitResponse.java Tue Jun 26 13:19:01 2018 +0100
+++ b/test/jdk/java/net/httpclient/SplitResponse.java Wed Jun 27 21:24:56 2018 +0100
@@ -32,6 +32,7 @@
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLContext;
import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocketFactory;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
@@ -268,7 +269,7 @@
String onechar = s.substring(i, i + 1);
try {
conn.send(onechar);
- } catch(SocketException x) {
+ } catch(SocketException | SSLException x) {
if (!useSSL || i != len - 1) throw x;
if (x.getMessage().contains("closed by remote host")) {
String osname = System.getProperty("os.name", "unknown");