--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Mon Feb 26 16:17:03 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Mon Feb 26 17:57:43 2018 +0000
@@ -76,6 +76,9 @@
* <--------------- | | <--------------
* supplied to constructor | | obtained from this
* +------------------+
+ *
+ * Errors are reported to the downReader Flow.Subscriber
+ *
* }
* </pre>
*/
@@ -90,10 +93,12 @@
final Writer writer;
final SSLEngine engine;
final String tubeName; // hack
- private final CompletableFuture<Void> cf;
final CompletableFuture<String> alpnCF; // completes on initial handshake
final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
volatile boolean close_notify_received;
+ volatile Flow.Subscriber<?> downReader;
+ static AtomicInteger scount = new AtomicInteger(1);
+ final int id;
/**
* Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
@@ -105,22 +110,26 @@
Subscriber<? super List<ByteBuffer>> downReader,
Subscriber<? super List<ByteBuffer>> downWriter)
{
+ this.id = scount.getAndIncrement();
this.tubeName = String.valueOf(downWriter);
this.reader = new Reader();
this.writer = new Writer();
this.engine = engine;
this.exec = exec;
this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
- CompletableFuture<Void> cs = CompletableFuture.allOf(
- reader.completion(), writer.completion()).thenRun(this::normalStop);
- this.cf = MinimalFuture.of(cs);
+ CompletableFuture.anyOf(reader.completion(), writer.completion())
+ .exceptionally(this::stopOnError);
+
+ CompletableFuture.allOf(reader.completion(), writer.completion())
+ .thenRun(this::normalStop);
this.alpnCF = new MinimalFuture<>();
+ this.downReader = downReader;
// connect the Reader to the downReader and the
// Writer to the downWriter.
connect(downReader, downWriter);
- //Monitor.add(this::monitor);
+ Monitor.add(this::monitor);
}
/**
@@ -144,6 +153,7 @@
*/
void connect(Subscriber<? super List<ByteBuffer>> downReader,
Subscriber<? super List<ByteBuffer>> downWriter) {
+ this.downReader = downReader;
this.reader.subscribe(downReader);
this.writer.subscribe(downWriter);
}
@@ -168,7 +178,8 @@
public String monitor() {
StringBuilder sb = new StringBuilder();
- sb.append("SSL: HS state: " + states(handshakeState));
+ sb.append("SSL: id ").append(id);
+ sb.append(" HS state: " + states(handshakeState));
sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
sb.append(" LL : ");
for (String s: stateList) {
@@ -369,16 +380,6 @@
}
}
- /**
- * Returns a CompletableFuture which completes after all activity
- * in the delegate is terminated (whether normally or exceptionally).
- *
- * @return
- */
- public CompletableFuture<Void> completion() {
- return cf;
- }
-
public interface Monitorable {
public String getInfo();
}
@@ -606,7 +607,7 @@
private void handleError(Throwable t) {
debug.log(Level.DEBUG, "handleError", t);
- cf.completeExceptionally(t);
+ downReader.onError(t);
// no-op if already completed
alpnCF.completeExceptionally(t);
reader.stop();
@@ -614,8 +615,13 @@
}
private void normalStop() {
+ stopOnError(null);
+ }
+
+ private Void stopOnError(Throwable t) {
reader.stop();
writer.stop();
+ return null;
}
private void cleanList(List<ByteBuffer> l) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Mon Feb 26 16:17:03 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Mon Feb 26 17:57:43 2018 +0000
@@ -194,6 +194,7 @@
SSLServerSocket se = (SSLServerSocket) fac.createServerSocket(port);
SSLParameters sslp = se.getSSLParameters();
sslp.setApplicationProtocols(new String[]{"h2"});
+ sslp.setEndpointIdentificationAlgorithm("HTTPS");
se.setSSLParameters(sslp);
se.setEnabledCipherSuites(se.getSupportedCipherSuites());
se.setEnabledProtocols(se.getSupportedProtocols());
@@ -222,19 +223,22 @@
try {
while (!stopping) {
Socket socket = server.accept();
- InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
- Http2TestServerConnection c =
- new Http2TestServerConnection(this, socket, exchangeSupplier);
- putConnection(addr, c);
+ Http2TestServerConnection c = null;
+ InetSocketAddress addr = null;
try {
+ addr = (InetSocketAddress) socket.getRemoteSocketAddress();
+ c = new Http2TestServerConnection(this, socket, exchangeSupplier);
+ putConnection(addr, c);
c.run();
} catch (Throwable e) {
// we should not reach here, but if we do
// the connection might not have been closed
// and if so then the client might wait
// forever.
- removeConnection(addr, c);
- c.close(ErrorFrame.PROTOCOL_ERROR);
+ if (c != null) {
+ removeConnection(addr, c);
+ c.close(ErrorFrame.PROTOCOL_ERROR);
+ }
System.err.println("TestServer: start exception: " + e);
//throw e;
}