test/jdk/java/net/httpclient/BufferingSubscriberTest.java
author dfuchs
Tue, 01 Oct 2019 12:10:33 +0100
changeset 58423 54de0c861d32
parent 49765 ee6f7a61f3a5
child 56451 9585061fdb04
permissions -rw-r--r--
8231506: Fix some instabilities in a few networking tests Reviewed-by: alanb, chegar, msheppar
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     1
/*
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
     2
 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     4
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     8
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     9
 * This code is distributed in the hope that it will be useful, but WITHOUT
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    12
 * version 2 for more details (a copy is included in the LICENSE file that
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    13
 * accompanied this code).
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    14
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    15
 * You should have received a copy of the GNU General Public License version
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    16
 * 2 along with this work; if not, write to the Free Software Foundation,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    18
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    20
 * or visit www.oracle.com if you need additional information or have any
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    21
 * questions.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    22
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    23
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    24
import java.nio.ByteBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    25
import java.util.List;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    26
import java.util.Random;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    27
import java.util.concurrent.CompletableFuture;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    28
import java.util.concurrent.CompletionStage;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    29
import java.util.concurrent.Executor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    30
import java.util.concurrent.ExecutorService;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    31
import java.util.concurrent.Executors;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    32
import java.util.concurrent.Flow;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    33
import java.util.concurrent.Flow.Subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    34
import java.util.concurrent.SubmissionPublisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    35
import java.util.function.BiConsumer;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    36
import java.net.http.HttpResponse.BodyHandler;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    37
import java.net.http.HttpResponse.BodyHandlers;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    38
import java.net.http.HttpResponse.BodySubscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    39
import java.net.http.HttpResponse.BodySubscribers;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    40
import jdk.test.lib.RandomFactory;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    41
import org.testng.annotations.DataProvider;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    42
import org.testng.annotations.Test;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    43
import static java.lang.Long.MAX_VALUE;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    44
import static java.lang.Long.min;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    45
import static java.lang.System.out;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    46
import static java.util.concurrent.CompletableFuture.delayedExecutor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    47
import static java.util.concurrent.TimeUnit.MILLISECONDS;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    48
import static org.testng.Assert.*;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    49
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    50
/*
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    51
 * @test
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    52
 * @bug 8184285
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    53
 * @summary Direct test for HttpResponse.BodySubscriber.buffering() API
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    54
 * @key randomness
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    55
 * @library /test/lib
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    56
 * @build jdk.test.lib.RandomFactory
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    57
 * @run testng/othervm -Djdk.internal.httpclient.debug=true BufferingSubscriberTest
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    58
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    59
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    60
public class BufferingSubscriberTest {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    61
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    62
    // If we compute that a test will take less than 10s
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    63
    // we judge it acceptable
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    64
    static final long LOWER_THRESHOLD = 10_000; // 10 sec.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    65
    // If we compute that a test will take more than 20 sec
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    66
    // we judge it problematic: we will try to adjust the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    67
    // buffer sizes, and if we can't we will print a warning
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    68
    static final long UPPER_THRESHOLD = 20_000; // 20 sec.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    69
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    70
    static final Random random = RandomFactory.getRandom();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    71
    static final long start = System.nanoTime();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    72
    static final String START = "start";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    73
    static final String END   = "end  ";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    74
    /** Returns the number of milliseconds elapsed since {@code start} was captured. */
    static long elapsed() {
        final long nanosSinceStart = System.nanoTime() - start;
        return nanosSinceStart / 1_000_000;
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    75
    static void printStamp(String what, String fmt, Object... args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    76
        long elapsed = elapsed();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    77
        long sec = elapsed/1000;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    78
        long ms  = elapsed % 1000;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    79
        String time = sec > 0 ? sec + "sec " : "";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    80
        time = time + ms + "ms";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    81
        out.println(what + "\t ["+time+"]\t "+ String.format(fmt,args));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    82
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    83
    @DataProvider(name = "negatives")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    84
    public Object[][] negatives() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    85
        return new Object[][] {  { 0 }, { -1 }, { -1000 } };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    86
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    87
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    88
    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    89
    public void subscriberThrowsIAE(int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    90
        printStamp(START, "subscriberThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    91
        try {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    92
            BodySubscriber<?> bp = BodySubscribers.ofByteArray();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    93
            BodySubscribers.buffering(bp, bufferSize);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    94
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    95
            printStamp(END, "subscriberThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    96
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    97
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    98
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    99
    @Test(dataProvider = "negatives", expectedExceptions = IllegalArgumentException.class)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   100
    public void handlerThrowsIAE(int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   101
        printStamp(START, "handlerThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   102
        try {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
   103
            BodyHandler<?> bp = BodyHandlers.ofByteArray();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
   104
            BodyHandlers.buffering(bp, bufferSize);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   105
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   106
            printStamp(END, "handlerThrowsIAE(%d)", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   107
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   108
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   109
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   110
    // ---
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   111
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   112
    @DataProvider(name = "config")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   113
    public Object[][] config() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   114
        return new Object[][] {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   115
            // iterations delayMillis numBuffers bufferSize maxBufferSize minBufferSize
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   116
            { 1,              0,          1,         1,         2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   117
            { 1,              5,          1,         100,       2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   118
            { 1,              0,          1,         10,        1000,         1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   119
            { 1,              10,         1,         10,        1000,         1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   120
            { 1,              0,          1,         1000,      1000,         10  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   121
            { 1,              0,          10,        1000,      1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   122
            { 1,              0,          1000,      10 ,       1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   123
            { 1,              100,        1,         1000 * 4,  1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   124
            { 100,            0,          1000,      1,         2,            1   },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   125
            { 3,              0,          4,         5006,      1000,         50  },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   126
            { 20,             0,          100,       4888,      1000,         100 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   127
            { 16,             10,         1000,      50 ,       1000,         100 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   128
            { 16,             10,         1000,      50 ,       657,          657 },
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   129
        };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   130
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   131
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   132
    @Test(dataProvider = "config")
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   133
    public void test(int iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   134
                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   135
                     int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   136
                     int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   137
                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   138
                     int minbufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   139
        for (long perRequestAmount : new long[] { 1L, MAX_VALUE })
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   140
            test(iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   141
                 delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   142
                 numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   143
                 bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   144
                 maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   145
                 minbufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   146
                 perRequestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   147
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   148
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   149
    volatile boolean onNextThrew;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   150
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   151
    BiConsumer<Flow.Subscriber<?>, ? super Throwable> onNextThrowHandler =
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   152
            (sub, ex) -> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   153
                onNextThrew = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   154
                System.out.println("onNext threw " + ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   155
                ex.printStackTrace();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   156
    };
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   157
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   158
    public void test(int iterations,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   159
                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   160
                     int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   161
                     int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   162
                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   163
                     int minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   164
                     long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   165
        ExecutorService executor = Executors.newFixedThreadPool(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   166
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   167
            out.printf("Iterations %d\n", iterations);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   168
            for (int i=0; i<iterations; i++ ) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   169
                printStamp(START, "Iteration %d", i);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   170
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   171
                    SubmissionPublisher<List<ByteBuffer>> publisher =
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   172
                            new SubmissionPublisher<>(executor,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   173
                                                      1, // lock-step with the publisher, for now
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   174
                                                      onNextThrowHandler);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   175
                    CompletableFuture<?> cf = sink(publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   176
                            delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   177
                            numBuffers * bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   178
                            requestAmount,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   179
                            maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   180
                            minBufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   181
                    source(publisher, numBuffers, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   182
                    publisher.close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   183
                    cf.join();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   184
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   185
                    printStamp(END, "Iteration %d\n", i);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   186
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   187
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   188
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   189
            assertFalse(onNextThrew, "Unexpected onNextThrew, check output");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   190
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   191
            out.println("OK");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   192
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   193
            executor.shutdown();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   194
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   195
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   196
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   197
    static long accumulatedDataSize(List<ByteBuffer> bufs) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   198
        return bufs.stream().mapToLong(ByteBuffer::remaining).sum();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   199
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   200
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   201
    /** Returns a new BB with its contents set to monotonically increasing
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   202
     * values, staring at the given start index and wrapping every 100. */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   203
    static ByteBuffer allocateBuffer(int size, int startIdx) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   204
        ByteBuffer b = ByteBuffer.allocate(size);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   205
        for (int i=0; i<size; i++)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   206
            b.put((byte)((startIdx + i) % 100));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   207
        b.position(0);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   208
        return b;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   209
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   210
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   211
    static class TestSubscriber implements BodySubscriber<Integer> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   212
        final int delayMillis;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   213
        final int bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   214
        final int expectedTotalSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   215
        final long requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   216
        final CompletableFuture<Integer> completion;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   217
        final Executor delayedExecutor;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   218
        volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   219
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   220
        /**
         * Creates a subscriber and stores the given settings.
         *
         * @param bufferSize        stored in {@code bufferSize}
         * @param delayMillis       stored in {@code delayMillis}; also the delay,
         *                          in milliseconds, of the delayed executor
         *                          created here
         * @param expectedTotalSize stored in {@code expectedTotalSize}
         * @param requestAmount     stored in {@code requestAmount}
         */
        TestSubscriber(int bufferSize,
                       int delayMillis,
                       int expectedTotalSize,
                       long requestAmount) {
            this.bufferSize = bufferSize;
            this.completion = new CompletableFuture<>();
            this.delayMillis = delayMillis;
            // Resolves to the statically imported CompletableFuture.delayedExecutor.
            this.delayedExecutor = delayedExecutor(delayMillis, MILLISECONDS);
            this.expectedTotalSize = expectedTotalSize;
            this.requestAmount = requestAmount;
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   231
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   232
        /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   233
         * Example of a factory method which would decorate a buffering
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   234
         * subscriber to create a new subscriber dependent on buffering capability.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   235
         * <p>
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   236
         * The integer type parameter simulates the body just by counting the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   237
         * number of bytes in the body.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   238
         */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   239
        static BodySubscriber<Integer> createSubscriber(int bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   240
                                                        int delay,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   241
                                                        int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   242
                                                        long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   243
            TestSubscriber s = new TestSubscriber(bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   244
                                                delay,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   245
                                                expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   246
                                                requestAmount);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
   247
            return BodySubscribers.buffering(s, bufferSize);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   248
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   249
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   250
        private void requestMore() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   251
            subscription.request(requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   252
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   253
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   254
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   255
        public void onSubscribe(Subscription subscription) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   256
            assertNull(this.subscription);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   257
            this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   258
            if (delayMillis > 0)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   259
                delayedExecutor.execute(this::requestMore);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   260
            else
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   261
                requestMore();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   262
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   263
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   264
        // Number of onNext deliveries whose size differed from bufferSize;
        // onComplete reports a failure if this exceeds 1.
        volatile int wrongSizes;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   265
        // Running sum of the sizes of all deliveries; compared against
        // expectedTotalSize in onComplete.
        volatile int totalBytesReceived;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   266
        // Number of times onNext has been invoked (reported by toString).
        volatile int onNextInvocations;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   267
        // Size of the previous delivery, or -1 before the first one.
        volatile long lastSeenSize = -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   268
        // Set once a delivery smaller than bufferSize has been seen; onNext
        // asserts that it is still false on entry.
        volatile boolean noMoreOnNext; // false
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   269
        // Number of bytes whose content has been verified so far; the next
        // expected byte value is (index % 100).
        volatile int index; // 0
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   270
        // Delivery counter used by onNext to print a stamp every 20th delivery.
        volatile long count;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   271
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   272
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   273
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   274
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   275
                long sz = accumulatedDataSize(items);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   276
                boolean printStamp = delayMillis > 0
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   277
                        && requestAmount < Long.MAX_VALUE
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   278
                        && count % 20 == 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   279
                if (printStamp) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   280
                    printStamp("stamp", "count=%d sz=%d accumulated=%d",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   281
                            count, sz, (totalBytesReceived + sz));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   282
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   283
                count++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   284
                onNextInvocations++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   285
                assertNotEquals(sz, 0L, "Unexpected empty buffers");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   286
                items.stream().forEach(b -> assertEquals(b.position(), 0));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   287
                assertFalse(noMoreOnNext);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   288
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   289
                if (sz != bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   290
                    String msg = sz + ", should be less than bufferSize, " + bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   291
                    assertTrue(sz < bufferSize, msg);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   292
                    assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   293
                    noMoreOnNext = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   294
                    wrongSizes++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   295
                    printStamp("onNext",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   296
                            "Possibly received last buffer: sz=%d, accumulated=%d, total=%d",
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   297
                            sz, totalBytesReceived, totalBytesReceived + sz);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   298
                } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   299
                    assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   300
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   301
                lastSeenSize = sz;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   302
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   303
                // Ensure expected contents
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   304
                for (ByteBuffer b : items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   305
                    while (b.hasRemaining()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   306
                        assertEquals(b.get(), (byte) (index % 100));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   307
                        index++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   308
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   309
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   310
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   311
                totalBytesReceived += sz;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   312
                assertEquals(totalBytesReceived, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   313
                if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   314
                    delayedExecutor.execute(this::requestMore);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   315
                else
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   316
                    requestMore();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   317
            } catch (Throwable t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   318
                completion.completeExceptionally(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   319
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   320
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   321
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   322
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   323
        public void onError(Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   324
            completion.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   325
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   326
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   327
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   328
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   329
            if (wrongSizes > 1) { // allow just the final item to be smaller
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   330
                String msg = "Wrong sizes. Expected no more than 1. [" + this + "]";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   331
                completion.completeExceptionally(new Throwable(msg));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   332
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   333
            if (totalBytesReceived != expectedTotalSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   334
                String msg = "Wrong number of bytes. [" + this + "]";
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   335
                completion.completeExceptionally(new Throwable(msg));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   336
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   337
                completion.complete(totalBytesReceived);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   338
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   339
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   340
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   341
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   342
        public CompletionStage<Integer> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   343
            return completion;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   344
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   345
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   346
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   347
        public String toString() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   348
            StringBuilder sb = new StringBuilder();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   349
            sb.append(super.toString());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   350
            sb.append(", bufferSize=").append(bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   351
            sb.append(", onNextInvocations=").append(onNextInvocations);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   352
            sb.append(", totalBytesReceived=").append(totalBytesReceived);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   353
            sb.append(", expectedTotalSize=").append(expectedTotalSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   354
            sb.append(", requestAmount=").append(requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   355
            sb.append(", lastSeenSize=").append(lastSeenSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   356
            sb.append(", wrongSizes=").append(wrongSizes);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   357
            sb.append(", index=").append(index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   358
            return sb.toString();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   359
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   360
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   361
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   362
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   363
     * Publishes data, through the given publisher, using the main thread.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   364
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   365
     * Note: The executor supplied when creating the SubmissionPublisher provides
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   366
     * the threads for executing the Subscribers.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   367
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   368
     * @param publisher the publisher
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   369
     * @param numBuffers the number of buffers to send ( before splitting in two )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   370
     * @param bufferSize the total size of the data to send ( before splitting in two )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   371
     */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   372
    static void source(SubmissionPublisher<List<ByteBuffer>> publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   373
                       int numBuffers,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   374
                       int bufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   375
        printStamp("source","Publishing %d buffers of size %d each", numBuffers, bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   376
        int index = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   377
        for (int i=0; i<numBuffers; i++) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   378
            int chunkSize = random.nextInt(bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   379
            ByteBuffer buf1 = allocateBuffer(chunkSize, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   380
            index += chunkSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   381
            ByteBuffer buf2 = allocateBuffer(bufferSize - chunkSize, index);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   382
            index += bufferSize - chunkSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   383
            publisher.submit(List.of(buf1, buf2));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   384
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   385
        printStamp("source", "complete");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   386
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   387
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   388
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   389
     * Creates and subscribes Subscribers that receive data from the given
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   390
     * publisher.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   391
     *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   392
     * @param publisher the publisher
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   393
     * @param delayMillis time, in milliseconds, to delay the Subscription
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   394
     *                    requesting more bytes ( for simulating slow consumption )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   395
     * @param expectedTotalSize the total number of bytes expected to be received
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   396
     *                          by the subscribers
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   397
     * @return a CompletableFuture which completes when the subscription is complete
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   398
     */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   399
    static CompletableFuture<?> sink(SubmissionPublisher<List<ByteBuffer>> publisher,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   400
                                     int delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   401
                                     int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   402
                                     long requestAmount,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   403
                                     int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   404
                                     int minBufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   405
        int bufferSize = chooseBufferSize(maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   406
                                          minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   407
                                          delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   408
                                          expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   409
                                          requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   410
        assert bufferSize > 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   411
        assert bufferSize >= minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   412
        assert bufferSize <= maxBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   413
        BodySubscriber<Integer> sub = TestSubscriber.createSubscriber(bufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   414
                                                                    delayMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   415
                                                                    expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   416
                                                                    requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   417
        publisher.subscribe(sub);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   418
        printStamp("sink","Subscriber reads data with buffer size: %d", bufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   419
        out.printf("Subscription delay is %d msec\n", delayMillis);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   420
        long delay = (((long)delayMillis * expectedTotalSize) / bufferSize) / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   421
        out.printf("Minimum total delay is %d sec %d ms\n", delay / 1000, delay % 1000);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   422
        out.printf("Request amount is %d items\n", requestAmount);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   423
        return sub.getBody().toCompletableFuture();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   424
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   425
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   426
    static int chooseBufferSize(int maxBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   427
                                int minBufferSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   428
                                int delaysMillis,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   429
                                int expectedTotalSize,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   430
                                long requestAmount) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   431
        assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   432
        int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize :
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   433
                (random.nextInt(maxBufferSize - minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   434
                    + minBufferSize);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   435
        if (requestAmount == Long.MAX_VALUE) return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   436
        long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   437
                / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   438
        long maxDelay = (((long)delaysMillis * expectedTotalSize) / minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   439
                / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   440
        // if the maximum delay is < 10s just take a random number between min and max.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   441
        if (maxDelay <= LOWER_THRESHOLD) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   442
            return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   443
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   444
        // if minimum delay is greater than 20s then print a warning and use max buffer.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   445
        if (minDelay >= UPPER_THRESHOLD) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   446
            System.out.println("Warning: minimum delay is "
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   447
                    + minDelay/1000 + "sec " + minDelay%1000 + "ms");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   448
            System.err.println("Warning: minimum delay is "
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   449
                    + minDelay/1000 + "sec " + minDelay%1000 + "ms");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   450
            return maxBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   451
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   452
        // maxDelay could be anything, but minDelay is below the UPPER_THRESHOLD
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   453
        // try to pick up a buffer size that keeps the delay below the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   454
        // UPPER_THRESHOLD
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   455
        while (minBufferSize < maxBufferSize) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   456
            bufferSize = random.nextInt(maxBufferSize - minBufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   457
                    + minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   458
            long delay = (((long)delaysMillis * expectedTotalSize) / bufferSize)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   459
                    / requestAmount;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   460
            if (delay < UPPER_THRESHOLD) return bufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   461
            minBufferSize++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   462
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   463
        return minBufferSize;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   464
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   465
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   466
    // ---
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   467
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   468
    /* Main entry point for standalone testing of the main functional test. */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   469
    public static void main(String... args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   470
        BufferingSubscriberTest t = new BufferingSubscriberTest();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   471
        for (Object[] objs : t.config())
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   472
            t.test((int)objs[0], (int)objs[1], (int)objs[2], (int)objs[3], (int)objs[4], (int)objs[5]);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   473
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   474
}