test/jdk/java/net/httpclient/ResponsePublisher.java
branch http-client-branch
changeset 56379 c59f684f1eda
child 56384 b5451b2aca3d
equal deleted inserted replaced
56378:41fe61be5930 56379:c59f684f1eda
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @summary Tests an asynchronous BodySubscriber that completes
       
    27  *          immediately with a Publisher<List<ByteBuffer>>
       
    28  * @library /lib/testlibrary http2/server
       
    29  * @build jdk.testlibrary.SimpleSSLContext
       
    30  * @modules java.base/sun.net.www.http
       
    31  *          java.net.http/jdk.internal.net.http.common
       
    32  *          java.net.http/jdk.internal.net.http.frame
       
    33  *          java.net.http/jdk.internal.net.http.hpack
       
    34  * @run testng/othervm ResponsePublisher
       
    35  */
       
    36 
       
    37 import com.sun.net.httpserver.HttpExchange;
       
    38 import com.sun.net.httpserver.HttpHandler;
       
    39 import com.sun.net.httpserver.HttpServer;
       
    40 import com.sun.net.httpserver.HttpsConfigurator;
       
    41 import com.sun.net.httpserver.HttpsServer;
       
    42 import jdk.testlibrary.SimpleSSLContext;
       
    43 import org.testng.annotations.AfterTest;
       
    44 import org.testng.annotations.BeforeTest;
       
    45 import org.testng.annotations.DataProvider;
       
    46 import org.testng.annotations.Test;
       
    47 
       
    48 import javax.net.ssl.SSLContext;
       
    49 import java.io.IOException;
       
    50 import java.io.InputStream;
       
    51 import java.io.OutputStream;
       
    52 import java.net.InetAddress;
       
    53 import java.net.InetSocketAddress;
       
    54 import java.net.URI;
       
    55 import java.net.http.HttpClient;
       
    56 import java.net.http.HttpHeaders;
       
    57 import java.net.http.HttpRequest;
       
    58 import java.net.http.HttpResponse;
       
    59 import java.net.http.HttpResponse.BodyHandler;
       
    60 import java.net.http.HttpResponse.BodySubscriber;
       
    61 import java.net.http.HttpResponse.BodySubscribers;
       
    62 import java.nio.ByteBuffer;
       
    63 import java.util.List;
       
    64 import java.util.Objects;
       
    65 import java.util.concurrent.CompletableFuture;
       
    66 import java.util.concurrent.CompletionStage;
       
    67 import java.util.concurrent.Executor;
       
    68 import java.util.concurrent.Executors;
       
    69 import java.util.concurrent.Flow;
       
    70 import java.util.concurrent.Flow.Publisher;
       
    71 import java.util.concurrent.atomic.AtomicReference;
       
    72 
       
    73 import static java.lang.System.out;
       
    74 import static java.nio.charset.StandardCharsets.UTF_8;
       
    75 import static org.testng.Assert.assertEquals;
       
    76 import static org.testng.Assert.assertNotNull;
       
    77 import static org.testng.Assert.assertTrue;
       
    78 
       
    79 public class ResponsePublisher {
       
    80 
       
    // TLS context handed to the HTTPS/1.1 server configurator and to every
    // client created by newHttpClient().
    SSLContext sslContext;

    // The four servers exercised by the test, one per protocol/transport pair.
    HttpServer httpTestServer;         // HTTP/1.1, clear text
    HttpsServer httpsTestServer;       // HTTP/1.1 over TLS
    Http2TestServer http2TestServer;   // HTTP/2, clear text ( h2c )
    Http2TestServer https2TestServer;  // HTTP/2 over TLS    ( h2  )

    // Base request URIs, assigned in setup(); one "fixed" and one "chunk"
    // flavour per server.
    String httpURI_fixed, httpURI_chunk;
    String httpsURI_fixed, httpsURI_chunk;
    String http2URI_fixed, http2URI_chunk;
    String https2URI_fixed, https2URI_chunk;

    // Number of requests each test method sends per data-provider row.
    static final int ITERATION_COUNT = 10;

    // Sharing a single executor between all clients keeps the number of
    // threads created by the test down.
    static final Executor executor = Executors.newCachedThreadPool();
       
    98 
       
    99     @DataProvider(name = "variants")
       
   100     public Object[][] variants() {
       
   101         return new Object[][]{
       
   102                 { httpURI_fixed,    false },
       
   103                 { httpURI_chunk,    false },
       
   104                 { httpsURI_fixed,   false },
       
   105                 { httpsURI_chunk,   false },
       
   106                 { http2URI_fixed,   false },
       
   107                 { http2URI_chunk,   false },
       
   108                 { https2URI_fixed,  false,},
       
   109                 { https2URI_chunk,  false },
       
   110 
       
   111                 { httpURI_fixed,    true },
       
   112                 { httpURI_chunk,    true },
       
   113                 { httpsURI_fixed,   true },
       
   114                 { httpsURI_chunk,   true },
       
   115                 { http2URI_fixed,   true },
       
   116                 { http2URI_chunk,   true },
       
   117                 { https2URI_fixed,  true,},
       
   118                 { https2URI_chunk,  true },
       
   119         };
       
   120     }
       
   121 
       
   122     HttpClient newHttpClient() {
       
   123         return HttpClient.newBuilder()
       
   124                          .executor(executor)
       
   125                          .sslContext(sslContext)
       
   126                          .build();
       
   127     }
       
   128 
       
   129     @Test(dataProvider = "variants")
       
   130     public void testNoBody(String uri, boolean sameClient) throws Exception {
       
   131         HttpClient client = null;
       
   132         for (int i=0; i< ITERATION_COUNT; i++) {
       
   133             if (!sameClient || client == null)
       
   134                 client = newHttpClient();
       
   135 
       
   136             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   137                     .build();
       
   138             BodyHandler<Publisher<List<ByteBuffer>>> handler = new PublishingBodyHandler();
       
   139             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
       
   140             // We can reuse our BodySubscribers implementations to subscribe to the
       
   141             // Publisher<List<ByteBuffer>>
       
   142             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
       
   143             // get the Publisher<List<ByteBuffer>> and
       
   144             // subscribe to it.
       
   145             response.body().subscribe(ofString);
       
   146             // Get the final result and compare it with the expected body
       
   147             String body = ofString.getBody().toCompletableFuture().get();
       
   148             assertEquals(body, "");
       
   149         }
       
   150     }
       
   151 
       
   152     @Test(dataProvider = "variants")
       
   153     public void testNoBodyAsync(String uri, boolean sameClient) throws Exception {
       
   154         HttpClient client = null;
       
   155         for (int i=0; i< ITERATION_COUNT; i++) {
       
   156             if (!sameClient || client == null)
       
   157                 client = newHttpClient();
       
   158 
       
   159             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   160                     .build();
       
   161             BodyHandler<Publisher<List<ByteBuffer>>> handler = new PublishingBodyHandler();
       
   162             // We can reuse our BodySubscribers implementations to subscribe to the
       
   163             // Publisher<List<ByteBuffer>>
       
   164             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
       
   165             CompletableFuture<String> result =
       
   166                     client.sendAsync(req, handler).thenCompose(
       
   167                             (responsePublisher) -> {
       
   168                                 // get the Publisher<List<ByteBuffer>> and
       
   169                                 // subscribe to it.
       
   170                                 responsePublisher.body().subscribe(ofString);
       
   171                                 return ofString.getBody();
       
   172                             });
       
   173             // Get the final result and compare it with the expected body
       
   174             assertEquals(result.get(), "");
       
   175         }
       
   176     }
       
   177 
       
   178     @Test(dataProvider = "variants")
       
   179     public void testAsString(String uri, boolean sameClient) throws Exception {
       
   180         HttpClient client = null;
       
   181         for (int i=0; i< ITERATION_COUNT; i++) {
       
   182             if (!sameClient || client == null)
       
   183                 client = newHttpClient();
       
   184 
       
   185             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
       
   186                     .build();
       
   187             BodyHandler<Publisher<List<ByteBuffer>>> handler = new PublishingBodyHandler();
       
   188             HttpResponse<Publisher<List<ByteBuffer>>> response = client.send(req, handler);
       
   189             // We can reuse our BodySubscribers implementations to subscribe to the
       
   190             // Publisher<List<ByteBuffer>>
       
   191             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
       
   192             // get the Publisher<List<ByteBuffer>> and
       
   193             // subscribe to it.
       
   194             response.body().subscribe(ofString);
       
   195             // Get the final result and compare it with the expected body
       
   196             String body = ofString.getBody().toCompletableFuture().get();
       
   197             assertEquals(body, WITH_BODY);
       
   198         }
       
   199     }
       
   200 
       
   201     @Test(dataProvider = "variants")
       
   202     public void testAsStringAsync(String uri, boolean sameClient) throws Exception {
       
   203         HttpClient client = null;
       
   204         for (int i=0; i< ITERATION_COUNT; i++) {
       
   205             if (!sameClient || client == null)
       
   206                 client = newHttpClient();
       
   207 
       
   208             HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
       
   209                     .build();
       
   210             BodyHandler<Publisher<List<ByteBuffer>>> handler = new PublishingBodyHandler();
       
   211             // We can reuse our BodySubscribers implementations to subscribe to the
       
   212             // Publisher<List<ByteBuffer>>
       
   213             BodySubscriber<String> ofString = BodySubscribers.ofString(UTF_8);
       
   214             CompletableFuture<String> result = client.sendAsync(req, handler)
       
   215                     .thenCompose((responsePublisher) -> {
       
   216                         // get the Publisher<List<ByteBuffer>> and
       
   217                         // subscribe to it.
       
   218                         responsePublisher.body().subscribe(ofString);
       
   219                         return ofString.getBody();
       
   220                     });
       
   221             // Get the final result and compare it with the expected body
       
   222             String body = result.get();
       
   223             assertEquals(body, WITH_BODY);
       
   224         }
       
   225     }
       
   226 
       
   227     // A BodyHandler that returns PublishingBodySubscriber instances
       
   228     static class PublishingBodyHandler implements BodyHandler<Publisher<List<ByteBuffer>>> {
       
   229         @Override
       
   230         public BodySubscriber<Publisher<List<ByteBuffer>>> apply(int statusCode, HttpHeaders responseHeaders) {
       
   231             assertEquals(statusCode, 200);
       
   232             return new PublishingBodySubscriber();
       
   233         }
       
   234     }
       
   235 
       
   236     // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
       
   237     static class PublishingBodySubscriber implements BodySubscriber<Publisher<List<ByteBuffer>>> {
       
   238         private final CompletableFuture<Flow.Subscription> subscriptionCF = new CompletableFuture<>();
       
   239         private final CompletableFuture<Flow.Subscriber<? super List<ByteBuffer>>> subscribedCF = new CompletableFuture<>();
       
   240         private AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
       
   241         private final CompletionStage<Publisher<List<ByteBuffer>>> body =
       
   242                 //subscriptionCF.thenCompose((s) -> CompletableFuture.completedStage(this::subscribe));
       
   243                 CompletableFuture.completedStage(this::subscribe);
       
   244 
       
   245         private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   246             Objects.requireNonNull(subscriber, "subscriber must not be null");
       
   247             if (subscriberRef.compareAndSet(null, subscriber)) {
       
   248                 subscriptionCF.thenAccept((s) -> {
       
   249                     subscriber.onSubscribe(s);
       
   250                     subscribedCF.complete(subscriber);
       
   251                 });
       
   252             } else {
       
   253                 subscriber.onSubscribe(new Flow.Subscription() {
       
   254                     @Override public void request(long n) { }
       
   255                     @Override public void cancel() { }
       
   256                 });
       
   257                 subscriber.onError(new IOException("This publisher has already one subscriber"));
       
   258             }
       
   259         }
       
   260 
       
   261         @Override
       
   262         public void onSubscribe(Flow.Subscription subscription) {
       
   263             subscriptionCF.complete(subscription);
       
   264         }
       
   265 
       
   266         @Override
       
   267         public void onNext(List<ByteBuffer> item) {
       
   268             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
       
   269             Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
       
   270             assert subscriber != null; // cannot be called before subscriber calls request(1)
       
   271             subscriber.onNext(item);
       
   272         }
       
   273 
       
   274         @Override
       
   275         public void onError(Throwable throwable) {
       
   276             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
       
   277             // onError can be called before request(1), and therefore can
       
   278             // be called before subscriberRef is set.
       
   279             subscribedCF.thenAccept(s -> s.onError(throwable));
       
   280         }
       
   281 
       
   282         @Override
       
   283         public void onComplete() {
       
   284             assert subscriptionCF.isDone(); // cannot be called before onSubscribe()
       
   285             // onComplete can be called before request(1), and therefore can
       
   286             // be called before subscriberRef is set.
       
   287             subscribedCF.thenAccept(s -> s.onComplete());
       
   288         }
       
   289 
       
   290         @Override
       
   291         public CompletionStage<Publisher<List<ByteBuffer>>> getBody() {
       
   292             return body;
       
   293         }
       
   294     }
       
   295 
       
   296     static String serverAuthority(HttpServer server) {
       
   297         return InetAddress.getLoopbackAddress().getHostName() + ":"
       
   298                 + server.getAddress().getPort();
       
   299     }
       
   300 
       
   301     @BeforeTest
       
   302     public void setup() throws Exception {
       
   303         sslContext = new SimpleSSLContext().get();
       
   304         if (sslContext == null)
       
   305             throw new AssertionError("Unexpected null sslContext");
       
   306 
       
   307         // HTTP/1.1
       
   308         HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
       
   309         HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
       
   310         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
       
   311         httpTestServer = HttpServer.create(sa, 0);
       
   312         httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
       
   313         httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
       
   314         httpURI_fixed = "http://" + serverAuthority(httpTestServer) + "/http1/fixed";
       
   315         httpURI_chunk = "http://" + serverAuthority(httpTestServer) + "/http1/chunk";
       
   316 
       
   317         httpsTestServer = HttpsServer.create(sa, 0);
       
   318         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
       
   319         httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
       
   320         httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
       
   321         httpsURI_fixed = "https://" + serverAuthority(httpsTestServer) + "/https1/fixed";
       
   322         httpsURI_chunk = "https://" + serverAuthority(httpsTestServer) + "/https1/chunk";
       
   323 
       
   324         // HTTP/2
       
   325         Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
       
   326         Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
       
   327 
       
   328         http2TestServer = new Http2TestServer("localhost", false, 0);
       
   329         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   330         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   331         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
       
   332         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
       
   333 
       
   334         https2TestServer = new Http2TestServer("localhost", true, 0);
       
   335         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   336         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   337         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
       
   338         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
       
   339 
       
   340         httpTestServer.start();
       
   341         httpsTestServer.start();
       
   342         http2TestServer.start();
       
   343         https2TestServer.start();
       
   344     }
       
   345 
       
   346     @AfterTest
       
   347     public void teardown() throws Exception {
       
   348         httpTestServer.stop(0);
       
   349         httpsTestServer.stop(0);
       
   350         http2TestServer.stop();
       
   351         https2TestServer.stop();
       
   352     }
       
   353 
       
   354     static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
       
   355             " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
       
   356             " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
       
   357             " exercitation ullamco laboris nisi ut aliquip ex ea" +
       
   358             " commodo consequat. Duis aute irure dolor in reprehenderit in " +
       
   359             "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
       
   360             " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
       
   361             " officia deserunt mollit anim id est laborum.";
       
   362 
       
   363     static class HTTP1_FixedLengthHandler implements HttpHandler {
       
   364         @Override
       
   365         public void handle(HttpExchange t) throws IOException {
       
   366             out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
       
   367             try (InputStream is = t.getRequestBody()) {
       
   368                 is.readAllBytes();
       
   369             }
       
   370             if (t.getRequestURI().getPath().endsWith("/withBody")) {
       
   371                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
       
   372                 t.sendResponseHeaders(200, bytes.length);  // body
       
   373                 try (OutputStream os = t.getResponseBody()) {
       
   374                     os.write(bytes);
       
   375                 }
       
   376             } else {
       
   377                 t.sendResponseHeaders(200, -1);  //no body
       
   378             }
       
   379         }
       
   380     }
       
   381 
       
   382     static class HTTP1_ChunkedHandler implements HttpHandler {
       
   383         @Override
       
   384         public void handle(HttpExchange t) throws IOException {
       
   385             out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
       
   386             try (InputStream is = t.getRequestBody()) {
       
   387                 is.readAllBytes();
       
   388             }
       
   389             t.sendResponseHeaders(200, 0);  //chunked
       
   390             if (t.getRequestURI().getPath().endsWith("/withBody")) {
       
   391                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
       
   392                 try (OutputStream os = t.getResponseBody()) {
       
   393                     os.write(bytes);
       
   394                 }
       
   395             } else {
       
   396                 t.getResponseBody().close();   // no body
       
   397             }
       
   398         }
       
   399     }
       
   400 
       
   401     static class HTTP2_FixedLengthHandler implements Http2Handler {
       
   402         @Override
       
   403         public void handle(Http2TestExchange t) throws IOException {
       
   404             out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
       
   405             try (InputStream is = t.getRequestBody()) {
       
   406                 is.readAllBytes();
       
   407             }
       
   408             if (t.getRequestURI().getPath().endsWith("/withBody")) {
       
   409                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
       
   410                 t.sendResponseHeaders(200, bytes.length);  // body
       
   411                 try (OutputStream os = t.getResponseBody()) {
       
   412                     os.write(bytes);
       
   413                 }
       
   414             } else {
       
   415                 t.sendResponseHeaders(200, -1);  //no body
       
   416             }
       
   417         }
       
   418     }
       
   419 
       
   420     static class HTTP2_VariableHandler implements Http2Handler {
       
   421         @Override
       
   422         public void handle(Http2TestExchange t) throws IOException {
       
   423             out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
       
   424             try (InputStream is = t.getRequestBody()) {
       
   425                 is.readAllBytes();
       
   426             }
       
   427             t.sendResponseHeaders(200, 0);  //chunked
       
   428             if (t.getRequestURI().getPath().endsWith("/withBody")) {
       
   429                 byte[] bytes = WITH_BODY.getBytes(UTF_8);
       
   430                 try (OutputStream os = t.getResponseBody()) {
       
   431                     os.write(bytes);
       
   432                 }
       
   433             } else {
       
   434                 t.getResponseBody().close();   // no body
       
   435             }
       
   436         }
       
   437     }
       
   438 }