src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
author prappo
Tue, 21 Nov 2017 17:17:37 +0300
branchhttp-client-branch
changeset 55847 3bac3bca4adb
parent 55803 259cf67b22ec
child 55858 cd5eeec735fb
permissions -rw-r--r--
http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     1
/*
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     2
 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     4
 *
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    10
 *
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    15
 * accompanied this code).
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    16
 *
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    20
 *
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    23
 * questions.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    24
 */
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    25
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    26
package jdk.incubator.http;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    27
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    28
import java.io.IOException;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    29
import java.io.InputStream;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    30
import java.lang.System.Logger.Level;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    31
import java.nio.ByteBuffer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    32
import java.nio.channels.FileChannel;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    33
import java.nio.file.OpenOption;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    34
import java.nio.file.Path;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    35
import java.security.AccessControlContext;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    36
import java.security.AccessController;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    37
import java.security.PrivilegedActionException;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    38
import java.security.PrivilegedExceptionAction;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    39
import java.util.ArrayList;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    40
import java.util.Iterator;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    41
import java.util.List;
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    42
import java.util.Objects;
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    43
import java.util.Optional;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    44
import java.util.concurrent.ArrayBlockingQueue;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    45
import java.util.concurrent.BlockingQueue;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    46
import java.util.concurrent.CompletableFuture;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    47
import java.util.concurrent.CompletionStage;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    48
import java.util.concurrent.ConcurrentHashMap;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    49
import java.util.concurrent.Flow;
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    50
import java.util.concurrent.atomic.AtomicBoolean;
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    51
import java.util.function.Consumer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    52
import java.util.function.Function;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    53
import jdk.incubator.http.internal.common.MinimalFuture;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    54
import jdk.incubator.http.internal.common.Utils;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    55
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    56
class ResponseSubscribers {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    57
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    58
    static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    59
        private final Consumer<Optional<byte[]>> consumer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    60
        private Flow.Subscription subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    61
        private final CompletableFuture<Void> result = new MinimalFuture<>();
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    62
        private final AtomicBoolean subscribed = new AtomicBoolean();
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    63
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    64
        ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    65
            this.consumer = consumer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    66
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    67
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    68
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    69
        public CompletionStage<Void> getBody() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    70
            return result;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    71
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    72
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    73
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    74
        public void onSubscribe(Flow.Subscription subscription) {
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    75
            if (!subscribed.compareAndSet(false, true)) {
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    76
                subscription.cancel();
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    77
            } else {
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    78
                this.subscription = subscription;
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    79
                subscription.request(1);
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
    80
            }
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    81
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    82
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    83
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    84
        public void onNext(List<ByteBuffer> items) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    85
            for (ByteBuffer item : items) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    86
                byte[] buf = new byte[item.remaining()];
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    87
                item.get(buf);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    88
                consumer.accept(Optional.of(buf));
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    89
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    90
            subscription.request(1);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    91
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    92
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    93
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    94
        public void onError(Throwable throwable) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    95
            result.completeExceptionally(throwable);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    96
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    97
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    98
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
    99
        public void onComplete() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   100
            consumer.accept(Optional.empty());
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   101
            result.complete(null);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   102
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   103
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   104
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   105
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   106
    static class PathSubscriber implements HttpResponse.BodySubscriber<Path> {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   107
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   108
        private final Path file;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   109
        private final CompletableFuture<Path> result = new MinimalFuture<>();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   110
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   111
        private volatile Flow.Subscription subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   112
        private volatile FileChannel out;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   113
        private volatile AccessControlContext acc;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   114
        private final OpenOption[] options;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   115
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   116
        PathSubscriber(Path file, OpenOption... options) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   117
            this.file = file;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   118
            this.options = options;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   119
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   120
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   121
        void setAccessControlContext(AccessControlContext acc) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   122
            this.acc = acc;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   123
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   124
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   125
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   126
        public void onSubscribe(Flow.Subscription subscription) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   127
            if (System.getSecurityManager() != null && acc == null)
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   128
                throw new InternalError(
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   129
                        "Unexpected null acc when security manager has been installed");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   130
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   131
            this.subscription = subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   132
            try {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   133
                PrivilegedExceptionAction<FileChannel> pa =
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   134
                        () -> FileChannel.open(file, options);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   135
                out = AccessController.doPrivileged(pa, acc);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   136
            } catch (PrivilegedActionException pae) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   137
                Throwable t = pae.getCause() != null ? pae.getCause() : pae;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   138
                result.completeExceptionally(t);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   139
                subscription.cancel();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   140
                return;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   141
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   142
            subscription.request(1);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   143
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   144
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   145
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   146
        public void onNext(List<ByteBuffer> items) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   147
            try {
55803
259cf67b22ec http-client-branch: (cleanup)
prappo
parents: 55799
diff changeset
   148
                out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   149
            } catch (IOException ex) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   150
                Utils.close(out);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   151
                subscription.cancel();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   152
                result.completeExceptionally(ex);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   153
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   154
            subscription.request(1);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   155
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   156
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   157
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   158
        public void onError(Throwable e) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   159
            result.completeExceptionally(e);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   160
            Utils.close(out);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   161
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   162
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   163
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   164
        public void onComplete() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   165
            Utils.close(out);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   166
            result.complete(file);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   167
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   168
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   169
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   170
        public CompletionStage<Path> getBody() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   171
            return result;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   172
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   173
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   174
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   175
    static class ByteArraySubscriber<T> implements HttpResponse.BodySubscriber<T> {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   176
        private final Function<byte[], T> finisher;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   177
        private final CompletableFuture<T> result = new MinimalFuture<>();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   178
        private final List<ByteBuffer> received = new ArrayList<>();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   179
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   180
        private volatile Flow.Subscription subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   181
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   182
        ByteArraySubscriber(Function<byte[],T> finisher) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   183
            this.finisher = finisher;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   184
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   185
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   186
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   187
        public void onSubscribe(Flow.Subscription subscription) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   188
            if (this.subscription != null) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   189
                subscription.cancel();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   190
                return;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   191
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   192
            this.subscription = subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   193
            // We can handle whatever you've got
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   194
            subscription.request(Long.MAX_VALUE);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   195
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   196
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   197
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   198
        public void onNext(List<ByteBuffer> items) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   199
            // incoming buffers are allocated by http client internally,
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   200
            // and won't be used anywhere except this place.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   201
            // So it's free simply to store them for further processing.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   202
            assert Utils.hasRemaining(items);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   203
            Utils.accumulateBuffers(received, items);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   204
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   205
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   206
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   207
        public void onError(Throwable throwable) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   208
            received.clear();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   209
            result.completeExceptionally(throwable);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   210
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   211
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   212
        static private byte[] join(List<ByteBuffer> bytes) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   213
            int size = Utils.remaining(bytes, Integer.MAX_VALUE);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   214
            byte[] res = new byte[size];
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   215
            int from = 0;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   216
            for (ByteBuffer b : bytes) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   217
                int l = b.remaining();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   218
                b.get(res, from, l);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   219
                from += l;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   220
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   221
            return res;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   222
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   223
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   224
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   225
        public void onComplete() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   226
            try {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   227
                result.complete(finisher.apply(join(received)));
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   228
                received.clear();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   229
            } catch (IllegalArgumentException e) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   230
                result.completeExceptionally(e);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   231
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   232
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   233
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   234
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   235
        public CompletionStage<T> getBody() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   236
            return result;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   237
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   238
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   239
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   240
    /**
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   241
     * An InputStream built on top of the Flow API.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   242
     */
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   243
    static class HttpResponseInputStream extends InputStream
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   244
        implements HttpResponse.BodySubscriber<InputStream>
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   245
    {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   246
        final static boolean DEBUG = Utils.DEBUG;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   247
        final static int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   248
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   249
        // An immutable ByteBuffer sentinel to mark that the last byte was received.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   250
        private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   251
        private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   252
        private static final System.Logger DEBUG_LOGGER =
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   253
                Utils.getDebugLogger("HttpResponseInputStream"::toString, DEBUG);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   254
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   255
        // A queue of yet unprocessed ByteBuffers received from the flow API.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   256
        private final BlockingQueue<List<ByteBuffer>> buffers;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   257
        private volatile Flow.Subscription subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   258
        private volatile boolean closed;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   259
        private volatile Throwable failed;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   260
        private volatile Iterator<ByteBuffer> currentListItr;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   261
        private volatile ByteBuffer currentBuffer;
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   262
        private final AtomicBoolean subscribed = new AtomicBoolean();
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   263
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   264
        HttpResponseInputStream() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   265
            this(MAX_BUFFERS_IN_QUEUE);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   266
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   267
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   268
        HttpResponseInputStream(int maxBuffers) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   269
            int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   270
            // 1 additional slot needed for LAST_LIST added by onComplete
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   271
            this.buffers = new ArrayBlockingQueue<>(capacity + 1);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   272
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   273
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   274
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   275
        public CompletionStage<InputStream> getBody() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   276
            // Returns the stream immediately, before the
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   277
            // response body is received.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   278
            // This makes it possible for senAsync().get().body()
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   279
            // to complete before the response body is received.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   280
            return CompletableFuture.completedStage(this);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   281
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   282
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   283
        // Returns the current byte buffer to read from.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   284
        // If the current buffer has no remaining data, this method will take the
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   285
        // next buffer from the buffers queue, possibly blocking until
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   286
        // a new buffer is made available through the Flow API, or the
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   287
        // end of the flow has been reached.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   288
        private ByteBuffer current() throws IOException {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   289
            while (currentBuffer == null || !currentBuffer.hasRemaining()) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   290
                // Check whether the stream is closed or exhausted
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   291
                if (closed || failed != null) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   292
                    throw new IOException("closed", failed);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   293
                }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   294
                if (currentBuffer == LAST_BUFFER) break;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   295
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   296
                try {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   297
                    if (currentListItr == null || !currentListItr.hasNext()) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   298
                        // Take a new list of buffers from the queue, blocking
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   299
                        // if none is available yet...
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   300
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   301
                        DEBUG_LOGGER.log(Level.DEBUG, "Taking list of Buffers");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   302
                        List<ByteBuffer> lb = buffers.take();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   303
                        currentListItr = lb.iterator();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   304
                        DEBUG_LOGGER.log(Level.DEBUG, "List of Buffers Taken");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   305
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   306
                        // Check whether an exception was encountered upstream
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   307
                        if (closed || failed != null)
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   308
                            throw new IOException("closed", failed);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   309
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   310
                        // Check whether we're done.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   311
                        if (lb == LAST_LIST) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   312
                            currentListItr = null;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   313
                            currentBuffer = LAST_BUFFER;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   314
                            break;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   315
                        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   316
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   317
                        // Request another upstream item ( list of buffers )
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   318
                        Flow.Subscription s = subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   319
                        if (s != null) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   320
                            DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   321
                            s.request(1);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   322
                        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   323
                    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   324
                    assert currentListItr != null;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   325
                    assert currentListItr.hasNext();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   326
                    DEBUG_LOGGER.log(Level.DEBUG, "Next Buffer");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   327
                    currentBuffer = currentListItr.next();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   328
                } catch (InterruptedException ex) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   329
                    // continue
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   330
                }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   331
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   332
            assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   333
            return currentBuffer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   334
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   335
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   336
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   337
        public int read(byte[] bytes, int off, int len) throws IOException {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   338
            // get the buffer to read from, possibly blocking if
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   339
            // none is available
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   340
            ByteBuffer buffer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   341
            if ((buffer = current()) == LAST_BUFFER) return -1;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   342
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   343
            // don't attempt to read more than what is available
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   344
            // in the current buffer.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   345
            int read = Math.min(buffer.remaining(), len);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   346
            assert read > 0 && read <= buffer.remaining();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   347
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   348
            // buffer.get() will do the boundary check for us.
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   349
            buffer.get(bytes, off, read);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   350
            return read;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   351
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   352
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   353
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   354
        public int read() throws IOException {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   355
            ByteBuffer buffer;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   356
            if ((buffer = current()) == LAST_BUFFER) return -1;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   357
            return buffer.get() & 0xFF;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   358
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   359
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   360
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   361
        public void onSubscribe(Flow.Subscription s) {
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   362
            if (!subscribed.compareAndSet(false, true)) {
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   363
                s.cancel();
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   364
            } else {
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   365
                this.subscription = s;
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   366
                assert buffers.remainingCapacity() > 1; // should contain at least 2
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   367
                DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   368
                        + Math.max(1, buffers.remainingCapacity() - 1));
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   369
                s.request(Math.max(1, buffers.remainingCapacity() - 1));
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   370
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   371
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   372
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   373
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   374
        public void onNext(List<ByteBuffer> t) {
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   375
            Objects.requireNonNull(t);
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   376
            try {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   377
                DEBUG_LOGGER.log(Level.DEBUG, "next item received");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   378
                if (!buffers.offer(t)) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   379
                    throw new IllegalStateException("queue is full");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   380
                }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   381
                DEBUG_LOGGER.log(Level.DEBUG, "item offered");
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   382
            } catch (Exception ex) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   383
                failed = ex;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   384
                try {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   385
                    close();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   386
                } catch (IOException ex1) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   387
                    // OK
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   388
                }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   389
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   390
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   391
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   392
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   393
        public void onError(Throwable thrwbl) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   394
            subscription = null;
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   395
            failed = Objects.requireNonNull(thrwbl);
55776
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   396
            // The client process that reads the input stream might
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   397
            // be blocked in queue.take().
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   398
            // Tries to offer LAST_LIST to the queue. If the queue is
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   399
            // full we don't care if we can't insert this buffer, as
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   400
            // the client can't be blocked in queue.take() in that case.
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   401
            // Adding LAST_LIST to the queue is harmless, as the client
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   402
            // should find failed != null before handling LAST_LIST.
9950bc2ee874 http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
dfuchs
parents: 55763
diff changeset
   403
            buffers.offer(LAST_LIST);
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   404
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   405
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   406
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   407
        public void onComplete() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   408
            subscription = null;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   409
            onNext(LAST_LIST);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   410
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   411
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   412
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   413
        public void close() throws IOException {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   414
            synchronized (this) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   415
                if (closed) return;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   416
                closed = true;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   417
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   418
            Flow.Subscription s = subscription;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   419
            subscription = null;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   420
            if (s != null) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   421
                 s.cancel();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   422
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   423
            super.close();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   424
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   425
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   426
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   427
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   428
    static class MultiSubscriberImpl<V>
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   429
        implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   430
    {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   431
        private final MultiMapResult<V> results;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   432
        private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   433
        private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   434
        private final boolean completion; // aggregate completes on last PP received or overall completion
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   435
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   436
        MultiSubscriberImpl(
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   437
                Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler,
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   438
                Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   439
            this.results = new MultiMapResult<>(new ConcurrentHashMap<>());
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   440
            this.requestHandler = requestHandler;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   441
            this.pushHandler = pushHandler;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   442
            this.completion = completion;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   443
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   444
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   445
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   446
        public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   447
            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   448
            results.put(request, cf);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   449
            return requestHandler.apply(request);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   450
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   451
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   452
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   453
        public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   454
            CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   455
            results.put(push, cf);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   456
            return pushHandler.apply(push);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   457
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   458
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   459
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   460
        public void onResponse(HttpResponse<V> response) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   461
            CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   462
            cf.complete(response);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   463
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   464
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   465
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   466
        public void onError(HttpRequest request, Throwable t) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   467
            CompletableFuture<HttpResponse<V>> cf = results.get(request);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   468
            cf.completeExceptionally(t);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   469
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   470
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   471
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   472
        public CompletableFuture<MultiMapResult<V>> completion(
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   473
                CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   474
            if (completion)
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   475
                return onComplete.thenApply((ignored)-> results);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   476
            else
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   477
                return onFinalPushPromise.thenApply((ignored) -> results);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   478
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   479
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   480
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   481
    /**
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   482
     * Currently this consumes all of the data and ignores it
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   483
     */
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   484
    static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   485
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   486
        private final CompletableFuture<T> cf = new MinimalFuture<>();
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   487
        private final Optional<T> result;
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   488
        private final AtomicBoolean subscribed = new AtomicBoolean();
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   489
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   490
        NullSubscriber(Optional<T> result) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   491
            this.result = result;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   492
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   493
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   494
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   495
        public void onSubscribe(Flow.Subscription subscription) {
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   496
            if (!subscribed.compareAndSet(false, true)) {
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   497
                subscription.cancel();
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   498
            } else {
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   499
                subscription.request(Long.MAX_VALUE);
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   500
            }
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   501
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   502
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   503
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   504
        public void onNext(List<ByteBuffer> items) {
55847
3bac3bca4adb http-client-branch: removed RS TCK-incompatible publisher, updated subscribers so they'd pass TCK
prappo
parents: 55803
diff changeset
   505
            Objects.requireNonNull(items);
55763
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   506
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   507
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   508
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   509
        public void onError(Throwable throwable) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   510
            cf.completeExceptionally(throwable);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   511
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   512
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   513
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   514
        public void onComplete() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   515
            if (result.isPresent()) {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   516
                cf.complete(result.get());
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   517
            } else {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   518
                cf.complete(null);
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   519
            }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   520
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   521
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   522
        @Override
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   523
        public CompletionStage<T> getBody() {
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   524
            return cf;
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   525
        }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   526
    }
634d8e14c172 http-client-branch: intial load from jdk10/sandbox
chegar
parents:
diff changeset
   527
}