--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberTest.java Sun Nov 05 17:32:13 2017 +0000
@@ -0,0 +1,356 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.SubmissionPublisher;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import jdk.test.lib.RandomFactory;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.Long.MAX_VALUE;
+import static java.lang.System.out;
+import static java.util.concurrent.CompletableFuture.delayedExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.testng.Assert.*;
+
+/*
+ * @test
+ * @bug 8184285
+ * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
+ * @key randomness
+ * @library /test/lib
+ * @build jdk.test.lib.RandomFactory
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
+ */
+
+public class BufferingSubscriberTest {
+
+ static final Random random = RandomFactory.getRandom();
+
+ @DataProvider(name = "negatives")
+ public Object[][] negatives() {
+ return new Object[][] { { 0 }, { -1 }, { -1000 } };
+ }
+
+ @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
+ public void subscriberThrowsIAE(int bufferSize) {
+ BodySubscriber<?> bp = BodySubscriber.asByteArray();
+ BodySubscriber.buffering(bp, bufferSize);
+ }
+
+ @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
+ public void handlerThrowsIAE(int bufferSize) {
+ BodyHandler<?> bp = BodyHandler.asByteArray();
+ BodyHandler.buffering(bp, bufferSize);
+ }
+
+ // ---
+
+ @DataProvider(name = "config")
+ public Object[][] config() {
+ return new Object[][] {
+ // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
+ { 1, 0, 1, 1, 2, 1 },
+ { 1, 0, 1, 10, 1000, 1 },
+ { 1, 10, 1, 10, 1000, 1 },
+ { 1, 0, 1, 1000, 1000, 1 },
+ { 1, 0, 10, 1000, 1000, 1 },
+ { 1, 0, 1000, 10 , 1000, 50 },
+ { 1, 100, 1, 1000 * 4, 1000, 1 },
+ { 100, 0, 1000, 1, 2, 1 },
+ { 3, 0, 4, 5006, 1000, 1 },
+ { 20, 0, 100, 4888, 1000, 100 },
+ { 16, 10, 1000, 50 , 1000, 100 },
+ };
+ }
+
+ @Test(dataProvider = "config")
+ public void test(int iterations,
+ int delayMillis,
+ int numBuffers,
+ int bufferSize,
+ int maxBufferSize,
+ int minbufferSize) {
+ for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
+ test(iterations,
+ delayMillis,
+ numBuffers,
+ bufferSize,
+ maxBufferSize,
+ minbufferSize,
+ perRequestAmount);
+ }
+
+ public void test(int iterations,
+ int delayMillis,
+ int numBuffers,
+ int bufferSize,
+ int maxBufferSize,
+ int minBufferSize,
+ long requestAmount) {
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ try {
+
+ out.printf("Iterations %d\n", iterations);
+ for (int i=0; i<iterations; i++ ) {
+ out.printf("Iteration: %d\n", i);
+ SubmissionPublisher<List<ByteBuffer>> publisher =
+ new SubmissionPublisher<>(executor, 1);
+ CompletableFuture<?> cf = sink(publisher,
+ delayMillis,
+ numBuffers * bufferSize,
+ requestAmount,
+ maxBufferSize,
+ minBufferSize);
+ source(publisher, numBuffers, bufferSize);
+ publisher.close();
+ cf.join();
+ }
+ out.println("OK");
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ static int accumulatedDataSize(List<ByteBuffer> bufs) {
+ return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
+ }
+
+ /** Returns a new BB with its contents set to monotonically increasing
+ * values, staring at the given start index and wrapping every 100. */
+ static ByteBuffer allocateBuffer(int size, int startIdx) {
+ ByteBuffer b = ByteBuffer.allocate(size);
+ for (int i=0; i<size; i++)
+ b.put((byte)((startIdx + i) % 100));
+ b.position(0);
+ return b;
+ }
+
+ static class TestSubscriber implements BodySubscriber<Integer> {
+ final int delayMillis;
+ final int bufferSize;
+ final int expectedTotalSize;
+ final long requestAmount;
+ final CompletableFuture<Integer> completion;
+ final Executor delayedExecutor;
+ volatile Flow.Subscription subscription;
+
+ TestSubscriber(int bufferSize,
+ int delayMillis,
+ int expectedTotalSize,
+ long requestAmount) {
+ this.bufferSize = bufferSize;
+ this.completion = new CompletableFuture<>();
+ this.delayMillis = delayMillis;
+ this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
+ this.expectedTotalSize = expectedTotalSize;
+ this.requestAmount = requestAmount;
+ }
+
+ /**
+ * Example of a factory method which would decorate a buffering
+ * subscriber to create a new subscriber dependent on buffering capability.
+ *
+ * The integer type parameter simulates the body just by counting the
+ * number of bytes in the body.
+ */
+ static BodySubscriber<Integer> createSubscriber(int bufferSize,
+ int delay,
+ int expectedTotalSize,
+ long requestAmount) {
+ TestSubscriber s = new TestSubscriber(bufferSize,
+ delay,
+ expectedTotalSize,
+ requestAmount);
+ return BodySubscriber.buffering(s, bufferSize);
+ }
+
+ private void requestMore() { subscription.request(requestAmount); }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ assertNull(this.subscription);
+ this.subscription = subscription;
+ if (delayMillis > 0)
+ delayedExecutor.execute(this::requestMore);
+ else
+ requestMore();
+ }
+
+ volatile int wrongSizes;
+ volatile int totalBytesReceived;
+ volatile int onNextInvocations;
+ volatile int lastSeenSize = -1;
+ volatile boolean noMoreOnNext; // false
+ volatile int index; // 0
+
+ @Override
+ public void onNext(List<ByteBuffer> items) {
+ long sz = accumulatedDataSize(items);
+ onNextInvocations++;
+ assertNotEquals(sz, 0L, "Unexpected empty buffers");
+ items.stream().forEach(b -> assertEquals(b.position(), 0));
+ assertFalse(noMoreOnNext);
+
+ if (sz != bufferSize) {
+ String msg = sz + ", should be less than bufferSize, " + bufferSize;
+ assertTrue(sz < bufferSize, msg);
+ assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
+ noMoreOnNext = true;
+ wrongSizes++;
+ } else {
+ assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
+ }
+
+ // Ensure expected contents
+ for (ByteBuffer b : items) {
+ while (b.hasRemaining()) {
+ assertEquals(b.get(), (byte) (index % 100));
+ index++;
+ }
+ }
+
+ totalBytesReceived += sz;
+ assertEquals(totalBytesReceived, index );
+ if (delayMillis > 0)
+ delayedExecutor.execute(this::requestMore);
+ else
+ requestMore();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ completion.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ if (wrongSizes > 1) { // allow just the final item to be smaller
+ String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
+ completion.completeExceptionally(new Throwable(msg));
+ }
+ if (totalBytesReceived != expectedTotalSize) {
+ String msg = "Wrong number of bytes. [" + this + "]";
+ completion.completeExceptionally(new Throwable(msg));
+ } else {
+ completion.complete(totalBytesReceived);
+ }
+ }
+
+ @Override
+ public CompletionStage<Integer> getBody() { return completion; }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.toString());
+ sb.append(", bufferSize=").append(bufferSize);
+ sb.append(", onNextInvocations=").append(onNextInvocations);
+ sb.append(", totalBytesReceived=").append(totalBytesReceived);
+ sb.append(", expectedTotalSize=").append(expectedTotalSize);
+ sb.append(", requestAmount=").append(requestAmount);
+ sb.append(", lastSeenSize=").append(lastSeenSize);
+ sb.append(", wrongSizes=").append(wrongSizes);
+ sb.append(", index=").append(index);
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Publishes data, through the given publisher, using the main thread.
+ *
+ * Note: The executor supplied when creating the SubmissionPublisher provides
+ * the threads for executing the Subscribers.
+ *
+ * @param publisher the publisher
+ * @param numBuffers the number of buffers to send ( before splitting in two )
+ * @param bufferSize the total size of the data to send ( before splitting in two )
+ */
+ static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
+ int numBuffers,
+ int bufferSize) {
+ out.printf("Publishing %d buffers of size %d each\n", numBuffers, bufferSize);
+ int index = 0;
+ for (int i=0; i<numBuffers; i++) {
+ int chunkSize = random.nextInt(bufferSize);
+ ByteBuffer buf1 = allocateBuffer(chunkSize, index);
+ index += chunkSize;
+ ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
+ index += bufferSize - chunkSize;
+ publisher.submit(List.of(buf1, buf2));
+ }
+ out.println("source complete");
+ }
+
+ /**
+ * Creates and subscribes Subscribers that receive data from the given
+ * publisher.
+ *
+ * @param publisher the publisher
+ * @param delayMillis time, in milliseconds, to delay the Subscription
+ * requesting more bytes ( for simulating slow consumption )
+ * @param expectedTotalSize the total number of bytes expected to be received
+ * by the subscribers
+ * @return a CompletableFuture which completes when the subscription is complete
+ */
+ static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
+ int delayMillis,
+ int expectedTotalSize,
+ long requestAmount,
+ int maxBufferSize,
+ int minBufferSize) {
+ int bufferSize = random.nextInt(maxBufferSize - minBufferSize) + minBufferSize;
+ BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
+ delayMillis,
+ expectedTotalSize,
+ requestAmount);
+ publisher.subscribe(sub);
+ out.printf("Subscriber reads data with buffer size: %d\n", bufferSize);
+ out.printf("Subscription delay is %d msec\n", delayMillis);
+ out.printf("Request amount is %d items\n", requestAmount);
+ return sub.getBody().toCompletableFuture();
+ }
+
+ // ---
+
+ // TODO: Add a test for cancel
+
+ // ---
+
+ /* Main entry point for standalone testing of the main functional test. */
+ public static void main(String... args) {
+ BufferingSubscriberTest t = new BufferingSubscriberTest();
+ for (Object[] objs : t.config())
+ t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
+ }
+}