test/jdk/java/net/httpclient/BufferingSubscriberTest.java
author chegar
Wed, 07 Feb 2018 14:17:24 +0000
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 55973 4d9b002587db
child 56167 96fa4f49a9ff
permissions -rw-r--r--
http-client-branch: move to standard package and module name
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     1
/*
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     2
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     4
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     8
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     9
 * This code is distributed in the hope that it will be useful, but WITHOUT
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    12
 * version 2 for more details (a copy is included in the LICENSE file that
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    13
 * accompanied this code).
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    14
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    15
 * You should have received a copy of the GNU General Public License version
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    16
 * 2 along with this work; if not, write to the Free Software Foundation,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    18
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    20
 * or visit www.oracle.com if you need additional information or have any
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    21
 * questions.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    22
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    23
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    24
import java.nio.ByteBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    25
import java.util.List;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    26
import java.util.Random;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    27
import java.util.concurrent.CompletableFuture;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    28
import java.util.concurrent.CompletionStage;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    29
import java.util.concurrent.Executor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    30
import java.util.concurrent.ExecutorService;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    31
import java.util.concurrent.Executors;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    32
import java.util.concurrent.Flow;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    33
import java.util.concurrent.Flow.Subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    34
import java.util.concurrent.SubmissionPublisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    35
import java.util.function.BiConsumer;
56089
42208b2f224e http-client-branch: move to standard package and module name
chegar
parents: 55973
diff changeset
    36
import java.net.http.HttpResponse.BodyHandler;
42208b2f224e http-client-branch: move to standard package and module name
chegar
parents: 55973
diff changeset
    37
import java.net.http.HttpResponse.BodySubscriber;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    38
import jdk.test.lib.RandomFactory;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    39
import org.testng.annotations.DataProvider;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    40
import org.testng.annotations.Test;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    41
import static java.lang.Long.MAX_VALUE;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    42
import static java.lang.Long.min;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    43
import static java.lang.System.out;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    44
import static java.util.concurrent.CompletableFuture.delayedExecutor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    45
import static java.util.concurrent.TimeUnit.MILLISECONDS;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    46
import static org.testng.Assert.*;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    47
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    48
/*
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    49
 * @test
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    50
 * @bug 8184285
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    51
 * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    52
 * @key randomness
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    53
 * @library /test/lib
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    54
 * @build jdk.test.lib.RandomFactory
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    55
 * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    56
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    57
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    58
public class BufferingSubscriberTest {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    59
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    60
    // If we compute that a test will take less than 10s
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    61
    // we judge it acceptable
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    62
    static final long LOWER_THRESHOLD = 10_000; // 10 sec.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    63
    // If we compute that a test will take more than 20 sec
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    64
    // we judge it problematic: we will try to adjust the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    65
    // buffer sizes, and if we can't we will print a warning
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    66
    static final long UPPER_THRESHOLD = 20_000; // 20 sec.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    67
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    68
    static final Random random = RandomFactory.getRandom();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    69
    static final long start = System.nanoTime();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    70
    static final String START = "start";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    71
    static final String END   = "end  ";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    72
    /**
     * Returns the number of whole milliseconds that have passed since the
     * {@code start} timestamp was captured when this class was initialized.
     */
    static long elapsed() {
        final long nanosSinceStart = System.nanoTime() - start;
        return nanosSinceStart / 1_000_000L;
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    73
    static void printStamp(String what, String fmt, Object... args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    74
        long elapsed = elapsed();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    75
        long sec = elapsed/1000;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    76
        long ms  = elapsed % 1000;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    77
        String time = sec > 0 ? sec + "sec " : "";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    78
        time = time + ms + "ms";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    79
        out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    80
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    81
    @DataProvider(name = "negatives")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    82
    public Object[][] negatives() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    83
        return new Object[][] {  { 0 }, { -1 }, { -1000 } };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    84
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    85
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    86
    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    87
    public void subscriberThrowsIAE(int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    88
        printStamp(START, "subscriberThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    89
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    90
            BodySubscriber<?> bp = BodySubscriber.asByteArray();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    91
            BodySubscriber.buffering(bp, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    92
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    93
            printStamp(END, "subscriberThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    94
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    95
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    96
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    97
    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    98
    public void handlerThrowsIAE(int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    99
        printStamp(START, "handlerThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   100
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   101
            BodyHandler<?> bp = BodyHandler.asByteArray();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   102
            BodyHandler.buffering(bp, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   103
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   104
            printStamp(END, "handlerThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   105
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   106
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   107
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   108
    // ---
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   109
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   110
    @DataProvider(name = "config")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   111
    public Object[][] config() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   112
        return new Object[][] {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   113
            // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   114
            { 1,              0,          1,         1,         2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   115
            { 1,              5,          1,         100,       2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   116
            { 1,              0,          1,         10,        1000,         1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   117
            { 1,              10,         1,         10,        1000,         1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   118
            { 1,              0,          1,         1000,      1000,         10  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   119
            { 1,              0,          10,        1000,      1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   120
            { 1,              0,          1000,      10 ,       1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   121
            { 1,              100,        1,         1000 * 4,  1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   122
            { 100,            0,          1000,      1,         2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   123
            { 3,              0,          4,         5006,      1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   124
            { 20,             0,          100,       4888,      1000,         100 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   125
            { 16,             10,         1000,      50 ,       1000,         100 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   126
            { 16,             10,         1000,      50 ,       657,          657 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   127
        };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   128
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   129
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   130
    @Test(dataProvider = "config")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   131
    public void test(int iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   132
                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   133
                     int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   134
                     int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   135
                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   136
                     int minbufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   137
        for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   138
            test(iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   139
                 delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   140
                 numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   141
                 bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   142
                 maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   143
                 minbufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   144
                 perRequestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   145
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   146
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   147
    volatile boolean onNextThrew;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   148
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   149
    BiConsumer<Flow.Subscriber<?>, ? super Throwable> onNextThrowHandler =
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   150
            (sub, ex) -> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   151
                onNextThrew = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   152
                System.out.println("onNext threw " + ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   153
                ex.printStackTrace();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   154
    };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   155
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   156
    public void test(int iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   157
                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   158
                     int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   159
                     int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   160
                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   161
                     int minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   162
                     long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   163
        ExecutorService executor = Executors.newFixedThreadPool(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   164
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   165
            out.printf("Iterations %d\n", iterations);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   166
            for (int i=0; i<iterations; i++ ) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   167
                printStamp(START, "Iteration %d", i);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   168
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   169
                    SubmissionPublisher<List<ByteBuffer>> publisher =
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   170
                            new SubmissionPublisher<>(executor,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   171
                                                      1, // lock-step with the publisher, for now
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   172
                                                      onNextThrowHandler);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   173
                    CompletableFuture<?> cf = sink(publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   174
                            delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   175
                            numBuffers * bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   176
                            requestAmount,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   177
                            maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   178
                            minBufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   179
                    source(publisher, numBuffers, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   180
                    publisher.close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   181
                    cf.join();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   182
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   183
                    printStamp(END, "Iteration %d\n", i);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   184
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   185
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   186
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   187
            assertFalse(onNextThrew, "Unexpected onNextThrew, check output");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   188
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   189
            out.println("OK");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   190
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   191
            executor.shutdown();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   192
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   193
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   194
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   195
    static long accumulatedDataSize(List<ByteBuffer> bufs) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   196
        return bufs.stream().mapToLong(ByteBuffer::remaining).sum();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   197
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   198
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   199
    /** Returns a new BB with its contents set to monotonically increasing
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   200
     * values, staring at the given start index and wrapping every 100. */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   201
    static ByteBuffer allocateBuffer(int size, int startIdx) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   202
        ByteBuffer b = ByteBuffer.allocate(size);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   203
        for (int i=0; i<size; i++)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   204
            b.put((byte)((startIdx + i) % 100));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   205
        b.position(0);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   206
        return b;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   207
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   208
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   209
    static class TestSubscriber implements BodySubscriber<Integer> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   210
        final int delayMillis;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   211
        final int bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   212
        final int expectedTotalSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   213
        final long requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   214
        final CompletableFuture<Integer> completion;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   215
        final Executor delayedExecutor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   216
        volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   217
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   218
        TestSubscriber(int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   219
                       int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   220
                       int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   221
                       long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   222
            this.bufferSize = bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   223
            this.completion = new CompletableFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   224
            this.delayMillis = delayMillis;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   225
            this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   226
            this.expectedTotalSize = expectedTotalSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   227
            this.requestAmount = requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   228
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   229
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   230
        /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   231
         * Example of a factory method which would decorate a buffering
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   232
         * subscriber to create a new subscriber dependent on buffering capability.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   233
         * <p>
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   234
         * The integer type parameter simulates the body just by counting the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   235
         * number of bytes in the body.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   236
         */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   237
        static BodySubscriber<Integer> createSubscriber(int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   238
                                                        int delay,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   239
                                                        int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   240
                                                        long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   241
            TestSubscriber s = new TestSubscriber(bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   242
                                                delay,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   243
                                                expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   244
                                                requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   245
            return BodySubscriber.buffering(s, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   246
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   247
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   248
        private void requestMore() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   249
            subscription.request(requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   250
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   251
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   252
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   253
        public void onSubscribe(Subscription subscription) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   254
            assertNull(this.subscription);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   255
            this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   256
            if (delayMillis > 0)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   257
                delayedExecutor.execute(this::requestMore);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   258
            else
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   259
                requestMore();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   260
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   261
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   262
        volatile int wrongSizes;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   263
        volatile int totalBytesReceived;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   264
        volatile int onNextInvocations;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   265
        volatile long lastSeenSize = -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   266
        volatile boolean noMoreOnNext; // false
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   267
        volatile int index; // 0
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   268
        volatile long count;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   269
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   270
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   271
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   272
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   273
                long sz = accumulatedDataSize(items);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   274
                boolean printStamp = delayMillis > 0
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   275
                        && requestAmount < Long.MAX_VALUE
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   276
                        && count % 20 == 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   277
                if (printStamp) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   278
                    printStamp("stamp", "count=%d sz=%d accumulated=%d",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   279
                            count, sz, (totalBytesReceived + sz));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   280
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   281
                count++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   282
                onNextInvocations++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   283
                assertNotEquals(sz, 0L, "Unexpected empty buffers");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   284
                items.stream().forEach(b -> assertEquals(b.position(), 0));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   285
                assertFalse(noMoreOnNext);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   286
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   287
                if (sz != bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   288
                    String msg = sz + ", should be less than bufferSize, " + bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   289
                    assertTrue(sz < bufferSize, msg);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   290
                    assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   291
                    noMoreOnNext = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   292
                    wrongSizes++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   293
                    printStamp("onNext",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   294
                            "Possibly received last buffer: sz=%d, accumulated=%d, total=%d",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   295
                            sz, totalBytesReceived, totalBytesReceived + sz);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   296
                } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   297
                    assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   298
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   299
                lastSeenSize = sz;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   300
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   301
                // Ensure expected contents
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   302
                for (ByteBuffer b : items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   303
                    while (b.hasRemaining()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   304
                        assertEquals(b.get(), (byte) (index % 100));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   305
                        index++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   306
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   307
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   308
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   309
                totalBytesReceived += sz;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   310
                assertEquals(totalBytesReceived, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   311
                if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   312
                    delayedExecutor.execute(this::requestMore);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   313
                else
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   314
                    requestMore();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   315
            } catch (Throwable t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   316
                completion.completeExceptionally(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   317
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   318
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   319
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   320
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   321
        public void onError(Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   322
            completion.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   323
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   324
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   325
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   326
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   327
            if (wrongSizes > 1) { // allow just the final item to be smaller
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   328
                String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   329
                completion.completeExceptionally(new Throwable(msg));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   330
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   331
            if (totalBytesReceived != expectedTotalSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   332
                String msg = "Wrong number of bytes. [" + this + "]";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   333
                completion.completeExceptionally(new Throwable(msg));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   334
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   335
                completion.complete(totalBytesReceived);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   336
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   337
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   338
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   339
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   340
        public CompletionStage<Integer> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   341
            return completion;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   342
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   343
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   344
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   345
        public String toString() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   346
            StringBuilder sb = new StringBuilder();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   347
            sb.append(super.toString());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   348
            sb.append(", bufferSize=").append(bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   349
            sb.append(", onNextInvocations=").append(onNextInvocations);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   350
            sb.append(", totalBytesReceived=").append(totalBytesReceived);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   351
            sb.append(", expectedTotalSize=").append(expectedTotalSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   352
            sb.append(", requestAmount=").append(requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   353
            sb.append(", lastSeenSize=").append(lastSeenSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   354
            sb.append(", wrongSizes=").append(wrongSizes);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   355
            sb.append(", index=").append(index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   356
            return sb.toString();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   357
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   358
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   359
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   360
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   361
     * Publishes data, through the given publisher, using the main thread.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   362
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   363
     * Note: The executor supplied when creating the SubmissionPublisher provides
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   364
     * the threads for executing the Subscribers.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   365
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   366
     * @param publisher the publisher
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   367
     * @param numBuffers the number of buffers to send ( before splitting in two )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   368
     * @param bufferSize the total size of the data to send ( before splitting in two )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   369
     */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   370
    static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   371
                       int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   372
                       int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   373
        printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   374
        int index = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   375
        for (int i=0; i<numBuffers; i++) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   376
            int chunkSize = random.nextInt(bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   377
            ByteBuffer buf1 = allocateBuffer(chunkSize, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   378
            index += chunkSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   379
            ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   380
            index += bufferSize - chunkSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   381
            publisher.submit(List.of(buf1, buf2));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   382
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   383
        printStamp("source", "complete");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   384
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   385
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   386
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   387
     * Creates and subscribes Subscribers that receive data from the given
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   388
     * publisher.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   389
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   390
     * @param publisher the publisher
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   391
     * @param delayMillis time, in milliseconds, to delay the Subscription
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   392
     *                    requesting more bytes ( for simulating slow consumption )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   393
     * @param expectedTotalSize the total number of bytes expected to be received
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   394
     *                          by the subscribers
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   395
     * @return a CompletableFuture which completes when the subscription is complete
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   396
     */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   397
    static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   398
                                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   399
                                     int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   400
                                     long requestAmount,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   401
                                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   402
                                     int minBufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   403
        int bufferSize = chooseBufferSize(maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   404
                                          minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   405
                                          delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   406
                                          expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   407
                                          requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   408
        assert bufferSize > 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   409
        assert bufferSize >= minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   410
        assert bufferSize <= maxBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   411
        BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   412
                                                                    delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   413
                                                                    expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   414
                                                                    requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   415
        publisher.subscribe(sub);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   416
        printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   417
        out.printf("Subscription delay is %d msec\n", delayMillis);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   418
        long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   419
        out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   420
        out.printf("Request amount is %d items\n", requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   421
        return sub.getBody().toCompletableFuture();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   422
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   423
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   424
    static int chooseBufferSize(int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   425
                                int minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   426
                                int delaysMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   427
                                int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   428
                                long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   429
        assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   430
        int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize :
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   431
                (random.nextInt(maxBufferSize - minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   432
                    + minBufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   433
        if (requestAmount == Long.MAX_VALUE) return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   434
        long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   435
                / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   436
        long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   437
                / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   438
        // if the maximum delay is < 10s just take a random number between min and max.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   439
        if (maxDelay <= LOWER_THRESHOLD) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   440
            return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   441
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   442
        // if minimum delay is greater than 20s then print a warning and use max buffer.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   443
        if (minDelay >= UPPER_THRESHOLD) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   444
            System.out.println("Warning: minimum delay is "
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   445
                    + minDelay/1000 + "sec " + minDelay%1000 + "ms");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   446
            System.err.println("Warning: minimum delay is "
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   447
                    + minDelay/1000 + "sec " + minDelay%1000 + "ms");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   448
            return maxBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   449
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   450
        // maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   451
        // try to pick up a buffer size that keeps the delay below the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   452
        // UPPER_THRESHOLD
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   453
        while (minBufferSize < maxBufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   454
            bufferSize = random.nextInt(maxBufferSize - minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   455
                    + minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   456
            long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   457
                    / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   458
            if (delay < UPPER_THRESHOLD) return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   459
            minBufferSize++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   460
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   461
        return minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   462
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   463
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   464
    // ---
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   465
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   466
    /* Main entry point for standalone testing of the main functional test. */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   467
    public static void main(String... args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   468
        BufferingSubscriberTest t = new BufferingSubscriberTest();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   469
        for (Object[] objs : t.config())
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   470
            t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   471
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   472
}