--- a/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 16:34:52 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 16:59:38 2017 +0000
@@ -93,21 +93,23 @@
static List<CompletableFuture<Long>> cfs = Collections
.synchronizedList( new LinkedList<>());
- static AtomicReference<CompletableFuture<Long>> currentCF =
- new AtomicReference<>();
+ static CompletableFuture<Long> currentCF;
static class EchoWithPingHandler extends Http2EchoHandler {
+ private final Object lock = new Object();
+
@Override
public void handle(Http2TestExchange exchange) throws IOException {
- // ensure only one ping active at a time.
- currentCF.getAndUpdate((cf) -> {
- if (cf == null || cf.isDone()) {
+ // for now only one ping active at a time. don't want to saturate
+ synchronized(lock) {
+ CompletableFuture<Long> cf = currentCF;
+ if (cf == null || cf.isDone()) {
cf = exchange.sendPing();
assert cf != null;
cfs.add(cf);
+ currentCF = cf;
}
- return cf;
- });
+ }
super.handle(exchange);
}
}
@@ -122,7 +124,6 @@
streamTest(false);
streamTest(true);
paramsTest();
- Thread.sleep(1000 * 4);
CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
synchronized (cfs) {
for (CompletableFuture<Long> cf : cfs) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 29 16:34:52 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 29 16:59:38 2017 +0000
@@ -38,7 +38,7 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.frame.*;
@@ -73,9 +73,7 @@
final boolean secure;
volatile boolean stopping;
volatile int nextPushStreamId = 2;
- volatile byte[] pingData;
- volatile CompletableFuture<Long> pingResponseHandler;
- final AtomicLong pingStamp; // milliseconds at time PING was sent
+ ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
final static byte[] EMPTY_BARRAY = new byte[0];
@@ -87,6 +85,35 @@
Sentinel() { super(-1,-1);}
}
+ class PingRequest {
+ final byte[] pingData;
+ final long pingStamp;
+ final CompletableFuture<Long> response;
+
+ PingRequest() {
+ pingData = new byte[8];
+ random.nextBytes(pingData);
+ pingStamp = System.currentTimeMillis();
+ response = new CompletableFuture<>();
+ }
+
+ PingFrame frame() {
+ return new PingFrame(0, pingData);
+ }
+
+ CompletableFuture<Long> response() {
+ return response;
+ }
+
+ void success() {
+ response.complete(System.currentTimeMillis() - pingStamp);
+ }
+
+ void fail(Throwable t) {
+ response.completeExceptionally(t);
+ }
+ }
+
static Sentinel sentinel;
Http2TestServerConnection(Http2TestServer server,
@@ -110,7 +137,6 @@
this.exec = server.exec;
this.secure = server.secure;
this.pushStreams = new HashSet<>();
- this.pingStamp = new AtomicLong();
is = new BufferedInputStream(socket.getInputStream());
os = new BufferedOutputStream(socket.getOutputStream());
}
@@ -120,24 +146,24 @@
* CF when the PING ack is received. The CF is given
* an integer, whose value is the number of milliseconds
* between PING and ACK.
- *
- * Only one PING is allowed to be outstanding at any time
*/
CompletableFuture<Long> sendPing() {
- this.pingResponseHandler = new CompletableFuture<>();
+ PingRequest ping = null;
try {
- if (pingData != null) {
- throw new IllegalStateException("PING already outstanding");
- }
- pingData = new byte[8];
- random.nextBytes(pingData);
- pingStamp.set(System.currentTimeMillis());
- PingFrame ping = new PingFrame(0, pingData);
- outputQ.put(ping);
+ ping = new PingRequest();
+ pings.add(ping);
+ outputQ.put(ping.frame());
} catch (Throwable t) {
- pingResponseHandler.completeExceptionally(t);
+ ping.fail(t);
}
- return pingResponseHandler;
+ return ping.response();
+ }
+
+ /**
+ * Returns the first PingRequest from Queue
+ */
+ private PingRequest getNextRequest() {
+ return pings.poll();
}
/**
@@ -152,17 +178,16 @@
}
if (ping.getFlag(PingFrame.ACK)) {
// did we send a Ping?
- if (pingData == null) {
- System.err.println("Invalid ping received");
+ PingRequest request = getNextRequest();
+ if (request == null) {
+ System.err.println("Invalid ping ACK received");
close();
return;
- } else if (!Arrays.equals(pingData, ping.getData())) {
- pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK"));
+ } else if (!Arrays.equals(request.pingData, ping.getData())) {
+ request.fail(new RuntimeException("Wrong ping data in ACK"));
} else {
- pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0));
+ request.success();
}
- pingResponseHandler = null;
- pingData = null;
} else {
// client originated PING. Just send it back with ACK set
ping.setFlag(PingFrame.ACK);