test/jdk/java/net/httpclient/BufferingSubscriberTest.java
branchhttp-client-branch
changeset 55763 634d8e14c172
child 55828 ac0c821cc75c
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 import java.nio.ByteBuffer;
       
    25 import java.util.List;
       
    26 import java.util.Random;
       
    27 import java.util.concurrent.CompletableFuture;
       
    28 import java.util.concurrent.CompletionStage;
       
    29 import java.util.concurrent.Executor;
       
    30 import java.util.concurrent.ExecutorService;
       
    31 import java.util.concurrent.Executors;
       
    32 import java.util.concurrent.Flow;
       
    33 import java.util.concurrent.Flow.Subscription;
       
    34 import java.util.concurrent.SubmissionPublisher;
       
    35 import jdk.incubator.http.HttpResponse.BodyHandler;
       
    36 import jdk.incubator.http.HttpResponse.BodySubscriber;
       
    37 import jdk.test.lib.RandomFactory;
       
    38 import org.testng.annotations.DataProvider;
       
    39 import org.testng.annotations.Test;
       
    40 import static java.lang.Long.MAX_VALUE;
       
    41 import static java.lang.System.out;
       
    42 import static java.util.concurrent.CompletableFuture.delayedExecutor;
       
    43 import static java.util.concurrent.TimeUnit.MILLISECONDS;
       
    44 import static org.testng.Assert.*;
       
    45 
       
    46 /*
       
    47  * @test
       
    48  * @bug 8184285
       
    49  * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
       
    50  * @key randomness
       
    51  * @library /test/lib
       
    52  * @build jdk.test.lib.RandomFactory
       
    53  * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
       
    54  */
       
    55 
       
    56 public class BufferingSubscriberTest {
       
    57 
       
    58     static final Random random = RandomFactory.getRandom();
       
    59 
       
    60     @DataProvider(name = "negatives")
       
    61     public Object[][] negatives() {
       
    62         return new Object[][] {  { 0 }, { -1 }, { -1000 } };
       
    63     }
       
    64 
       
    65     @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
       
    66     public void subscriberThrowsIAE(int bufferSize) {
       
    67         BodySubscriber<?> bp = BodySubscriber.asByteArray();
       
    68         BodySubscriber.buffering(bp, bufferSize);
       
    69     }
       
    70 
       
    71     @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
       
    72     public void handlerThrowsIAE(int bufferSize) {
       
    73         BodyHandler<?> bp = BodyHandler.asByteArray();
       
    74         BodyHandler.buffering(bp, bufferSize);
       
    75     }
       
    76 
       
    77     // ---
       
    78 
       
    79     @DataProvider(name = "config")
       
    80     public Object[][] config() {
       
    81         return new Object[][] {
       
    82             // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
       
    83             { 1,              0,          1,         1,         2,            1   },
       
    84             { 1,              0,          1,         10,        1000,         1   },
       
    85             { 1,              10,         1,         10,        1000,         1   },
       
    86             { 1,              0,          1,         1000,      1000,         1   },
       
    87             { 1,              0,          10,        1000,      1000,         1   },
       
    88             { 1,              0,          1000,      10 ,       1000,         50  },
       
    89             { 1,              100,        1,         1000 * 4,  1000,         1   },
       
    90             { 100,            0,          1000,      1,         2,            1   },
       
    91             { 3,              0,          4,         5006,      1000,         1   },
       
    92             { 20,             0,          100,       4888,      1000,         100 },
       
    93             { 16,             10,         1000,      50 ,       1000,         100 },
       
    94         };
       
    95     }
       
    96 
       
    97     @Test(dataProvider = "config")
       
    98     public void test(int iterations,
       
    99                      int delayMillis,
       
   100                      int numBuffers,
       
   101                      int bufferSize,
       
   102                      int maxBufferSize,
       
   103                      int minbufferSize) {
       
   104         for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
       
   105             test(iterations,
       
   106                  delayMillis,
       
   107                  numBuffers,
       
   108                  bufferSize,
       
   109                  maxBufferSize,
       
   110                  minbufferSize,
       
   111                  perRequestAmount);
       
   112     }
       
   113 
       
   114     public void test(int iterations,
       
   115                      int delayMillis,
       
   116                      int numBuffers,
       
   117                      int bufferSize,
       
   118                      int maxBufferSize,
       
   119                      int minBufferSize,
       
   120                      long requestAmount) {
       
   121         ExecutorService executor = Executors.newFixedThreadPool(1);
       
   122         try {
       
   123 
       
   124             out.printf("Iterations %d\n", iterations);
       
   125             for (int i=0; i<iterations; i++ ) {
       
   126                 out.printf("Iteration: %d\n", i);
       
   127                 SubmissionPublisher<List<ByteBuffer>> publisher =
       
   128                         new SubmissionPublisher<>(executor, 1);
       
   129                 CompletableFuture<?> cf = sink(publisher,
       
   130                         delayMillis,
       
   131                         numBuffers * bufferSize,
       
   132                         requestAmount,
       
   133                         maxBufferSize,
       
   134                         minBufferSize);
       
   135                 source(publisher, numBuffers, bufferSize);
       
   136                 publisher.close();
       
   137                 cf.join();
       
   138             }
       
   139             out.println("OK");
       
   140         } finally {
       
   141             executor.shutdown();
       
   142         }
       
   143     }
       
   144 
       
   145     static int accumulatedDataSize(List<ByteBuffer> bufs) {
       
   146         return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
       
   147     }
       
   148 
       
   149     /** Returns a new BB with its contents set to monotonically increasing
       
   150      * values, staring at the given start index and wrapping every 100. */
       
   151     static ByteBuffer allocateBuffer(int size, int startIdx) {
       
   152         ByteBuffer b = ByteBuffer.allocate(size);
       
   153         for (int i=0; i<size; i++)
       
   154             b.put((byte)((startIdx + i) % 100));
       
   155         b.position(0);
       
   156         return b;
       
   157     }
       
   158 
       
   159     static class TestSubscriber implements BodySubscriber<Integer> {
       
   160         final int delayMillis;
       
   161         final int bufferSize;
       
   162         final int expectedTotalSize;
       
   163         final long requestAmount;
       
   164         final CompletableFuture<Integer> completion;
       
   165         final Executor delayedExecutor;
       
   166         volatile Flow.Subscription subscription;
       
   167 
       
   168         TestSubscriber(int bufferSize,
       
   169                        int delayMillis,
       
   170                        int expectedTotalSize,
       
   171                        long requestAmount) {
       
   172             this.bufferSize = bufferSize;
       
   173             this.completion = new CompletableFuture<>();
       
   174             this.delayMillis = delayMillis;
       
   175             this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
       
   176             this.expectedTotalSize = expectedTotalSize;
       
   177             this.requestAmount = requestAmount;
       
   178         }
       
   179 
       
   180         /**
       
   181          * Example of a factory method which would decorate a buffering
       
   182          * subscriber to create a new subscriber dependent on buffering capability.
       
   183          *
       
   184          * The integer type parameter simulates the body just by counting the
       
   185          * number of bytes in the body.
       
   186          */
       
   187         static BodySubscriber<Integer> createSubscriber(int bufferSize,
       
   188                                                         int delay,
       
   189                                                         int expectedTotalSize,
       
   190                                                         long requestAmount) {
       
   191             TestSubscriber s = new TestSubscriber(bufferSize,
       
   192                                                 delay,
       
   193                                                 expectedTotalSize,
       
   194                                                 requestAmount);
       
   195             return BodySubscriber.buffering(s, bufferSize);
       
   196         }
       
   197 
       
   198         private void requestMore() { subscription.request(requestAmount); }
       
   199 
       
   200         @Override
       
   201         public void onSubscribe(Subscription subscription) {
       
   202             assertNull(this.subscription);
       
   203             this.subscription = subscription;
       
   204             if (delayMillis > 0)
       
   205                 delayedExecutor.execute(this::requestMore);
       
   206             else
       
   207                 requestMore();
       
   208         }
       
   209 
       
   210         volatile int wrongSizes;
       
   211         volatile int totalBytesReceived;
       
   212         volatile int onNextInvocations;
       
   213         volatile int lastSeenSize = -1;
       
   214         volatile boolean noMoreOnNext; // false
       
   215         volatile int index; // 0
       
   216 
       
   217         @Override
       
   218         public void onNext(List<ByteBuffer> items) {
       
   219             long sz = accumulatedDataSize(items);
       
   220             onNextInvocations++;
       
   221             assertNotEquals(sz, 0L, "Unexpected empty buffers");
       
   222             items.stream().forEach(b -> assertEquals(b.position(), 0));
       
   223             assertFalse(noMoreOnNext);
       
   224 
       
   225             if (sz != bufferSize) {
       
   226                 String msg = sz + ", should be less than bufferSize, " + bufferSize;
       
   227                 assertTrue(sz < bufferSize, msg);
       
   228                 assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
       
   229                 noMoreOnNext = true;
       
   230                 wrongSizes++;
       
   231             } else {
       
   232                 assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
       
   233             }
       
   234 
       
   235             // Ensure expected contents
       
   236             for (ByteBuffer b : items) {
       
   237                 while (b.hasRemaining()) {
       
   238                     assertEquals(b.get(), (byte) (index % 100));
       
   239                     index++;
       
   240                 }
       
   241             }
       
   242 
       
   243             totalBytesReceived += sz;
       
   244             assertEquals(totalBytesReceived, index );
       
   245             if (delayMillis > 0)
       
   246                 delayedExecutor.execute(this::requestMore);
       
   247             else
       
   248                 requestMore();
       
   249         }
       
   250 
       
   251         @Override
       
   252         public void onError(Throwable throwable) {
       
   253             completion.completeExceptionally(throwable);
       
   254         }
       
   255 
       
   256         @Override
       
   257         public void onComplete() {
       
   258             if (wrongSizes > 1) { // allow just the final item to be smaller
       
   259                 String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
       
   260                 completion.completeExceptionally(new Throwable(msg));
       
   261             }
       
   262             if (totalBytesReceived != expectedTotalSize) {
       
   263                 String msg = "Wrong number of bytes. [" + this + "]";
       
   264                 completion.completeExceptionally(new Throwable(msg));
       
   265             } else {
       
   266                 completion.complete(totalBytesReceived);
       
   267             }
       
   268         }
       
   269 
       
   270         @Override
       
   271         public CompletionStage<Integer> getBody() { return completion; }
       
   272 
       
   273         @Override
       
   274         public String toString() {
       
   275             StringBuilder sb = new StringBuilder();
       
   276             sb.append(super.toString());
       
   277             sb.append(", bufferSize=").append(bufferSize);
       
   278             sb.append(", onNextInvocations=").append(onNextInvocations);
       
   279             sb.append(", totalBytesReceived=").append(totalBytesReceived);
       
   280             sb.append(", expectedTotalSize=").append(expectedTotalSize);
       
   281             sb.append(", requestAmount=").append(requestAmount);
       
   282             sb.append(", lastSeenSize=").append(lastSeenSize);
       
   283             sb.append(", wrongSizes=").append(wrongSizes);
       
   284             sb.append(", index=").append(index);
       
   285             return sb.toString();
       
   286         }
       
   287     }
       
   288 
       
   289     /**
       
   290      * Publishes data, through the given publisher, using the main thread.
       
   291      *
       
   292      * Note: The executor supplied when creating the SubmissionPublisher provides
       
   293      * the threads for executing the Subscribers.
       
   294      *
       
   295      * @param publisher the publisher
       
   296      * @param numBuffers the number of buffers to send ( before splitting in two )
       
   297      * @param bufferSize the total size of the data to send ( before splitting in two )
       
   298      */
       
   299     static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
       
   300                        int numBuffers,
       
   301                        int bufferSize) {
       
   302         out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize);
       
   303         int index = 0;
       
   304         for (int i=0; i<numBuffers; i++) {
       
   305             int chunkSize = random.nextInt(bufferSize);
       
   306             ByteBuffer buf1 = allocateBuffer(chunkSize, index);
       
   307             index += chunkSize;
       
   308             ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
       
   309             index += bufferSize - chunkSize;
       
   310             publisher.submit(List.of(buf1, buf2));
       
   311         }
       
   312         out.println("source complete");
       
   313     }
       
   314 
       
   315     /**
       
   316      * Creates and subscribes Subscribers that receive data from the given
       
   317      * publisher.
       
   318      *
       
   319      * @param publisher the publisher
       
   320      * @param delayMillis time, in milliseconds, to delay the Subscription
       
   321      *                    requesting more bytes ( for simulating slow consumption )
       
   322      * @param expectedTotalSize the total number of bytes expected to be received
       
   323      *                          by the subscribers
       
   324      * @return a CompletableFuture which completes when the subscription is complete
       
   325      */
       
   326     static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
       
   327                                      int delayMillis,
       
   328                                      int expectedTotalSize,
       
   329                                      long requestAmount,
       
   330                                      int maxBufferSize,
       
   331                                      int minBufferSize) {
       
   332         int bufferSize = random.nextInt(maxBufferSize - minBufferSize) + minBufferSize;
       
   333         BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
       
   334                                                                     delayMillis,
       
   335                                                                     expectedTotalSize,
       
   336                                                                     requestAmount);
       
   337         publisher.subscribe(sub);
       
   338         out.printf("Subscriber reads data with buffer size: %d\n", bufferSize);
       
   339         out.printf("Subscription delay is %d msec\n", delayMillis);
       
   340         out.printf("Request amount is %d items\n", requestAmount);
       
   341         return sub.getBody().toCompletableFuture();
       
   342     }
       
   343 
       
   344     // ---
       
   345 
       
   346     // TODO: Add a test for cancel
       
   347 
       
   348     // ---
       
   349 
       
   350     /* Main entry point for standalone testing of the main functional test. */
       
   351     public static void main(String... args) {
       
   352         BufferingSubscriberTest t = new BufferingSubscriberTest();
       
   353         for (Object[] objs : t.config())
       
   354             t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
       
   355     }
       
   356 }