# HG changeset patch # User michaelm # Date 1511544897 0 # Node ID 5899aa5e18371ce70ec2fba51892f394351e2f06 # Parent 1b8734a5c6960bdb61fbaf1186af6a32759777d8 http-client-branch: added ping support to test http2 server and some test pings from server to client in the BasicTest diff -r 1b8734a5c696 -r 5899aa5e1837 test/jdk/java/net/httpclient/http2/BasicTest.java --- a/test/jdk/java/net/httpclient/http2/BasicTest.java Fri Nov 24 17:51:51 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Fri Nov 24 17:34:57 2017 +0000 @@ -33,12 +33,14 @@ * @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors BasicTest */ +import java.io.IOException; import java.net.*; import jdk.incubator.http.*; import static jdk.incubator.http.HttpClient.Version.HTTP_2; import javax.net.ssl.*; import java.nio.file.*; import java.util.concurrent.*; +import java.util.LinkedList; import jdk.testlibrary.SimpleSSLContext; import static jdk.incubator.http.HttpRequest.BodyPublisher.fromFile; import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString; @@ -56,7 +58,7 @@ static ExecutorService serverExec; static SSLContext sslContext; - static String httpURIString, httpsURIString; + static String pingURIString, httpURIString, httpsURIString; static void initialize() throws Exception { try { @@ -65,6 +67,7 @@ client = getClient(); httpServer = new Http2TestServer(false, 0, serverExec, sslContext); httpServer.addHandler(new Http2EchoHandler(), "/"); + httpServer.addHandler(new EchoWithPingHandler(), "/ping"); httpPort = httpServer.getAddress().getPort(); httpsServer = new Http2TestServer(true, 0, serverExec, sslContext); @@ -72,6 +75,7 @@ httpsPort = httpsServer.getAddress().getPort(); httpURIString = "http://127.0.0.1:" + httpPort + "/foo/"; + pingURIString = "http://127.0.0.1:" + httpPort + "/ping/"; httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/"; httpServer.start(); @@ -83,16 +87,33 @@ } } + static LinkedList> cfs = new LinkedList<>(); + + static class EchoWithPingHandler extends Http2EchoHandler { + @Override + public void handle(Http2TestExchange exchange) throws IOException { + CompletableFuture cf = new CompletableFuture<>(); + cfs.add(cf); + exchange.sendPing(cf); + super.handle(exchange); + } + } + @Test public static void test() throws Exception { try { initialize(); - simpleTest(false); - simpleTest(true); + simpleTest(false, false); + simpleTest(false, true); + simpleTest(true, false); streamTest(false); streamTest(true); paramsTest(); Thread.sleep(1000 * 4); + CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join(); + for (CompletableFuture cf : cfs) { + System.out.printf("Ping ack received in %d millisec\n", cf.get()); + } } catch (Throwable tt) { System.err.println("tt caught"); tt.printStackTrace(); @@ -118,10 +139,14 @@ } static URI getURI(boolean secure) { + return getURI(secure, false); + } + + static URI getURI(boolean secure, boolean ping) { if (secure) return URI.create(httpsURIString); else - return URI.create(httpURIString); + return URI.create(ping ? pingURIString: httpURIString); } static void checkStatus(int expected, int found) throws Exception { @@ -202,8 +227,8 @@ System.err.println("paramsTest: DONE"); } - static void simpleTest(boolean secure) throws Exception { - URI uri = getURI(secure); + static void simpleTest(boolean secure, boolean ping) throws Exception { + URI uri = getURI(secure, ping); System.err.println("Request to " + uri); // Do a simple warmup request diff -r 1b8734a5c696 -r 5899aa5e1837 test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java Fri Nov 24 17:51:51 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java Fri Nov 24 17:34:57 2017 +0000 @@ -26,6 +26,8 @@ import java.io.OutputStream; import java.net.URI; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import javax.net.ssl.SSLSession; import jdk.incubator.http.internal.common.HttpHeadersImpl; @@ -60,4 +62,11 @@ boolean serverPushAllowed(); void serverPush(URI uri, HttpHeadersImpl headers, InputStream content); + + /** + * Send a PING on this exchanges connection, and complete the given CF + * with the number of milliseconds it took to get a valid response. + * It may also complete exceptionally + */ + void sendPing(CompletableFuture cf) throws IOException; } diff -r 1b8734a5c696 -r 5899aa5e1837 test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java Fri Nov 24 17:51:51 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java Fri Nov 24 17:34:57 2017 +0000 @@ -26,6 +26,8 @@ import java.io.IOException; import java.net.URI; import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import javax.net.ssl.SSLSession; import jdk.incubator.http.internal.common.HttpHeadersImpl; import jdk.incubator.http.internal.frame.HeaderFrame; @@ -77,6 +79,11 @@ } @Override + public void sendPing(CompletableFuture cf) throws IOException { + conn.sendPing(cf); + } + + @Override public HttpHeadersImpl getResponseHeaders() { return rspheaders; } diff -r 1b8734a5c696 -r 5899aa5e1837 test/jdk/java/net/httpclient/http2/server/Http2TestServer.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Fri Nov 24 17:51:51 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Fri Nov 24 17:34:57 2017 +0000 @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.net.ServerSocketFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; diff -r 1b8734a5c696 -r 5899aa5e1837 test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 24 17:51:51 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 24 17:34:57 2017 +0000 @@ -36,20 +36,12 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import jdk.incubator.http.internal.common.HttpHeadersImpl; -import jdk.incubator.http.internal.frame.DataFrame; -import jdk.incubator.http.internal.frame.FramesDecoder; -import jdk.incubator.http.internal.frame.FramesEncoder; -import jdk.incubator.http.internal.frame.GoAwayFrame; -import jdk.incubator.http.internal.frame.HeaderFrame; -import jdk.incubator.http.internal.frame.HeadersFrame; -import jdk.incubator.http.internal.frame.Http2Frame; -import jdk.incubator.http.internal.frame.PushPromiseFrame; -import jdk.incubator.http.internal.frame.ResetFrame; -import jdk.incubator.http.internal.frame.SettingsFrame; -import jdk.incubator.http.internal.frame.WindowUpdateFrame; +import jdk.incubator.http.internal.frame.*; import jdk.incubator.http.internal.hpack.Decoder; import jdk.incubator.http.internal.hpack.DecodingCallback; import jdk.incubator.http.internal.hpack.Encoder; @@ -81,9 +73,13 @@ final boolean secure; volatile boolean stopping; volatile int nextPushStreamId = 2; + volatile byte[] pingData; + volatile CompletableFuture pingResponseHandler; + final AtomicLong pingStamp; // milliseconds at time PING was sent final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); final static byte[] EMPTY_BARRAY = new byte[0]; + final Random random; final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); @@ -107,16 +103,68 @@ this.streams = Collections.synchronizedMap(new HashMap<>()); this.outStreams = Collections.synchronizedMap(new HashMap<>()); this.outputQ = new Queue<>(sentinel); + this.random = new Random(); this.socket = socket; this.socket.setTcpNoDelay(true); this.serverSettings = SettingsFrame.getDefaultSettings(); this.exec = server.exec; this.secure = server.secure; this.pushStreams = new HashSet<>(); + this.pingStamp = new AtomicLong(); is = new BufferedInputStream(socket.getInputStream()); os = new BufferedOutputStream(socket.getOutputStream()); } + /** + * Sends a PING frame on this connection, and invokes the given + * handler when the PING ack is received. The handler is given + * an integer, whose value if >= 0, is the number of milliseconds + * between PING and ACK, If < 0 signifies an error occured. + * + * Only one PING is allowed to be outstanding at any time + */ + void sendPing(CompletableFuture cf) throws IOException { + if (pingData != null) { + throw new IllegalStateException("PING already outstanding"); + } + pingData = new byte[8]; + random.nextBytes(pingData); + this.pingResponseHandler = cf; + pingStamp.set(System.currentTimeMillis()); + PingFrame ping = new PingFrame(0, pingData); + outputQ.put(ping); + } + + /** + * Handles incoming Ping, which could be an ack + * or a client originated Ping + */ + void handlePing(PingFrame ping) throws IOException { + if (ping.streamid() != 0) { + System.err.println("Invalid ping received"); + close(); + return; + } + if (ping.getFlag(PingFrame.ACK)) { + // did we send a Ping? + if (pingData == null) { + System.err.println("Invalid ping received"); + close(); + return; + } else if (!Arrays.equals(pingData, ping.getData())) { + pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK")); + } else { + pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0)); + } + pingResponseHandler = null; + pingData = null; + } else { + // client originated PING. Just send it back with ACK set + ping.setFlag(PingFrame.ACK); + outputQ.put(ping); + } + } + private static boolean compareIPAddrs(InetAddress addr1, String host) { try { InetAddress addr2 = InetAddress.getByName(host); @@ -292,8 +340,10 @@ } else if (f instanceof GoAwayFrame) { System.err.println("Closing: "+ f.toString()); close(); - } - throw new UnsupportedOperationException("Not supported yet: " + f.toString()); + } else if (f instanceof PingFrame) { + handlePing((PingFrame)f); + } else + throw new UnsupportedOperationException("Not supported yet: " + f.toString()); } void sendWindowUpdates(int len, int streamid) throws IOException {