test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java
changeset 48408 4f830b447edf
child 49765 ee6f7a61f3a5
child 56076 9a2855e0a796
equal deleted inserted replaced
48407:fcb5b835bf32 48408:4f830b447edf
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 import java.io.IOException;
       
    25 import java.io.InputStream;
       
    26 import java.io.OutputStream;
       
    27 import java.net.InetSocketAddress;
       
    28 import java.net.URI;
       
    29 import java.nio.ByteBuffer;
       
    30 import java.nio.MappedByteBuffer;
       
    31 import java.util.Arrays;
       
    32 import java.util.concurrent.Flow;
       
    33 import java.util.concurrent.Flow.Publisher;
       
    34 import java.util.concurrent.atomic.AtomicBoolean;
       
    35 import java.util.concurrent.atomic.AtomicInteger;
       
    36 import java.util.concurrent.atomic.AtomicLong;
       
    37 import com.sun.net.httpserver.HttpExchange;
       
    38 import com.sun.net.httpserver.HttpHandler;
       
    39 import com.sun.net.httpserver.HttpServer;
       
    40 import com.sun.net.httpserver.HttpsConfigurator;
       
    41 import com.sun.net.httpserver.HttpsServer;
       
    42 import jdk.incubator.http.HttpClient;
       
    43 import jdk.incubator.http.HttpRequest;
       
    44 import jdk.incubator.http.HttpResponse;
       
    45 import jdk.testlibrary.SimpleSSLContext;
       
    46 import org.testng.annotations.AfterTest;
       
    47 import org.testng.annotations.BeforeTest;
       
    48 import org.testng.annotations.DataProvider;
       
    49 import org.testng.annotations.Test;
       
    50 import javax.net.ssl.SSLContext;
       
    51 import static java.util.stream.Collectors.joining;
       
    52 import static java.nio.charset.StandardCharsets.UTF_8;
       
    53 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromPublisher;
       
    54 import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
       
    55 import static org.testng.Assert.assertEquals;
       
    56 import static org.testng.Assert.assertThrows;
       
    57 import static org.testng.Assert.assertTrue;
       
    58 import static org.testng.Assert.fail;
       
    59 
       
    60 /*
       
    61  * @test
       
    62  * @summary Basic tests for Flow adapter Publishers
       
    63  * @modules java.base/sun.net.www.http
       
    64  *          jdk.incubator.httpclient/jdk.incubator.http.internal.common
       
    65  *          jdk.incubator.httpclient/jdk.incubator.http.internal.frame
       
    66  *          jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
       
    67  *          java.logging
       
    68  *          jdk.httpserver
       
    69  * @library /lib/testlibrary http2/server
       
    70  * @build Http2TestServer
       
    71  * @build jdk.testlibrary.SimpleSSLContext
       
    72  * @run testng/othervm FlowAdapterPublisherTest
       
    73  */
       
    74 
       
    75 public class FlowAdapterPublisherTest {
       
    76 
       
           // Fixtures shared by every test method. All of them are assigned in
           // setup(); the four servers are stopped again in teardown().
           // sslContext is handed to the HTTPS/1.1 server configurator and to
           // each HttpClient the tests build.
    77     SSLContext sslContext;
       
    78     HttpServer httpTestServer;         // HTTP/1.1    [ 4 servers ]
       
    79     HttpsServer httpsTestServer;       // HTTPS/1.1
       
    80     Http2TestServer http2TestServer;   // HTTP/2 ( h2c )
       
    81     Http2TestServer https2TestServer;  // HTTP/2 ( h2  )
       
           // Echo endpoint of each server above, in the same order; these are the
           // values served by the "uris" data provider.
    82     String httpURI;
       
    83     String httpsURI;
       
    84     String http2URI;
       
    85     String https2URI;
       
    86 
       
    87     @DataProvider(name = "uris")
       
    88     public Object[][] variants() {
       
    89         return new Object[][]{
       
    90                 { httpURI   },
       
    91                 { httpsURI  },
       
    92                 { http2URI  },
       
    93                 { https2URI },
       
    94         };
       
    95     }
       
    96 
       
    97     static final Class<NullPointerException> NPE = NullPointerException.class;
       
    98     static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
       
    99 
       
   100     @Test
       
   101     public void testAPIExceptions() {
       
   102         assertThrows(NPE, () -> fromPublisher(null));
       
   103         assertThrows(NPE, () -> fromPublisher(null, 1));
       
   104         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0));
       
   105         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1));
       
   106         assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE));
       
   107 
       
   108         Publisher publisher = fromPublisher(new BBPublisher());
       
   109         assertThrows(NPE, () -> publisher.subscribe(null));
       
   110     }
       
   111 
       
   112     //  Flow.Publisher<ByteBuffer>
       
   113 
       
   114     @Test(dataProvider = "uris")
       
   115     void testByteBufferPublisherUnknownLength(String url) {
       
   116         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
       
   117                 "when the ", "rain gets ", "warmer." };
       
   118         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   119         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   120                 .POST(fromPublisher(new BBPublisher(body))).build();
       
   121 
       
   122         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
       
   123         String text = response.body();
       
   124         System.out.println(text);
       
   125         assertEquals(response.statusCode(), 200);
       
   126         assertEquals(text, Arrays.stream(body).collect(joining()));
       
   127     }
       
   128 
       
   129     @Test(dataProvider = "uris")
       
   130     void testByteBufferPublisherFixedLength(String url) {
       
   131         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
       
   132                 "when the ", "rain gets ", "warmer." };
       
   133         int cl = Arrays.stream(body).mapToInt(String::length).sum();
       
   134         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   135         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   136                 .POST(fromPublisher(new BBPublisher(body), cl)).build();
       
   137 
       
   138         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
       
   139         String text = response.body();
       
   140         System.out.println(text);
       
   141         assertEquals(response.statusCode(), 200);
       
   142         assertEquals(text, Arrays.stream(body).collect(joining()));
       
   143     }
       
   144 
       
   145     // Flow.Publisher<MappedByteBuffer>
       
   146 
       
   147     @Test(dataProvider = "uris")
       
   148     void testMappedByteBufferPublisherUnknownLength(String url) {
       
   149         String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
       
   150                 "Irish from ", "ruling the ", "world." };
       
   151         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   152         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   153                 .POST(fromPublisher(new MBBPublisher(body))).build();
       
   154 
       
   155         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
       
   156         String text = response.body();
       
   157         System.out.println(text);
       
   158         assertEquals(response.statusCode(), 200);
       
   159         assertEquals(text, Arrays.stream(body).collect(joining()));
       
   160     }
       
   161 
       
   162     @Test(dataProvider = "uris")
       
   163     void testMappedByteBufferPublisherFixedLength(String url) {
       
   164         String[] body = new String[] { "God invented ", "whiskey to ", "keep the ",
       
   165                 "Irish from ", "ruling the ", "world." };
       
   166         int cl = Arrays.stream(body).mapToInt(String::length).sum();
       
   167         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   168         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   169                 .POST(fromPublisher(new MBBPublisher(body), cl)).build();
       
   170 
       
   171         HttpResponse<String> response = client.sendAsync(request, asString(UTF_8)).join();
       
   172         String text = response.body();
       
   173         System.out.println(text);
       
   174         assertEquals(response.statusCode(), 200);
       
   175         assertEquals(text, Arrays.stream(body).collect(joining()));
       
   176     }
       
   177 
       
   178     // The following two tests depend on Exception detail messages, which is
       
   179     // not ideal, but necessary to discern correct behavior. They should be
       
   180     // updated if the exception message is updated.
       
   181 
       
   182     @Test(dataProvider = "uris")
       
   183     void testPublishTooFew(String url) throws InterruptedException {
       
   184         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
       
   185                 "when the ", "rain gets ", "warmer." };
       
   186         int cl = Arrays.stream(body).mapToInt(String::length).sum() + 1; // length + 1
       
   187         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   188         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   189                 .POST(fromPublisher(new BBPublisher(body), cl)).build();
       
   190 
       
   191         try {
       
   192             HttpResponse<String> response = client.send(request, asString(UTF_8));
       
   193             fail("Unexpected response: " + response);
       
   194         } catch (IOException expected) {
       
   195             assertTrue(expected.getMessage().contains("Too few bytes returned"),
       
   196                        "Exception message:[" + expected.toString() + "]");
       
   197         }
       
   198     }
       
   199 
       
   200     @Test(dataProvider = "uris")
       
   201     void testPublishTooMany(String url) throws InterruptedException {
       
   202         String[] body = new String[] { "You know ", "it's summer ", "in Ireland ",
       
   203                 "when the ", "rain gets ", "warmer." };
       
   204         int cl = Arrays.stream(body).mapToInt(String::length).sum() - 1; // length - 1
       
   205         HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build();
       
   206         HttpRequest request = HttpRequest.newBuilder(URI.create(url))
       
   207                 .POST(fromPublisher(new BBPublisher(body), cl)).build();
       
   208 
       
   209         try {
       
   210             HttpResponse<String> response = client.send(request, asString(UTF_8));
       
   211             fail("Unexpected response: " + response);
       
   212         } catch (IOException expected) {
       
   213             assertTrue(expected.getMessage().contains("Too many bytes in request body"),
       
   214                     "Exception message:[" + expected.toString() + "]");
       
   215         }
       
   216     }
       
   217 
       
   218     static class BBPublisher extends AbstractPublisher
       
   219         implements Flow.Publisher<ByteBuffer>
       
   220     {
       
   221         BBPublisher(String... bodyParts) { super(bodyParts); }
       
   222 
       
   223         @Override
       
   224         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
       
   225             this.subscriber = subscriber;
       
   226             subscriber.onSubscribe(new InternalSubscription());
       
   227         }
       
   228     }
       
   229 
       
   230     static class MBBPublisher extends AbstractPublisher
       
   231         implements Flow.Publisher<MappedByteBuffer>
       
   232     {
       
   233         MBBPublisher(String... bodyParts) { super(bodyParts); }
       
   234 
       
   235         @Override
       
   236         public void subscribe(Flow.Subscriber<? super MappedByteBuffer> subscriber) {
       
   237             this.subscriber = subscriber;
       
   238             subscriber.onSubscribe(new InternalSubscription());
       
   239         }
       
   240     }
       
   241 
       
   242     static abstract class AbstractPublisher {
       
   243         private final String[] bodyParts;
       
   244         protected volatile Flow.Subscriber subscriber;
       
   245 
       
   246         AbstractPublisher(String... bodyParts) {
       
   247             this.bodyParts = bodyParts;
       
   248         }
       
   249 
       
   250         class InternalSubscription implements Flow.Subscription {
       
   251 
       
   252             private final AtomicLong demand = new AtomicLong();
       
   253             private final AtomicBoolean cancelled = new AtomicBoolean();
       
   254             private volatile int position;
       
   255 
       
   256             private static final int IDLE    =  1;
       
   257             private static final int PUSHING =  2;
       
   258             private static final int AGAIN   =  4;
       
   259             private final AtomicInteger state = new AtomicInteger(IDLE);
       
   260 
       
   261             @Override
       
   262             public void request(long n) {
       
   263                 if (n <= 0L) {
       
   264                     subscriber.onError(new IllegalArgumentException(
       
   265                             "non-positive subscription request"));
       
   266                     return;
       
   267                 }
       
   268                 if (cancelled.get()) {
       
   269                     return;
       
   270                 }
       
   271 
       
   272                 while (true) {
       
   273                     long prev = demand.get(), d;
       
   274                     if ((d = prev + n) < prev) // saturate
       
   275                         d = Long.MAX_VALUE;
       
   276                     if (demand.compareAndSet(prev, d))
       
   277                         break;
       
   278                 }
       
   279 
       
   280                 while (true) {
       
   281                     int s = state.get();
       
   282                     if (s == IDLE) {
       
   283                         if (state.compareAndSet(IDLE, PUSHING)) {
       
   284                             while (true) {
       
   285                                 push();
       
   286                                 if (state.compareAndSet(PUSHING, IDLE))
       
   287                                     return;
       
   288                                 else if (state.compareAndSet(AGAIN, PUSHING))
       
   289                                     continue;
       
   290                             }
       
   291                         }
       
   292                     } else if (s == PUSHING) {
       
   293                         if (state.compareAndSet(PUSHING, AGAIN))
       
   294                             return;
       
   295                     } else if (s == AGAIN){
       
   296                         // do nothing, the pusher will already rerun
       
   297                         return;
       
   298                     } else {
       
   299                         throw new AssertionError("Unknown state:" + s);
       
   300                     }
       
   301                 }
       
   302             }
       
   303 
       
   304             private void push() {
       
   305                 long prev;
       
   306                 while ((prev = demand.get()) > 0) {
       
   307                     if (!demand.compareAndSet(prev, prev -1))
       
   308                         continue;
       
   309 
       
   310                     int index = position;
       
   311                     if (index < bodyParts.length) {
       
   312                         position++;
       
   313                         subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8)));
       
   314                     }
       
   315                 }
       
   316 
       
   317                 if (position == bodyParts.length && !cancelled.get()) {
       
   318                     cancelled.set(true);
       
   319                     subscriber.onComplete();
       
   320                 }
       
   321             }
       
   322 
       
   323             @Override
       
   324             public void cancel() {
       
   325                 if (cancelled.compareAndExchange(false, true))
       
   326                     return;  // already cancelled
       
   327             }
       
   328         }
       
   329     }
       
   330 
       
   331     @BeforeTest
       
   332     public void setup() throws Exception {
       
   333         sslContext = new SimpleSSLContext().get();
       
   334         if (sslContext == null)
       
   335             throw new AssertionError("Unexpected null sslContext");
       
   336 
       
   337         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
       
   338         httpTestServer = HttpServer.create(sa, 0);
       
   339         httpTestServer.createContext("/http1/echo", new Http1EchoHandler());
       
   340         httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo";
       
   341 
       
   342         httpsTestServer = HttpsServer.create(sa, 0);
       
   343         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
       
   344         httpsTestServer.createContext("/https1/echo", new Http1EchoHandler());
       
   345         httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo";
       
   346 
       
   347         http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
       
   348         http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo");
       
   349         int port = http2TestServer.getAddress().getPort();
       
   350         http2URI = "http://127.0.0.1:" + port + "/http2/echo";
       
   351 
       
   352         https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
       
   353         https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo");
       
   354         port = https2TestServer.getAddress().getPort();
       
   355         https2URI = "https://127.0.0.1:" + port + "/https2/echo";
       
   356 
       
   357         httpTestServer.start();
       
   358         httpsTestServer.start();
       
   359         http2TestServer.start();
       
   360         https2TestServer.start();
       
   361     }
       
   362 
       
   363     @AfterTest
       
   364     public void teardown() throws Exception {
       
   365         httpTestServer.stop(0);
       
   366         httpsTestServer.stop(0);
       
   367         http2TestServer.stop();
       
   368         https2TestServer.stop();
       
   369     }
       
   370 
       
   371     static class Http1EchoHandler implements HttpHandler {
       
   372         @Override
       
   373         public void handle(HttpExchange t) throws IOException {
       
   374             try (InputStream is = t.getRequestBody();
       
   375                  OutputStream os = t.getResponseBody()) {
       
   376                 byte[] bytes = is.readAllBytes();
       
   377                 t.sendResponseHeaders(200, bytes.length);
       
   378                 os.write(bytes);
       
   379             }
       
   380         }
       
   381     }
       
   382 
       
   383     static class Http2EchoHandler implements Http2Handler {
       
   384         @Override
       
   385         public void handle(Http2TestExchange t) throws IOException {
       
   386             try (InputStream is = t.getRequestBody();
       
   387                  OutputStream os = t.getResponseBody()) {
       
   388                 byte[] bytes = is.readAllBytes();
       
   389                 t.sendResponseHeaders(200, bytes.length);
       
   390                 os.write(bytes);
       
   391             }
       
   392         }
       
   393     }
       
   394 }