test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/WrapperTest.java
author chegar
Wed, 07 Feb 2018 21:45:37 +0000
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 test/jdk/java/net/httpclient/whitebox/java.net.http/java/net/http/WrapperTest.java@42208b2f224e
child 56451 9585061fdb04
permissions -rw-r--r--
http-client-branch: move implementation to jdk.internal.net.http
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     1
/*
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     2
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     4
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     8
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     9
 * This code is distributed in the hope that it will be useful, but WITHOUT
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    12
 * version 2 for more details (a copy is included in the LICENSE file that
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    13
 * accompanied this code).
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    14
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    15
 * You should have received a copy of the GNU General Public License version
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    16
 * 2 along with this work; if not, write to the Free Software Foundation,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    18
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    20
 * or visit www.oracle.com if you need additional information or have any
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    21
 * questions.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    22
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    23
56092
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    24
package jdk.internal.net.http;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    25
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    26
import java.nio.ByteBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    27
import java.util.LinkedList;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    28
import java.util.List;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    29
import java.util.concurrent.*;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    30
import java.util.concurrent.atomic.*;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    31
import org.testng.annotations.Test;
56092
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    32
import jdk.internal.net.http.common.SubscriberWrapper;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    33
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    34
@Test
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    35
public class WrapperTest {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    36
    static final int LO_PRI = 1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    37
    static final int HI_PRI = 2;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    38
    static final int NUM_HI_PRI = 240;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    39
    static final int BUFSIZE = 1016;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    40
    static final int BUFSIZE_INT = BUFSIZE/4;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    41
    static final int HI_PRI_FREQ = 40;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    42
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    43
    static final int TOTAL = 10000;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    44
    //static final int TOTAL = 500;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    45
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    46
    final SubmissionPublisher<List<ByteBuffer>> publisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    47
    final SubscriberWrapper sub1, sub2, sub3;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    48
    final ExecutorService executor = Executors.newCachedThreadPool();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    49
    volatile int hipricount = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    50
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    51
    void errorHandler(Flow.Subscriber<? super List<ByteBuffer>> sub, Throwable t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    52
        System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    53
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    54
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    55
    public WrapperTest() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    56
        publisher = new SubmissionPublisher<>(executor, 600,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    57
                (a, b) -> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    58
                    errorHandler(a, b);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    59
                });
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    60
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    61
        CompletableFuture<Void> notif = new CompletableFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    62
        LastSubscriber ls = new LastSubscriber(notif);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    63
        sub1 = new Filter1(ls);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    64
        sub2 = new Filter2(sub1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    65
        sub3 = new Filter2(sub2);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    66
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    67
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    68
    public class Filter2 extends SubscriberWrapper {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    69
        Filter2(SubscriberWrapper wrapper) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    70
            super(wrapper);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    71
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    72
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    73
        // reverse the order of the bytes in each buffer
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    74
        public void incoming(List<ByteBuffer> list, boolean complete) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    75
            List<ByteBuffer> out = new LinkedList<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    76
            for (ByteBuffer inbuf : list) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    77
                int size = inbuf.remaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    78
                ByteBuffer outbuf = ByteBuffer.allocate(size);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    79
                for (int i=size; i>0; i--) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    80
                    byte b = inbuf.get(i-1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    81
                    outbuf.put(b);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    82
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    83
                outbuf.flip();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    84
                out.add(outbuf);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    85
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    86
            if (complete) System.out.println("Filter2.complete");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    87
            outgoing(out, complete);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    88
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    89
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    90
        protected long windowUpdate(long currval) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    91
            return currval == 0 ? 1 : 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    92
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    93
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    94
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    95
    volatile int filter1Calls = 0; // every third call we insert hi pri data
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    96
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    97
    ByteBuffer getHiPri(int val) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    98
        ByteBuffer buf = ByteBuffer.allocate(8);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    99
        buf.putInt(HI_PRI);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   100
        buf.putInt(val);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   101
        buf.flip();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   102
        return buf;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   103
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   104
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   105
    volatile int hiPriAdded = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   106
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   107
    public class Filter1 extends SubscriberWrapper {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   108
        Filter1(Flow.Subscriber<List<ByteBuffer>> downstreamSubscriber)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   109
        {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   110
            super();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   111
            subscribe(downstreamSubscriber);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   112
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   113
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   114
        // Inserts up to NUM_HI_PRI hi priority buffers into flow
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   115
        protected void incoming(List<ByteBuffer> in, boolean complete) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   116
            if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   117
                sub1.outgoing(getHiPri(hipricount++), false);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   118
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   119
            // pass data thru
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   120
            if (complete) System.out.println("Filter1.complete");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   121
            outgoing(in, complete);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   122
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   123
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   124
        protected long windowUpdate(long currval) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   125
            return currval == 0 ? 1 : 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   126
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   127
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   128
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   129
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   130
     * Final subscriber in the chain. Compares the data sent by the original
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   131
     * publisher.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   132
     */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   133
    static public class LastSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   134
        volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   135
        volatile int hipriCounter=0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   136
        volatile int lopriCounter=0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   137
        final CompletableFuture<Void> cf;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   138
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   139
        LastSubscriber(CompletableFuture<Void> cf) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   140
            this.cf = cf;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   141
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   142
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   143
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   144
        public void onSubscribe(Flow.Subscription subscription) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   145
            this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   146
            subscription.request(50); // say
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   147
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   148
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   149
        private void error(String...args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   150
            StringBuilder sb = new StringBuilder();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   151
            for (String s : args) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   152
                sb.append(s);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   153
                sb.append(' ');
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   154
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   155
            String msg = sb.toString();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   156
            System.out.println("Error: " + msg);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   157
            RuntimeException e = new RuntimeException(msg);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   158
            cf.completeExceptionally(e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   159
            subscription.cancel(); // This is where we need a variant that include exception
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   160
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   161
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   162
        private void check(ByteBuffer buf) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   163
            int type = buf.getInt();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   164
            if (type == HI_PRI) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   165
                // check next int is hi pri counter
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   166
                int c = buf.getInt();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   167
                if (c != hipriCounter)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   168
                    error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   169
                hipriCounter++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   170
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   171
                while (buf.hasRemaining()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   172
                    if (buf.getInt() != lopriCounter)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   173
                        error("lo pri counter", Integer.toString(lopriCounter));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   174
                    lopriCounter++;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   175
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   176
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   177
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   178
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   179
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   180
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   181
            for (ByteBuffer item : items)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   182
                check(item);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   183
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   184
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   185
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   186
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   187
        public void onError(Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   188
            error(throwable.getMessage());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   189
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   190
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   191
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   192
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   193
            if (hipriCounter != NUM_HI_PRI)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   194
                error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   195
            else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   196
                System.out.println("LastSubscriber.complete");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   197
                cf.complete(null); // success
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   198
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   199
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   200
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   201
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   202
    List<ByteBuffer> getBuffer(int c) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   203
        ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   204
        buf.putInt(LO_PRI);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   205
        for (int i=0; i<BUFSIZE_INT; i++) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   206
            buf.putInt(c++);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   207
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   208
        buf.flip();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   209
        return List.of(buf);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   210
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   211
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   212
    boolean errorTest = false;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   213
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   214
    @Test
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   215
    public void run() throws InterruptedException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   216
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   217
            CompletableFuture<Void> completion = sub3.completion();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   218
            publisher.subscribe(sub3);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   219
            // now submit a load of data
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   220
            int counter = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   221
            for (int i = 0; i < TOTAL; i++) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   222
                List<ByteBuffer> bufs = getBuffer(counter);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   223
                //if (i==2)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   224
                    //bufs.get(0).putInt(41, 1234); // error
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   225
                counter += BUFSIZE_INT;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   226
                publisher.submit(bufs);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   227
                //if (i % 1000 == 0)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   228
                    //Thread.sleep(1000);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   229
                //if (i == 99) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   230
                    //publisher.closeExceptionally(new RuntimeException("Test error"));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   231
                    //errorTest = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   232
                    //break;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   233
                //}
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   234
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   235
            if (!errorTest) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   236
                publisher.close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   237
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   238
            System.out.println("Publisher completed");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   239
            completion.join();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   240
            System.out.println("Subscribers completed ok");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   241
        } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   242
            executor.shutdownNow();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   243
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   244
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   245
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   246
    static void display(CompletableFuture<?> cf) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   247
        System.out.print (cf);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   248
        if (!cf.isDone())
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   249
            return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   250
        try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   251
            cf.join(); // wont block
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   252
        } catch (Exception e) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   253
            System.out.println(" " + e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   254
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   255
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   256
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   257
/*
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   258
    public static void main(String[] args) throws InterruptedException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   259
        WrapperTest test = new WrapperTest();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   260
        test.run();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   261
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   262
*/
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   263
}