http-client-branch: added ping support to test http2 server and some test pings from server to client in the BasicTest http-client-branch
authormichaelm
Fri, 24 Nov 2017 17:34:57 +0000
branchhttp-client-branch
changeset 55868 5899aa5e1837
parent 55867 1b8734a5c696
child 55869 54f89370f26a
http-client-branch: added ping support to test http2 server and some test pings from server to client in the BasicTest
test/jdk/java/net/httpclient/http2/BasicTest.java
test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java
test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java
test/jdk/java/net/httpclient/http2/server/Http2TestServer.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java	Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java	Fri Nov 24 17:34:57 2017 +0000
@@ -33,12 +33,14 @@
  * @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors BasicTest
  */
 
+import java.io.IOException;
 import java.net.*;
 import jdk.incubator.http.*;
 import static jdk.incubator.http.HttpClient.Version.HTTP_2;
 import javax.net.ssl.*;
 import java.nio.file.*;
 import java.util.concurrent.*;
+import java.util.LinkedList;
 import jdk.testlibrary.SimpleSSLContext;
 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromFile;
 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
@@ -56,7 +58,7 @@
     static ExecutorService serverExec;
     static SSLContext sslContext;
 
-    static String httpURIString, httpsURIString;
+    static String pingURIString, httpURIString, httpsURIString;
 
     static void initialize() throws Exception {
         try {
@@ -65,6 +67,7 @@
             client = getClient();
             httpServer = new Http2TestServer(false, 0, serverExec, sslContext);
             httpServer.addHandler(new Http2EchoHandler(), "/");
+            httpServer.addHandler(new EchoWithPingHandler(), "/ping");
             httpPort = httpServer.getAddress().getPort();
 
             httpsServer = new Http2TestServer(true, 0, serverExec, sslContext);
@@ -72,6 +75,7 @@
 
             httpsPort = httpsServer.getAddress().getPort();
             httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
+            pingURIString = "http://127.0.0.1:" + httpPort + "/ping/";
             httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/";
 
             httpServer.start();
@@ -83,16 +87,33 @@
         }
     }
 
+    static LinkedList<CompletableFuture<Long>> cfs = new LinkedList<>();
+
+    static class EchoWithPingHandler extends Http2EchoHandler {
+        @Override
+        public void handle(Http2TestExchange exchange) throws IOException {
+            CompletableFuture<Long> cf = new CompletableFuture<>();
+            cfs.add(cf);
+            exchange.sendPing(cf);
+            super.handle(exchange);
+        }
+    }
+
     @Test
     public static void test() throws Exception {
         try {
             initialize();
-            simpleTest(false);
-            simpleTest(true);
+            simpleTest(false, false);
+            simpleTest(false, true);
+            simpleTest(true, false);
             streamTest(false);
             streamTest(true);
             paramsTest();
             Thread.sleep(1000 * 4);
+            CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
+            for (CompletableFuture<Long> cf : cfs) {
+                System.out.printf("Ping ack received in %d millisec\n", cf.get());
+            }
         } catch (Throwable tt) {
             System.err.println("tt caught");
             tt.printStackTrace();
@@ -118,10 +139,14 @@
     }
 
     static URI getURI(boolean secure) {
+        return getURI(secure, false);
+    }
+
+    static URI getURI(boolean secure, boolean ping) {
         if (secure)
             return URI.create(httpsURIString);
         else
-            return URI.create(httpURIString);
+            return URI.create(ping ? pingURIString: httpURIString);
     }
 
     static void checkStatus(int expected, int found) throws Exception {
@@ -202,8 +227,8 @@
         System.err.println("paramsTest: DONE");
     }
 
-    static void simpleTest(boolean secure) throws Exception {
-        URI uri = getURI(secure);
+    static void simpleTest(boolean secure, boolean ping) throws Exception {
+        URI uri = getURI(secure, ping);
         System.err.println("Request to " + uri);
 
         // Do a simple warmup request
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java	Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java	Fri Nov 24 17:34:57 2017 +0000
@@ -26,6 +26,8 @@
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import javax.net.ssl.SSLSession;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
 
@@ -60,4 +62,11 @@
     boolean serverPushAllowed();
 
     void serverPush(URI uri, HttpHeadersImpl headers, InputStream content);
+
+    /**
+     * Send a PING on this exchanges connection, and complete the given CF
+     * with the number of milliseconds it took to get a valid response.
+     * It may also complete exceptionally
+     */
+    void sendPing(CompletableFuture<Long> cf) throws IOException;
 }
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java	Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java	Fri Nov 24 17:34:57 2017 +0000
@@ -26,6 +26,8 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import javax.net.ssl.SSLSession;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
 import jdk.incubator.http.internal.frame.HeaderFrame;
@@ -77,6 +79,11 @@
     }
 
     @Override
+    public void sendPing(CompletableFuture<Long> cf) throws IOException {
+        conn.sendPing(cf);
+    }
+
+    @Override
     public HttpHeadersImpl getResponseHeaders() {
         return rspheaders;
     }
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java	Fri Nov 24 17:34:57 2017 +0000
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.net.ServerSocketFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Nov 24 17:34:57 2017 +0000
@@ -36,20 +36,12 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
-import jdk.incubator.http.internal.frame.DataFrame;
-import jdk.incubator.http.internal.frame.FramesDecoder;
-import jdk.incubator.http.internal.frame.FramesEncoder;
-import jdk.incubator.http.internal.frame.GoAwayFrame;
-import jdk.incubator.http.internal.frame.HeaderFrame;
-import jdk.incubator.http.internal.frame.HeadersFrame;
-import jdk.incubator.http.internal.frame.Http2Frame;
-import jdk.incubator.http.internal.frame.PushPromiseFrame;
-import jdk.incubator.http.internal.frame.ResetFrame;
-import jdk.incubator.http.internal.frame.SettingsFrame;
-import jdk.incubator.http.internal.frame.WindowUpdateFrame;
+import jdk.incubator.http.internal.frame.*;
 import jdk.incubator.http.internal.hpack.Decoder;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
 import jdk.incubator.http.internal.hpack.Encoder;
@@ -81,9 +73,13 @@
     final boolean secure;
     volatile boolean stopping;
     volatile int nextPushStreamId = 2;
+    volatile byte[] pingData;
+    volatile CompletableFuture<Long> pingResponseHandler;
+    final AtomicLong pingStamp; // milliseconds at time PING was sent
 
     final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
     final static byte[] EMPTY_BARRAY = new byte[0];
+    final Random random;
 
     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
 
@@ -107,16 +103,68 @@
         this.streams = Collections.synchronizedMap(new HashMap<>());
         this.outStreams = Collections.synchronizedMap(new HashMap<>());
         this.outputQ = new Queue<>(sentinel);
+        this.random = new Random();
         this.socket = socket;
         this.socket.setTcpNoDelay(true);
         this.serverSettings = SettingsFrame.getDefaultSettings();
         this.exec = server.exec;
         this.secure = server.secure;
         this.pushStreams = new HashSet<>();
+        this.pingStamp = new AtomicLong();
         is = new BufferedInputStream(socket.getInputStream());
         os = new BufferedOutputStream(socket.getOutputStream());
     }
 
