http-client-branch: fixed Http2TestServerConnection problem http-client-branch
authormichaelm
Wed, 29 Nov 2017 16:59:38 +0000
branchhttp-client-branch
changeset 55912 dfa9489d1cb1
parent 55911 d3298b9365e7
child 55922 77feac3903d9
http-client-branch: fixed Http2TestServerConnection problem
test/jdk/java/net/httpclient/http2/BasicTest.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java	Wed Nov 29 16:34:52 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java	Wed Nov 29 16:59:38 2017 +0000
@@ -93,21 +93,23 @@
     static List<CompletableFuture<Long>> cfs = Collections
         .synchronizedList( new LinkedList<>());
 
-    static AtomicReference<CompletableFuture<Long>> currentCF =
-        new AtomicReference<>();
+    static CompletableFuture<Long> currentCF;
 
     static class EchoWithPingHandler extends Http2EchoHandler {
+        private final Object lock = new Object();
+
         @Override
         public void handle(Http2TestExchange exchange) throws IOException {
-            // ensure only one ping active at a time.
-            currentCF.getAndUpdate((cf) -> {
-                if (cf  == null || cf.isDone()) {
+            // for now only one ping active at a time. don't want to saturate
+            synchronized(lock) {
+                CompletableFuture<Long> cf = currentCF;
+                if (cf == null || cf.isDone()) {
                     cf = exchange.sendPing();
                     assert cf != null;
                     cfs.add(cf);
+                    currentCF = cf;
                 }
-                return cf;
-            });
+            }
             super.handle(exchange);
         }
     }
@@ -122,7 +124,6 @@
             streamTest(false);
             streamTest(true);
             paramsTest();
-            Thread.sleep(1000 * 4);
             CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
             synchronized (cfs) {
                 for (CompletableFuture<Long> cf : cfs) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Nov 29 16:34:52 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Nov 29 16:59:38 2017 +0000
@@ -38,7 +38,7 @@
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.function.Consumer;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
 import jdk.incubator.http.internal.frame.*;
@@ -73,9 +73,7 @@
     final boolean secure;
     volatile boolean stopping;
     volatile int nextPushStreamId = 2;
-    volatile byte[] pingData;
-    volatile CompletableFuture<Long> pingResponseHandler;
-    final AtomicLong pingStamp; // milliseconds at time PING was sent
+    ConcurrentLinkedQueue<PingRequest> pings = new ConcurrentLinkedQueue<>();
 
     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
     final static byte[] EMPTY_BARRAY = new byte[0];
@@ -87,6 +85,35 @@
         Sentinel() { super(-1,-1);}
     }
 
+    class PingRequest {
+        final byte[] pingData;
+        final long pingStamp;
+        final CompletableFuture<Long> response;
+
+        PingRequest() {
+            pingData = new byte[8];
+            random.nextBytes(pingData);
+            pingStamp = System.currentTimeMillis();
+            response = new CompletableFuture<>();
+        }
+
+        PingFrame frame() {
+            return new PingFrame(0, pingData);
+        }
+
+        CompletableFuture<Long> response() {
+            return response;
+        }
+
+        void success() {
+            response.complete(System.currentTimeMillis() - pingStamp);
+        }
+
+        void fail(Throwable t) {
+            response.completeExceptionally(t);
+        }
+    }
+
     static Sentinel sentinel;
 
     Http2TestServerConnection(Http2TestServer server,
@@ -110,7 +137,6 @@
         this.exec = server.exec;
         this.secure = server.secure;
         this.pushStreams = new HashSet<>();
-        this.pingStamp = new AtomicLong();
         is = new BufferedInputStream(socket.getInputStream());
         os = new BufferedOutputStream(socket.getOutputStream());
     }
@@ -120,24 +146,24 @@
      * CF when the PING ack is received. The CF is given
      * an integer, whose value is the number of milliseconds
      * between PING and ACK.
-     *
-     * Only one PING is allowed to be outstanding at any time
      */
     CompletableFuture<Long> sendPing() {
-        this.pingResponseHandler = new CompletableFuture<>();
+        PingRequest ping = null;
         try {
-            if (pingData != null) {
-                throw new IllegalStateException("PING already outstanding");
-            }
-            pingData = new byte[8];
-            random.nextBytes(pingData);
-            pingStamp.set(System.currentTimeMillis());
-            PingFrame ping = new PingFrame(0, pingData);
-            outputQ.put(ping);
+            ping = new PingRequest();
+            pings.add(ping);
+            outputQ.put(ping.frame());
         } catch (Throwable t) {
-            pingResponseHandler.completeExceptionally(t);
+            ping.fail(t);
         }
-        return pingResponseHandler;
+        return ping.response();
+    }
+
+    /**
+     * Returns the first PingRequest from Queue
+     */
+    private PingRequest getNextRequest() {
+        return pings.poll();
     }
 
     /**
@@ -152,17 +178,16 @@
         }
         if (ping.getFlag(PingFrame.ACK)) {
             // did we send a Ping?
-            if (pingData == null) {
-                System.err.println("Invalid ping received");
+            PingRequest request = getNextRequest();
+            if (request == null) {
+                System.err.println("Invalid ping ACK received");
                 close();
                 return;
-            } else if (!Arrays.equals(pingData, ping.getData())) {
-                pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK"));
+            } else if (!Arrays.equals(request.pingData, ping.getData())) {
+                request.fail(new RuntimeException("Wrong ping data in ACK"));
             } else {
-                pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0));
+                request.success();
             }
-            pingResponseHandler = null;
-            pingData = null;
         } else {
             // client originated PING. Just send it back with ACK set
             ping.setFlag(PingFrame.ACK);