test/jdk/java/net/httpclient/MaxStreams.java
branchhttp-client-branch
changeset 56604 8a808d85fc1a
parent 56598 4c502e3991bf
child 56771 73a6534bce94
equal deleted inserted replaced
56602:f2917c858701 56604:8a808d85fc1a
    51 import java.util.concurrent.CompletableFuture;
    51 import java.util.concurrent.CompletableFuture;
    52 import java.util.concurrent.CompletionException;
    52 import java.util.concurrent.CompletionException;
    53 import java.util.concurrent.CountDownLatch;
    53 import java.util.concurrent.CountDownLatch;
    54 import java.util.concurrent.Executors;
    54 import java.util.concurrent.Executors;
    55 import java.util.concurrent.ExecutorService;
    55 import java.util.concurrent.ExecutorService;
       
    56 import java.util.concurrent.Semaphore;
    56 import javax.net.ssl.SSLContext;
    57 import javax.net.ssl.SSLContext;
    57 import java.net.http.HttpClient;
    58 import java.net.http.HttpClient;
    58 import java.net.http.HttpRequest;
    59 import java.net.http.HttpRequest;
    59 import java.net.http.HttpResponse;
    60 import java.net.http.HttpResponse;
    60 import java.net.http.HttpResponse.BodyHandler;
    61 import java.net.http.HttpResponse.BodyHandler;
    72 
    73 
    73 public class MaxStreams {
    74 public class MaxStreams {
    74 
    75 
    75     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
    76     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
    76     Http2TestServer https2TestServer;   // HTTP/2 ( h2 )
    77     Http2TestServer https2TestServer;   // HTTP/2 ( h2 )
       
    78     final Http2FixedHandler handler = new Http2FixedHandler();
    77     String http2FixedURI;
    79     String http2FixedURI;
    78     String https2FixedURI;
    80     String https2FixedURI;
    79     volatile CountDownLatch latch;
    81     volatile CountDownLatch latch;
    80     ExecutorService exec;
    82     ExecutorService exec;
       
    83     final Semaphore canStartTestRun = new Semaphore(1);
    81 
    84 
    82     // we send an initial warm up request, then MAX_STREAMS+1 requests
    85     // we send an initial warm up request, then MAX_STREAMS+1 requests
    83     // in parallel. The last of them should hit the limit.
    86     // in parallel. The last of them should hit the limit.
    84     // Then we wait for all the responses and send a further request
    87     // Then we wait for all the responses and send a further request
    85     // which should succeed. The server should see (and respond to)
    88     // which should succeed. The server should see (and respond to)
    99     }
   102     }
   100 
   103 
   101 
   104 
   102     @Test(dataProvider = "uris", timeOut=20000)
   105     @Test(dataProvider = "uris", timeOut=20000)
   103     void testAsString(String uri) throws Exception {
   106     void testAsString(String uri) throws Exception {
       
   107         canStartTestRun.acquire();
   104         latch = new CountDownLatch(1);
   108         latch = new CountDownLatch(1);
       
   109         handler.setLatch(latch);
   105         HttpClient client = HttpClient.newBuilder().build();
   110         HttpClient client = HttpClient.newBuilder().build();
   106         List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>();
   111         List<CompletableFuture<HttpResponse<String>>> responses = new LinkedList<>();
   107 
   112 
   108         HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
   113         HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
   109                                          .version(HttpClient.Version.HTTP_2)
   114                                          .version(HttpClient.Version.HTTP_2)
   116 
   121 
   117         for (int i=0;i<MAX_STREAMS+1; i++) {
   122         for (int i=0;i<MAX_STREAMS+1; i++) {
   118             responses.add(client.sendAsync(request, BodyHandlers.ofString()));
   123             responses.add(client.sendAsync(request, BodyHandlers.ofString()));
   119         }
   124         }
   120 
   125 
   121         Thread.sleep(1000); // race possible even with latch
   126         // wait until we get local exception before allow server to proceed
       
   127         try {
       
   128             CompletableFuture.anyOf(responses.toArray(new CompletableFuture<?>[0])).join();
       
   129         } catch (Exception ee) {
       
   130             System.err.println("Expected exception 1 " + ee);
       
   131         }
       
   132 
   122         latch.countDown();
   133         latch.countDown();
   123 
   134 
   124         // check the first MAX_STREAMS requests succeeded
   135         // check the first MAX_STREAMS requests succeeded
   125         try {
   136         try {
   126             CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join();
   137             CompletableFuture.allOf(responses.toArray(new CompletableFuture<?>[0])).join();
       
   138             System.err.println("Did not get Expected exception 2 ");
   127         } catch (Exception ee) {
   139         } catch (Exception ee) {
   128             System.err.println("Expected exception " + ee);
   140             System.err.println("Expected exception 2 " + ee);
   129         }
   141         }
   130         int count = 0;
   142         int count = 0;
       
   143         int failures = 0;
   131         for (CompletableFuture<HttpResponse<String>> cf : responses) {
   144         for (CompletableFuture<HttpResponse<String>> cf : responses) {
   132             HttpResponse<String> r = null;
   145             HttpResponse<String> r = null;
   133             try {
   146             try {
       
   147                 count++;
   134                 r = cf.join();
   148                 r = cf.join();
   135                 if (r.statusCode() != 200 || !r.body().equals(RESPONSE))
   149                 if (r.statusCode() != 200 || !r.body().equals(RESPONSE))
   136                     throw new RuntimeException();
   150                     throw new RuntimeException();
   137             } catch (Throwable t) {
   151             } catch (Throwable t) {
   138                 System.err.println("Exception at count = " + count);
   152                 failures++;
       
   153                 System.err.printf("Failure %d at count %d\n", failures, count);
   139                 System.err.println(t);
   154                 System.err.println(t);
   140                 t.printStackTrace();
   155                 t.printStackTrace();
   141                 count++;
       
   142             }
   156             }
   143         }
   157         }
   144         if (count != 1) {
   158         if (failures != 1) {
   145             String msg = "Expected 1 failure. Got " + count;
   159             String msg = "Expected 1 failure. Got " + failures;
   146             throw new RuntimeException(msg);
   160             throw new RuntimeException(msg);
   147         }
   161         }
   148 
   162 
   149         // make sure it succeeds now as number of streams == 0 now
   163         // make sure it succeeds now as number of streams == 0 now
   150         HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString());
   164         HttpResponse<String> warmdown = client.send(request, BodyHandlers.ofString());
   161         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
   175         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
   162 
   176 
   163         Properties props = new Properties();
   177         Properties props = new Properties();
   164         props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS));
   178         props.setProperty("http2server.settings.max_concurrent_streams", Integer.toString(MAX_STREAMS));
   165         http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null);
   179         http2TestServer = new Http2TestServer("localhost", false, 0, exec, 10, props, null);
   166         http2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed");
   180         http2TestServer.addHandler(handler, "/http2/fixed");
   167         http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed";
   181         http2FixedURI = "http://" + http2TestServer.serverAuthority()+ "/http2/fixed";
   168         http2TestServer.start();
   182         http2TestServer.start();
   169 
   183 
   170         https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx);
   184         https2TestServer = new Http2TestServer("localhost", true, 0, exec, 10, props, ctx);
   171         https2TestServer.addHandler(new Http2FixedHandler(), "/http2/fixed");
   185         https2TestServer.addHandler(handler, "/http2/fixed");
   172         https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/http2/fixed";
   186         https2FixedURI = "https://" + https2TestServer.serverAuthority()+ "/http2/fixed";
   173         https2TestServer.start();
   187         https2TestServer.start();
   174     }
   188     }
   175 
   189 
   176     @AfterTest
   190     @AfterTest
   177     public void teardown() throws Exception {
   191     public void teardown() throws Exception {
       
   192         System.err.println("Stopping test server now");
   178         http2TestServer.stop();
   193         http2TestServer.stop();
   179     }
   194     }
   180 
   195 
   181     class Http2FixedHandler implements Http2Handler {
   196     class Http2FixedHandler implements Http2Handler {
   182         volatile AtomicInteger counter = new AtomicInteger(0);
   197         final AtomicInteger counter = new AtomicInteger(0);
       
   198         CountDownLatch latch;
       
   199 
       
   200         synchronized void setLatch(CountDownLatch latch) {
       
   201             this.latch = latch;
       
   202         }
       
   203 
       
   204         synchronized CountDownLatch getLatch() {
       
   205             return latch;
       
   206         }
   183 
   207 
   184         @Override
   208         @Override
   185         public void handle(Http2TestExchange t) throws IOException {
   209         public void handle(Http2TestExchange t) throws IOException {
       
   210             int c = -1;
   186             try (InputStream is = t.getRequestBody();
   211             try (InputStream is = t.getRequestBody();
   187                  OutputStream os = t.getResponseBody()) {
   212                  OutputStream os = t.getResponseBody()) {
   188 
   213 
   189                 is.readAllBytes();
   214                 is.readAllBytes();
   190                 int c = counter.getAndIncrement();
   215                 c = counter.getAndIncrement();
   191                 if (c > 0 && c <= MAX_STREAMS) {
   216                 if (c > 0 && c <= MAX_STREAMS) {
   192                     // Wait for latch.
   217                     // Wait for latch.
   193                     try {
   218                     try {
   194                         // don't send any replies until all requests are sent
   219                         // don't send any replies until all requests are sent
   195                         System.err.println("latch await");
   220                         System.err.println("latch await");
   196                         latch.await();
   221                         getLatch().await();
   197                         System.err.println("latch resume");
   222                         System.err.println("latch resume");
   198                     } catch (InterruptedException ee) {}
   223                     } catch (InterruptedException ee) {}
   199                 }
   224                 }
   200                 if (c == MAX_STREAMS + 1)
       
   201                     counter = new AtomicInteger(0);
       
   202                 t.sendResponseHeaders(200, RESPONSE.length());
   225                 t.sendResponseHeaders(200, RESPONSE.length());
   203                 os.write(RESPONSE.getBytes());
   226                 os.write(RESPONSE.getBytes());
       
   227             } finally {
       
   228                 // client issues MAX_STREAMS + 3 requests in total
       
   229                 // but server should only see MAX_STREAMS + 2 in total. One is rejected by client
       
   230                 // counter c captured before increment so final value is MAX_STREAMS + 1
       
   231                 if (c == MAX_STREAMS + 1) {
       
   232                     counter.set(0);
       
   233                     canStartTestRun.release();
       
   234                 }
   204             }
   235             }
   205         }
   236         }
   206     }
   237     }
   207 }
   238 }