test/jdk/java/net/httpclient/BufferingSubscriberTest.java
branchhttp-client-branch
changeset 55763 634d8e14c172
child 55828 ac0c821cc75c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberTest.java	Sun Nov 05 17:32:13 2017 +0000
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.SubmissionPublisher;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.test.lib.RandomFactory;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.System.out;
+import static java.util.concurrent.CompletableFuture.delayedExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.testng.Assert.*;
+
+/*
+ * @test
+ * @bug 8184285
+ * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
+ * @key randomness
+ * @library /test/lib
+ * @build jdk.test.lib.RandomFactory
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
+ */
+
+public class BufferingSubscriberTest {
+
+    static final Random random = RandomFactory.getRandom();
+
+    @DataProvider(name = "negatives")
+    public Object[][] negatives() {
+        return new Object[][] {  { 0 }, { -1 }, { -1000 } };
+    }
+
+    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
+    public void subscriberThrowsIAE(int bufferSize) {
+        BodySubscriber<?> bp = BodySubscriber.asByteArray();
+        BodySubscriber.buffering(bp, bufferSize);
+    }
+
+    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
+    public void handlerThrowsIAE(int bufferSize) {
+        BodyHandler<?> bp = BodyHandler.asByteArray();
+        BodyHandler.buffering(bp, bufferSize);
+    }
+
+    // ---
+
+    @DataProvider(name = "config")
+    public Object[][] config() {
+        return new Object[][] {
+            // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
+            { 1,              0,          1,         1,         2,            1   },
+            { 1,              0,          1,         10,        1000,         1   },
+            { 1,              10,         1,         10,        1000,         1   },
+            { 1,              0,          1,         1000,      1000,         1   },
+            { 1,              0,          10,        1000,      1000,         1   },
+            { 1,              0,          1000,      10 ,       1000,         50  },
+            { 1,              100,        1,         1000 * 4,  1000,         1   },
+            { 100,            0,          1000,      1,         2,            1   },
+            { 3,              0,          4,         5006,      1000,         1   },
+            { 20,             0,          100,       4888,      1000,         100 },
+            { 16,             10,         1000,      50 ,       1000,         100 },
+        };
+    }
+
+    @Test(dataProvider = "config")
+    public void test(int iterations,
+                     int delayMillis,
+                     int numBuffers,
+                     int bufferSize,
+                     int maxBufferSize,
+                     int minbufferSize) {
+        for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
+            test(iterations,
+                 delayMillis,
+                 numBuffers,
+                 bufferSize,
+                 maxBufferSize,
+                 minbufferSize,
+                 perRequestAmount);
+    }
+
+    public void test(int iterations,
+                     int delayMillis,
+                     int numBuffers,
+                     int bufferSize,
+                     int maxBufferSize,
+                     int minBufferSize,
+                     long requestAmount) {
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+
+            out.printf("Iterations %d\n", iterations);
+            for (int i=0; i<iterations; i++ ) {
+                out.printf("Iteration: %d\n", i);
+                SubmissionPublisher<List<ByteBuffer>> publisher =
+                        new SubmissionPublisher<>(executor, 1);
+                CompletableFuture<?> cf = sink(publisher,
+                        delayMillis,
+                        numBuffers * bufferSize,
+                        requestAmount,
+                        maxBufferSize,
+                        minBufferSize);
+                source(publisher, numBuffers, bufferSize);
+                publisher.close();
+                cf.join();
+            }
+            out.println("OK");
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    static int accumulatedDataSize(List<ByteBuffer> bufs) {
+        return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
+    }
+
+    /** Returns a new BB with its contents set to monotonically increasing
+     * values, staring at the given start index and wrapping every 100. */
+    static ByteBuffer allocateBuffer(int size, int startIdx) {
+        ByteBuffer b = ByteBuffer.allocate(size);
+        for (int i=0; i<size; i++)
+            b.put((byte)((startIdx + i) % 100));
+        b.position(0);
+        return b;
+    }
+
+    static class TestSubscriber implements BodySubscriber<Integer> {
+        final int delayMillis;
+        final int bufferSize;
+        final int expectedTotalSize;
+        final long requestAmount;
+        final CompletableFuture<Integer> completion;
+        final Executor delayedExecutor;
+        volatile Flow.Subscription subscription;
+
+        TestSubscriber(int bufferSize,
+                       int delayMillis,
+                       int expectedTotalSize,
+                       long requestAmount) {
+            this.bufferSize = bufferSize;
+            this.completion = new CompletableFuture<>();
+            this.delayMillis = delayMillis;
+            this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
+            this.expectedTotalSize = expectedTotalSize;
+            this.requestAmount = requestAmount;
+        }
+
+        /**
+         * Example of a factory method which would decorate a buffering
+         * subscriber to create a new subscriber dependent on buffering capability.
+         *
+         * The integer type parameter simulates the body just by counting the
+         * number of bytes in the body.
+         */
+        static BodySubscriber<Integer> createSubscriber(int bufferSize,
+                                                        int delay,
+                                                        int expectedTotalSize,
+                                                        long requestAmount) {
+            TestSubscriber s = new TestSubscriber(bufferSize,
+                                                delay,
+                                                expectedTotalSize,
+                                                requestAmount);
+            return BodySubscriber.buffering(s, bufferSize);
+        }
+
+        private void requestMore() { subscription.request(requestAmount); }
+
+        @Override
+        public void onSubscribe(Subscription subscription) {
+            assertNull(this.subscription);
+            this.subscription = subscription;
+            if (delayMillis > 0)
+                delayedExecutor.execute(this::requestMore);
+            else
+                requestMore();
+        }
+
+        volatile int wrongSizes;
+        volatile int totalBytesReceived;
+        volatile int onNextInvocations;
+        volatile int lastSeenSize = -1;
+        volatile boolean noMoreOnNext; // false
+        volatile int index; // 0
+
+        @Override
+        public void onNext(List<ByteBuffer> items) {
+            long sz = accumulatedDataSize(items);
+            onNextInvocations++;
+            assertNotEquals(sz, 0L, "Unexpected empty buffers");
+            items.stream().forEach(b -> assertEquals(b.position(), 0));
+            assertFalse(noMoreOnNext);
+
+            if (sz != bufferSize) {
+                String msg = sz + ", should be less than bufferSize, " + bufferSize;
+                assertTrue(sz < bufferSize, msg);
+                assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
+                noMoreOnNext = true;
+                wrongSizes++;
+            } else {
+                assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
+            }
+
+            // Ensure expected contents
+            for (ByteBuffer b : items) {
+                while (b.hasRemaining()) {
+                    assertEquals(b.get(), (byte) (index % 100));
+                    index++;
+                }
+            }
+
+            totalBytesReceived += sz;
+            assertEquals(totalBytesReceived, index );
+            if (delayMillis > 0)
+                delayedExecutor.execute(this::requestMore);
+            else
+                requestMore();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            completion.completeExceptionally(throwable);
+        }
+
+        @Override
+        public void onComplete() {
+            if (wrongSizes > 1) { // allow just the final item to be smaller
+                String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
+                completion.completeExceptionally(new Throwable(msg));
+            }
+            if (totalBytesReceived != expectedTotalSize) {
+                String msg = "Wrong number of bytes. [" + this + "]";
+                completion.completeExceptionally(new Throwable(msg));
+            } else {
+                completion.complete(totalBytesReceived);
+            }
+        }
+
+        @Override
+        public CompletionStage<Integer> getBody() { return completion; }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(super.toString());
+            sb.append(", bufferSize=").append(bufferSize);
+            sb.append(", onNextInvocations=").append(onNextInvocations);
+            sb.append(", totalBytesReceived=").append(totalBytesReceived);
+            sb.append(", expectedTotalSize=").append(expectedTotalSize);
+            sb.append(", requestAmount=").append(requestAmount);
+            sb.append(", lastSeenSize=").append(lastSeenSize);
+            sb.append(", wrongSizes=").append(wrongSizes);
+            sb.append(", index=").append(index);
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Publishes data, through the given publisher, using the main thread.
+     *
+     * Note: The executor supplied when creating the SubmissionPublisher provides
+     * the threads for executing the Subscribers.
+     *
+     * @param publisher the publisher
+     * @param numBuffers the number of buffers to send ( before splitting in two )
+     * @param bufferSize the total size of the data to send ( before splitting in two )
+     */
+    static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
+                       int numBuffers,
+                       int bufferSize) {
+        out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize);
+        int index = 0;
+        for (int i=0; i<numBuffers; i++) {
+            int chunkSize = random.nextInt(bufferSize);
+            ByteBuffer buf1 = allocateBuffer(chunkSize, index);
+            index += chunkSize;
+            ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
+            index += bufferSize - chunkSize;
+            publisher.submit(List.of(buf1, buf2));
+        }
+        out.println("source complete");
+    }
+
+    /**
+     * Creates and subscribes Subscribers that receive data from the given
+     * publisher.
+     *
+     * @param publisher the publisher
+     * @param delayMillis time, in milliseconds, to delay the Subscription
+     *                    requesting more bytes ( for simulating slow consumption )
+     * @param expectedTotalSize the total number of bytes expected to be received
+     *                          by the subscribers
+     * @return a CompletableFuture which completes when the subscription is complete
+     */
+    static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
+                                     int delayMillis,
+                                     int expectedTotalSize,
+                                     long requestAmount,
+                                     int maxBufferSize,
+                                     int minBufferSize) {
+        int bufferSize = random.nextInt(maxBufferSize - minBufferSize) + minBufferSize;
+        BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
+                                                                    delayMillis,
+                                                                    expectedTotalSize,
+                                                                    requestAmount);
+        publisher.subscribe(sub);
+        out.printf("Subscriber reads data with buffer size: %d\n", bufferSize);
+        out.printf("Subscription delay is %d msec\n", delayMillis);
+        out.printf("Request amount is %d items\n", requestAmount);
+        return sub.getBody().toCompletableFuture();
+    }
+
+    // ---
+
+    // TODO: Add a test for cancel
+
+    // ---
+
+    /* Main entry point for standalone testing of the main functional test. */
+    public static void main(String... args) {
+        BufferingSubscriberTest t = new BufferingSubscriberTest();
+        for (Object[] objs : t.config())
+            t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
+    }
+}