# HG changeset patch # User michaelm # Date 1511974778 0 # Node ID dfa9489d1cb1cc89804d847213af162137d3eb5a # Parent d3298b9365e77732a020f18a19179e3e3411177e http-client-branch: fixed Http2TestServerConnection problem diff -r d3298b9365e7 -r dfa9489d1cb1 test/jdk/java/net/httpclient/http2/BasicTest.java --- a/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 16:34:52 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 16:59:38 2017 +0000 @@ -93,21 +93,23 @@ static List> cfs = Collections .synchronizedList( new LinkedList<>()); - static AtomicReference> currentCF = - new AtomicReference<>(); + static CompletableFuture currentCF; static class EchoWithPingHandler extends Http2EchoHandler { + private final Object lock = new Object(); + @Override public void handle(Http2TestExchange exchange) throws IOException { - // ensure only one ping active at a time. - currentCF.getAndUpdate((cf) -> { - if (cf == null || cf.isDone()) { + // for now only one ping active at a time. don't want to saturate + synchronized(lock) { + CompletableFuture cf = currentCF; + if (cf == null || cf.isDone()) { cf = exchange.sendPing(); assert cf != null; cfs.add(cf); + currentCF = cf; } - return cf; - }); + } super.handle(exchange); } } @@ -122,7 +124,6 @@ streamTest(false); streamTest(true); paramsTest(); - Thread.sleep(1000 * 4); CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join(); synchronized (cfs) { for (CompletableFuture cf : cfs) { diff -r d3298b9365e7 -r dfa9489d1cb1 test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 29 16:34:52 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 29 16:59:38 2017 +0000 @@ -38,7 +38,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import jdk.incubator.http.internal.common.HttpHeadersImpl; import jdk.incubator.http.internal.frame.*; @@ -73,9 +73,7 @@ final boolean secure; volatile boolean stopping; volatile int nextPushStreamId = 2; - volatile byte[] pingData; - volatile CompletableFuture pingResponseHandler; - final AtomicLong pingStamp; // milliseconds at time PING was sent + ConcurrentLinkedQueue pings = new ConcurrentLinkedQueue<>(); final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); final static byte[] EMPTY_BARRAY = new byte[0]; @@ -87,6 +85,35 @@ Sentinel() { super(-1,-1);} } + class PingRequest { + final byte[] pingData; + final long pingStamp; + final CompletableFuture response; + + PingRequest() { + pingData = new byte[8]; + random.nextBytes(pingData); + pingStamp = System.currentTimeMillis(); + response = new CompletableFuture<>(); + } + + PingFrame frame() { + return new PingFrame(0, pingData); + } + + CompletableFuture response() { + return response; + } + + void success() { + response.complete(System.currentTimeMillis() - pingStamp); + } + + void fail(Throwable t) { + response.completeExceptionally(t); + } + } + static Sentinel sentinel; Http2TestServerConnection(Http2TestServer server, @@ -110,7 +137,6 @@ this.exec = server.exec; this.secure = server.secure; this.pushStreams = new HashSet<>(); - this.pingStamp = new AtomicLong(); is = new BufferedInputStream(socket.getInputStream()); os = new BufferedOutputStream(socket.getOutputStream()); } @@ -120,24 +146,24 @@ * CF when the PING ack is received. The CF is given * an integer, whose value is the number of milliseconds * between PING and ACK. - * - * Only one PING is allowed to be outstanding at any time */ CompletableFuture sendPing() { - this.pingResponseHandler = new CompletableFuture<>(); + PingRequest ping = null; try { - if (pingData != null) { - throw new IllegalStateException("PING already outstanding"); - } - pingData = new byte[8]; - random.nextBytes(pingData); - pingStamp.set(System.currentTimeMillis()); - PingFrame ping = new PingFrame(0, pingData); - outputQ.put(ping); + ping = new PingRequest(); + pings.add(ping); + outputQ.put(ping.frame()); } catch (Throwable t) { - pingResponseHandler.completeExceptionally(t); + ping.fail(t); } - return pingResponseHandler; + return ping.response(); + } + + /** + * Returns the first PingRequest from Queue + */ + private PingRequest getNextRequest() { + return pings.poll(); } /** @@ -152,17 +178,16 @@ } if (ping.getFlag(PingFrame.ACK)) { // did we send a Ping? - if (pingData == null) { - System.err.println("Invalid ping received"); + PingRequest request = getNextRequest(); + if (request == null) { + System.err.println("Invalid ping ACK received"); close(); return; - } else if (!Arrays.equals(pingData, ping.getData())) { - pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK")); + } else if (!Arrays.equals(request.pingData, ping.getData())) { + request.fail(new RuntimeException("Wrong ping data in ACK")); } else { - pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0)); + request.success(); } - pingResponseHandler = null; - pingData = null; } else { // client originated PING. Just send it back with ACK set ping.setFlag(PingFrame.ACK);