+    /**
+     * Sends a PING frame on this connection, and invokes the given
+     * handler when the PING ack is received. The handler is given
+     * an integer, whose value if >= 0, is the number of milliseconds
+     * between PING and ACK, If < 0 signifies an error occured.
+     *
+     * Only one PING is allowed to be outstanding at any time
+     */
+    void sendPing(CompletableFuture<Long> cf) throws IOException {
+        if (pingData != null) {
+            throw new IllegalStateException("PING already outstanding");
+        }
+        pingData = new byte[8];
+        random.nextBytes(pingData);
+        this.pingResponseHandler = cf;
+        pingStamp.set(System.currentTimeMillis());
+        PingFrame ping = new PingFrame(0, pingData);
+        outputQ.put(ping);
+    }
+
+    /**
+     * Handles incoming Ping, which could be an ack
+     * or a client originated Ping
+     */
+    void handlePing(PingFrame ping) throws IOException {
+        if (ping.streamid() != 0) {
+            System.err.println("Invalid ping received");
+            close();
+            return;
+        }
+        if (ping.getFlag(PingFrame.ACK)) {
+            // did we send a Ping?
+            if (pingData == null) {
+                System.err.println("Invalid ping received");
+                close();
+                return;
+            } else if (!Arrays.equals(pingData, ping.getData())) {
+                pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK"));
+            } else {
+                pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0));
+            }
+            pingResponseHandler = null;
+            pingData = null;
+        } else {
+            // client originated PING. Just send it back with ACK set
+            ping.setFlag(PingFrame.ACK);
+            outputQ.put(ping);
+        }
+    }
+
     private static boolean compareIPAddrs(InetAddress addr1, String host) {
         try {
             InetAddress addr2 = InetAddress.getByName(host);
@@ -292,8 +340,10 @@
         } else if (f instanceof GoAwayFrame) {
             System.err.println("Closing: "+ f.toString());
             close();
-        }
-        throw new UnsupportedOperationException("Not supported yet: " + f.toString());
+        } else if (f instanceof PingFrame) {
+            handlePing((PingFrame)f);
+        } else
+            throw new UnsupportedOperationException("Not supported yet: " + f.toString());
     }
 
     void sendWindowUpdates(int len, int streamid) throws IOException {