# HG changeset patch # User michaelm # Date 1527090253 -3600 # Node ID 4c502e3991bf86f28af7b4f9a0874526cd7d77a2 # Parent c8fe5ffdfe98e26b31195a86d1ee84fb3d212b3e 8196389: Should HttpClient support SETTINGS_MAX_CONCURRENT_STREAMS from the server Reviewed-by: chegar, dfuchs diff -r c8fe5ffdfe98 -r 4c502e3991bf src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java Fri May 18 15:23:56 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java Wed May 23 16:44:13 2018 +0100 @@ -25,6 +25,9 @@ package jdk.internal.net.http; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.URI; import java.util.Base64; @@ -95,15 +98,20 @@ synchronized (this) { Http2Connection connection = connections.get(key); if (connection != null) { - if (connection.closed || !connection.reserveStream(true)) { - if (debug.on()) - debug.log("removing found closed or closing connection: %s", connection); - deleteConnection(connection); - } else { - // fast path if connection already exists - if (debug.on()) - debug.log("found connection in the pool: %s", connection); - return MinimalFuture.completedFuture(connection); + try { + if (connection.closed || !connection.reserveStream(true)) { + if (debug.on()) + debug.log("removing found closed or closing connection: %s", connection); + deleteConnection(connection); + } else { + // fast path if connection already exists + if (debug.on()) + debug.log("found connection in the pool: %s", connection); + return MinimalFuture.completedFuture(connection); + } + } catch (IOException e) { + // thrown by connection.reserveStream() + return MinimalFuture.failedFuture(e); } } @@ -119,6 +127,11 @@ .whenComplete((conn, t) -> { synchronized (Http2ClientImpl.this) { if (conn != null) { + try { + conn.reserveStream(true); + } catch (IOException e) { + throw new UncheckedIOException(e); // shouldn't happen + } offerConnection(conn); } else { Throwable cause = Utils.getCompletionCause(t); diff -r c8fe5ffdfe98 -r 4c502e3991bf src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Fri May 18 15:23:56 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Wed May 23 16:44:13 2018 +0100 @@ -259,6 +259,8 @@ // assigning a stream to a connection. private int lastReservedClientStreamid = 1; private int lastReservedServerStreamid = 0; + private int numReservedClientStreams = 0; // count of current streams + private int numReservedServerStreams = 0; // count of current streams private final Encoder hpackOut; private final Decoder hpackIn; final SettingsFrame clientSettings; @@ -311,7 +313,7 @@ /** * Case 1) Create from upgraded HTTP/1.1 connection. - * Is ready to use. Can be SSL. exchange is the Exchange + * Is ready to use. Can't be SSL. exchange is the Exchange * that initiated the connection, whose response will be delivered * on a Stream. */ @@ -325,6 +327,7 @@ client2, 3, // stream 1 is registered during the upgrade keyFor(connection)); + reserveStream(true); Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize()); Stream initialStream = createStream(exchange); @@ -408,7 +411,8 @@ // call these before assigning a request/stream to a connection // if false returned then a new Http2Connection is required // if true, the the stream may be assigned to this connection - synchronized boolean reserveStream(boolean clientInitiated) { + // for server push, if false returned, then the stream should be cancelled + synchronized boolean reserveStream(boolean clientInitiated) throws IOException { if (finalStream) { return false; } @@ -425,6 +429,19 @@ lastReservedClientStreamid+=2; else lastReservedServerStreamid+=2; + + assert numReservedClientStreams >= 0; + assert numReservedServerStreams >= 0; + if (clientInitiated && numReservedClientStreams >= getMaxConcurrentClientStreams()) { + throw new IOException("too many concurrent streams"); + } else if (clientInitiated) { + numReservedClientStreams++; + } + if (!clientInitiated && numReservedServerStreams >= getMaxConcurrentServerStreams()) { + return false; + } else if (!clientInitiated) { + numReservedServerStreams++; + } return true; } @@ -565,6 +582,14 @@ return serverSettings.getParameter(INITIAL_WINDOW_SIZE); } + final int getMaxConcurrentClientStreams() { + return serverSettings.getParameter(MAX_CONCURRENT_STREAMS); + } + + final int getMaxConcurrentServerStreams() { + return clientSettings.getParameter(MAX_CONCURRENT_STREAMS); + } + void close() { Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); GoAwayFrame f = new GoAwayFrame(0, @@ -818,8 +843,17 @@ void closeStream(int streamid) { if (debug.on()) debug.log("Closed stream %d", streamid); + boolean isClient = (streamid % 2) == 1; Stream s = streams.remove(streamid); if (s != null) { + synchronized (this) { + if (isClient) + numReservedClientStreams--; + else + numReservedServerStreams--; + } + assert numReservedClientStreams >= 0; + assert numReservedServerStreams >= 0; // decrement the reference count on the HttpClientImpl // to allow the SelectorManager thread to exit if no // other operation is pending and the facade is no @@ -1168,7 +1202,7 @@ private final SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::processQueue); private final HttpClientImpl client; - + Http2TubeSubscriber(HttpClientImpl client) { this.client = Objects.requireNonNull(client); } diff -r c8fe5ffdfe98 -r 4c502e3991bf src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java Fri May 18 15:23:56 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/frame/SettingsFrame.java Wed May 23 16:44:13 2018 +0100 @@ -110,14 +110,14 @@ return TYPE; } - public int getParameter(int paramID) { + public synchronized int getParameter(int paramID) { if (paramID > MAX_PARAM) { throw new IllegalArgumentException("illegal parameter"); } return parameters[paramID - 1]; } - public SettingsFrame setParameter(int paramID, int value) { + public synchronized SettingsFrame setParameter(int paramID, int value) { if (paramID > MAX_PARAM) { throw new IllegalArgumentException("illegal parameter"); } diff -r c8fe5ffdfe98 -r 4c502e3991bf test/jdk/java/net/httpclient/MaxStreams.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/MaxStreams.java Wed May 23 16:44:13 2018 +0100 @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @bug 8196389 + * @summary Should HttpClient support SETTINGS_MAX_CONCURRENT_STREAMS from the server + * + * @modules java.base/sun.net.www.http + * java.net.http/jdk.internal.net.http.common + * java.net.http/jdk.internal.net.http.frame + * java.net.http/jdk.internal.net.http.hpack + * java.logging + * jdk.httpserver + * @library /lib/testlibrary http2/server + * @build Http2TestServer + * @build jdk.testlibrary.SimpleSSLContext + * @run testng/othervm -ea -esa MaxStreams + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.List; +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import javax.net.ssl.SSLContext; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.net.http.HttpResponse.BodyHandlers.discarding; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; + +public class MaxStreams { + + Http2TestServer http2TestServer; // HTTP/2 ( h2c ) + Http2TestServer https2TestServer; // HTTP/2 ( h2 ) + String http2FixedURI; + String https2FixedURI; + volatile CountDownLatch latch; + ExecutorService exec; + + // we send an initial warm up request, then MAX_STREAMS+1 requests + // in parallel. The last of them should hit the limit. + // Then we wait for all the responses and send a further request + // which should succeed. The server should see (and respond to) + // MAX_STREAMS+2 requests per test run. + + static final int MAX_STREAMS = 10; + static final String RESPONSE = "Hello world"; + + @DataProvider(name = "uris") + public Object[][] variants() { + return new Object[][]{ + {http2FixedURI}, + {https2FixedURI}, + {http2FixedURI}, + {https2FixedURI} + }; + } + + + @Test(dataProvider = "uris", timeOut=20000) + void testAsString(String uri) throws Exception { + latch = new CountDownLatch(1); + HttpClient client = HttpClient.newBuilder().build(); + List>> responses = new LinkedList<>(); + + HttpRequest request = HttpRequest.newBuilder(URI.create(uri)) + .version(HttpClient.Version.HTTP_2) + .GET() + .build(); + // send warmup to ensure we only have one Http2Connection + HttpResponse warmup = client.send(request, BodyHandlers.ofString()); + if (warmup.statusCode() != 200 || !warmup.body().equals(RESPONSE)) + throw new RuntimeException(); + + for (int i=0;i[0])).join(); + } catch (Exception ee) { + System.err.println("Expected exception " + ee); + } + int count = 0; + for (CompletableFuture> cf : responses) { + HttpResponse r = null; + try { + r = cf.join(); + if (r.statusCode() != 200 || !r.body().equals(RESPONSE)) + throw new RuntimeException(); + } catch (Throwable t) { + System.err.println("Exception at count = " + count); + System.err.println(t); + t.printStackTrace(); + count++; + } + } + if (count != 1) { + String msg = "Expected 1 failure. Got " + count; + throw new RuntimeException(msg); + } + + // make sure it succeeds now as number of streams == 0 now + HttpResponse warmdown = client.send(request, BodyHandlers.ofString()); + if (warmdown.statusCode() != 200 || !warmdown.body().equals(RESPONSE)) + throw new RuntimeException(); + System.err.println("Test OK"); + } + + @BeforeTest + public void setup() throws Exception { + SSLContext ctx = (new SimpleSSLContext()).get(); + exec = Executors.newCachedThreadPool(); + + InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); + + Properties props = new Properties(); + props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS)); + http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null); + http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed"); + http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed"; + http2TestServer.start(); + + https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx); + https2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed"); + https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/http2/fixed"; + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + http2TestServer.stop(); + } + + class Http2FixedHandler implements Http2Handler { + volatile AtomicInteger counter = new AtomicInteger(0); + + @Override + public void handle(Http2TestExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + + is.readAllBytes(); + int c = counter.getAndIncrement(); + if (c > 0 && c <= MAX_STREAMS) { + // Wait for latch. + try { + // don't send any replies until all requests are sent + System.err.println("latch await"); + latch.await(); + System.err.println("latch resume"); + } catch (InterruptedException ee) {} + } + if (c == MAX_STREAMS + 1) + counter = new AtomicInteger(0); + t.sendResponseHeaders(200, RESPONSE.length()); + os.write(RESPONSE.getBytes()); + } + } + } +} diff -r c8fe5ffdfe98 -r 4c502e3991bf test/jdk/java/net/httpclient/http2/BadHeadersTest.java --- a/test/jdk/java/net/httpclient/http2/BadHeadersTest.java Fri May 18 15:23:56 2018 +0100 +++ b/test/jdk/java/net/httpclient/http2/BadHeadersTest.java Wed May 23 16:44:13 2018 +0100 @@ -192,7 +192,7 @@ Socket socket, Http2TestExchangeSupplier exchangeSupplier) throws IOException { - return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier) { + return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier, null) { @Override protected HttpHeadersImpl createNewResponseHeaders() { return new OrderedHttpHeaders(); @@ -210,7 +210,7 @@ Socket socket, Http2TestExchangeSupplier exchangeSupplier) throws IOException { - return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier) { + return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier, null) { @Override protected HttpHeadersImpl createNewResponseHeaders() { return new OrderedHttpHeaders(); diff -r c8fe5ffdfe98 -r 4c502e3991bf test/jdk/java/net/httpclient/http2/server/Http2TestServer.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Fri May 18 15:23:56 2018 +0100 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Wed May 23 16:44:13 2018 +0100 @@ -53,6 +53,7 @@ final SSLContext sslContext; final String serverName; final HashMap connections; + final Properties properties; private static ThreadFactory defaultThreadFac = (Runnable r) -> { @@ -67,11 +68,11 @@ } public Http2TestServer(String serverName, boolean secure, int port) throws Exception { - this(serverName, secure, port, getDefaultExecutor(), 50, null); + this(serverName, secure, port, getDefaultExecutor(), 50, null, null); } public Http2TestServer(boolean secure, int port) throws Exception { - this(null, secure, port, getDefaultExecutor(), 50, null); + this(null, secure, port, getDefaultExecutor(), 50, null, null); } public InetSocketAddress getAddress() { @@ -85,19 +86,19 @@ public Http2TestServer(boolean secure, SSLContext context) throws Exception { - this(null, secure, 0, null, 50, context); + this(null, secure, 0, null, 50, null, context); } public Http2TestServer(String serverName, boolean secure, SSLContext context) throws Exception { - this(serverName, secure, 0, null, 50, context); + this(serverName, secure, 0, null, 50, null, context); } public Http2TestServer(boolean secure, int port, ExecutorService exec, SSLContext context) throws Exception { - this(null, secure, port, exec, 50, context); + this(null, secure, port, exec, 50, null, context); } public Http2TestServer(String serverName, @@ -107,7 +108,7 @@ SSLContext context) throws Exception { - this(serverName, secure, port, exec, 50, context); + this(serverName, secure, port, exec, 50, null, context); } /** @@ -120,6 +121,7 @@ * @param port listen port * @param exec executor service (cached thread pool is used if null) * @param backlog the server socket backlog + * @param properties additional configuration properties * @param context the SSLContext used when secure is true */ public Http2TestServer(String serverName, @@ -127,6 +129,7 @@ int port, ExecutorService exec, int backlog, + Properties properties, SSLContext context) throws Exception { @@ -140,6 +143,7 @@ this.exec = exec == null ? getDefaultExecutor() : exec; this.handlers = Collections.synchronizedMap(new HashMap<>()); this.sslContext = context; + this.properties = properties; this.connections = new HashMap<>(); } @@ -264,7 +268,6 @@ socket.close(); } System.err.println("TestServer: start exception: " + e); - //throw e; } } } catch (SecurityException se) { @@ -285,7 +288,7 @@ Socket socket, Http2TestExchangeSupplier exchangeSupplier) throws IOException { - return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier); + return new Http2TestServerConnection(http2TestServer, socket, exchangeSupplier, properties); } @Override diff -r c8fe5ffdfe98 -r 4c502e3991bf test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri May 18 15:23:56 2018 +0100 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed May 23 16:44:13 2018 +0100 @@ -71,6 +71,7 @@ final SettingsFrame serverSettings; final ExecutorService exec; final boolean secure; + final Properties properties; volatile boolean stopping; volatile int nextPushStreamId = 2; ConcurrentLinkedQueue pings = new ConcurrentLinkedQueue<>(); @@ -118,7 +119,8 @@ Http2TestServerConnection(Http2TestServer server, Socket socket, - Http2TestExchangeSupplier exchangeSupplier) + Http2TestExchangeSupplier exchangeSupplier, + Properties properties) throws IOException { if (socket instanceof SSLSocket) { @@ -132,8 +134,9 @@ this.outputQ = new Queue<>(sentinel); this.random = new Random(); this.socket = socket; + this.properties = properties; this.socket.setTcpNoDelay(true); - this.serverSettings = SettingsFrame.getDefaultSettings(); + this.serverSettings = getServerSettingProperties(); this.exec = server.exec; this.secure = server.secure; this.pushStreams = new HashSet<>(); @@ -141,6 +144,38 @@ os = new BufferedOutputStream(socket.getOutputStream()); } + static final String propPrefix = "http2server.settings."; + + static final String[][] propIDs = { + {"header_table_size", Integer.toString(SettingsFrame.HEADER_TABLE_SIZE)}, + {"enable_push", Integer.toString(SettingsFrame.ENABLE_PUSH)}, + {"max_concurrent_streams", Integer.toString(SettingsFrame.MAX_CONCURRENT_STREAMS)}, + {"initial_window_size", Integer.toString(SettingsFrame.INITIAL_WINDOW_SIZE)}, + {"max_frame_size", Integer.toString(SettingsFrame.MAX_FRAME_SIZE)}, + {"max_header_list_size", Integer.toString(SettingsFrame.MAX_HEADER_LIST_SIZE)} + }; + + private SettingsFrame getServerSettingProperties() { + SettingsFrame s = SettingsFrame.getDefaultSettings(); + if (properties == null) + return s; + for (int i=0; i