http-client-branch: added ping support to test http2 server and some test pings from server to client in the BasicTest
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Fri Nov 24 17:34:57 2017 +0000
@@ -33,12 +33,14 @@
* @run testng/othervm -Djdk.httpclient.HttpClient.log=ssl,requests,responses,errors BasicTest
*/
+import java.io.IOException;
import java.net.*;
import jdk.incubator.http.*;
import static jdk.incubator.http.HttpClient.Version.HTTP_2;
import javax.net.ssl.*;
import java.nio.file.*;
import java.util.concurrent.*;
+import java.util.LinkedList;
import jdk.testlibrary.SimpleSSLContext;
import static jdk.incubator.http.HttpRequest.BodyPublisher.fromFile;
import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
@@ -56,7 +58,7 @@
static ExecutorService serverExec;
static SSLContext sslContext;
- static String httpURIString, httpsURIString;
+ static String pingURIString, httpURIString, httpsURIString;
static void initialize() throws Exception {
try {
@@ -65,6 +67,7 @@
client = getClient();
httpServer = new Http2TestServer(false, 0, serverExec, sslContext);
httpServer.addHandler(new Http2EchoHandler(), "/");
+ httpServer.addHandler(new EchoWithPingHandler(), "/ping");
httpPort = httpServer.getAddress().getPort();
httpsServer = new Http2TestServer(true, 0, serverExec, sslContext);
@@ -72,6 +75,7 @@
httpsPort = httpsServer.getAddress().getPort();
httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
+ pingURIString = "http://127.0.0.1:" + httpPort + "/ping/";
httpsURIString = "https://127.0.0.1:" + httpsPort + "/bar/";
httpServer.start();
@@ -83,16 +87,33 @@
}
}
+ static LinkedList<CompletableFuture<Long>> cfs = new LinkedList<>();
+
+ static class EchoWithPingHandler extends Http2EchoHandler {
+ @Override
+ public void handle(Http2TestExchange exchange) throws IOException {
+ CompletableFuture<Long> cf = new CompletableFuture<>();
+ cfs.add(cf);
+ exchange.sendPing(cf);
+ super.handle(exchange);
+ }
+ }
+
@Test
public static void test() throws Exception {
try {
initialize();
- simpleTest(false);
- simpleTest(true);
+ simpleTest(false, false);
+ simpleTest(false, true);
+ simpleTest(true, false);
streamTest(false);
streamTest(true);
paramsTest();
Thread.sleep(1000 * 4);
+ CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
+ for (CompletableFuture<Long> cf : cfs) {
+ System.out.printf("Ping ack received in %d millisec\n", cf.get());
+ }
} catch (Throwable tt) {
System.err.println("tt caught");
tt.printStackTrace();
@@ -118,10 +139,14 @@
}
static URI getURI(boolean secure) {
+ return getURI(secure, false);
+ }
+
+ static URI getURI(boolean secure, boolean ping) {
if (secure)
return URI.create(httpsURIString);
else
- return URI.create(httpURIString);
+ return URI.create(ping ? pingURIString: httpURIString);
}
static void checkStatus(int expected, int found) throws Exception {
@@ -202,8 +227,8 @@
System.err.println("paramsTest: DONE");
}
- static void simpleTest(boolean secure) throws Exception {
- URI uri = getURI(secure);
+ static void simpleTest(boolean secure, boolean ping) throws Exception {
+ URI uri = getURI(secure, ping);
System.err.println("Request to " + uri);
// Do a simple warmup request
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchange.java Fri Nov 24 17:34:57 2017 +0000
@@ -26,6 +26,8 @@
import java.io.OutputStream;
import java.net.URI;
import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import javax.net.ssl.SSLSession;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
@@ -60,4 +62,11 @@
boolean serverPushAllowed();
void serverPush(URI uri, HttpHeadersImpl headers, InputStream content);
+
+ /**
+ * Send a PING on this exchanges connection, and complete the given CF
+ * with the number of milliseconds it took to get a valid response.
+ * It may also complete exceptionally
+ */
+ void sendPing(CompletableFuture<Long> cf) throws IOException;
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestExchangeImpl.java Fri Nov 24 17:34:57 2017 +0000
@@ -26,6 +26,8 @@
import java.io.IOException;
import java.net.URI;
import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import javax.net.ssl.SSLSession;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
import jdk.incubator.http.internal.frame.HeaderFrame;
@@ -77,6 +79,11 @@
}
@Override
+ public void sendPing(CompletableFuture<Long> cf) throws IOException {
+ conn.sendPing(cf);
+ }
+
+ @Override
public HttpHeadersImpl getResponseHeaders() {
return rspheaders;
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Fri Nov 24 17:34:57 2017 +0000
@@ -28,6 +28,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 24 17:51:51 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 24 17:34:57 2017 +0000
@@ -36,20 +36,12 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
-import jdk.incubator.http.internal.frame.DataFrame;
-import jdk.incubator.http.internal.frame.FramesDecoder;
-import jdk.incubator.http.internal.frame.FramesEncoder;
-import jdk.incubator.http.internal.frame.GoAwayFrame;
-import jdk.incubator.http.internal.frame.HeaderFrame;
-import jdk.incubator.http.internal.frame.HeadersFrame;
-import jdk.incubator.http.internal.frame.Http2Frame;
-import jdk.incubator.http.internal.frame.PushPromiseFrame;
-import jdk.incubator.http.internal.frame.ResetFrame;
-import jdk.incubator.http.internal.frame.SettingsFrame;
-import jdk.incubator.http.internal.frame.WindowUpdateFrame;
+import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import jdk.incubator.http.internal.hpack.Encoder;
@@ -81,9 +73,13 @@
final boolean secure;
volatile boolean stopping;
volatile int nextPushStreamId = 2;
+ volatile byte[] pingData;
+ volatile CompletableFuture<Long> pingResponseHandler;
+ final AtomicLong pingStamp; // milliseconds at time PING was sent
final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
final static byte[] EMPTY_BARRAY = new byte[0];
+ final Random random;
final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
@@ -107,16 +103,68 @@
this.streams = Collections.synchronizedMap(new HashMap<>());
this.outStreams = Collections.synchronizedMap(new HashMap<>());
this.outputQ = new Queue<>(sentinel);
+ this.random = new Random();
this.socket = socket;
this.socket.setTcpNoDelay(true);
this.serverSettings = SettingsFrame.getDefaultSettings();
this.exec = server.exec;
this.secure = server.secure;
this.pushStreams = new HashSet<>();
+ this.pingStamp = new AtomicLong();
is = new BufferedInputStream(socket.getInputStream());
os = new BufferedOutputStream(socket.getOutputStream());
}
+ /**
+ * Sends a PING frame on this connection, and invokes the given
+ * handler when the PING ack is received. The handler is given
+ * an integer, whose value if >= 0, is the number of milliseconds
+ * between PING and ACK, If < 0 signifies an error occured.
+ *
+ * Only one PING is allowed to be outstanding at any time
+ */
+ void sendPing(CompletableFuture<Long> cf) throws IOException {
+ if (pingData != null) {
+ throw new IllegalStateException("PING already outstanding");
+ }
+ pingData = new byte[8];
+ random.nextBytes(pingData);
+ this.pingResponseHandler = cf;
+ pingStamp.set(System.currentTimeMillis());
+ PingFrame ping = new PingFrame(0, pingData);
+ outputQ.put(ping);
+ }
+
+ /**
+ * Handles incoming Ping, which could be an ack
+ * or a client originated Ping
+ */
+ void handlePing(PingFrame ping) throws IOException {
+ if (ping.streamid() != 0) {
+ System.err.println("Invalid ping received");
+ close();
+ return;
+ }
+ if (ping.getFlag(PingFrame.ACK)) {
+ // did we send a Ping?
+ if (pingData == null) {
+ System.err.println("Invalid ping received");
+ close();
+ return;
+ } else if (!Arrays.equals(pingData, ping.getData())) {
+ pingResponseHandler.completeExceptionally(new RuntimeException("Wrong ping data in ACK"));
+ } else {
+ pingResponseHandler.complete(System.currentTimeMillis() - pingStamp.getAndSet(0));
+ }
+ pingResponseHandler = null;
+ pingData = null;
+ } else {
+ // client originated PING. Just send it back with ACK set
+ ping.setFlag(PingFrame.ACK);
+ outputQ.put(ping);
+ }
+ }
+
private static boolean compareIPAddrs(InetAddress addr1, String host) {
try {
InetAddress addr2 = InetAddress.getByName(host);
@@ -292,8 +340,10 @@
} else if (f instanceof GoAwayFrame) {
System.err.println("Closing: "+ f.toString());
close();
- }
- throw new UnsupportedOperationException("Not supported yet: " + f.toString());
+ } else if (f instanceof PingFrame) {
+ handlePing((PingFrame)f);
+ } else
+ throw new UnsupportedOperationException("Not supported yet: " + f.toString());
}
void sendWindowUpdates(int len, int streamid) throws IOException {