src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
author chegar
Thu, 17 Oct 2019 20:54:25 +0100
branchdatagramsocketimpl-branch
changeset 58679 9c3209ff7550
parent 58678 9cf78a70fa4f
parent 57685 e4cc5231ce2d
permissions -rw-r--r--
datagramsocketimpl-branch: merge with default
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     1
/*
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
     2
 * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     4
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    10
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    15
 * accompanied this code).
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    16
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    20
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    23
 * questions.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    24
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    25
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    26
package jdk.internal.net.http;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    27
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    28
import java.io.BufferedReader;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    29
import java.io.FilePermission;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    30
import java.io.IOException;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    31
import java.io.InputStream;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    32
import java.io.InputStreamReader;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    33
import java.nio.ByteBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    34
import java.nio.channels.FileChannel;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    35
import java.nio.charset.Charset;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    36
import java.nio.file.OpenOption;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    37
import java.nio.file.Path;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    38
import java.security.AccessController;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    39
import java.security.PrivilegedActionException;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    40
import java.security.PrivilegedExceptionAction;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    41
import java.util.ArrayList;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    42
import java.util.Iterator;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    43
import java.util.List;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    44
import java.util.Objects;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    45
import java.util.Optional;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    46
import java.util.concurrent.ArrayBlockingQueue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    47
import java.util.concurrent.BlockingQueue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    48
import java.util.concurrent.CompletableFuture;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    49
import java.util.concurrent.CompletionStage;
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
    50
import java.util.concurrent.Executor;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    51
import java.util.concurrent.Flow;
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
    52
import java.util.concurrent.Flow.Subscriber;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
    53
import java.util.concurrent.Flow.Subscription;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    54
import java.util.concurrent.atomic.AtomicBoolean;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    55
import java.util.concurrent.atomic.AtomicReference;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    56
import java.util.function.Consumer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    57
import java.util.function.Function;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    58
import java.util.stream.Stream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    59
import java.net.http.HttpResponse.BodySubscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    60
import jdk.internal.net.http.common.Log;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    61
import jdk.internal.net.http.common.Logger;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    62
import jdk.internal.net.http.common.MinimalFuture;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    63
import jdk.internal.net.http.common.Utils;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    64
import static java.nio.charset.StandardCharsets.UTF_8;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    65
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    66
public class ResponseSubscribers {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    67
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
    68
    /**
     * This interface is used by our BodySubscriber implementations to
     * declare whether calling getBody() inline is safe, or whether
     * it needs to be called asynchronously in an executor thread.
     * Calling getBody() inline is usually safe except when it
     * might block - which can be the case if the BodySubscriber
     * is provided by custom code, or if it uses a finisher that
     * might be called and might block before the last bit is
     * received (for instance, if a mapping subscriber is used with
     * a mapper function that maps an InputStream to a GZIPInputStream,
     * as the constructor of GZIPInputStream calls read()).
     * @param <T> The response type.
     */
    public interface TrustedSubscriber<T> extends BodySubscriber<T> {
        /**
         * Returns true if getBody() should be called asynchronously.
         * @implSpec The default implementation of this method returns
         *           false.
         * @return true if getBody() should be called asynchronously.
         */
        default boolean needsExecutor() { return false; }

        /**
         * Returns true if calling {@code bs::getBody} might block
         * and requires an executor.
         *
         * @implNote
         * In particular this method returns
         * true if {@code bs} is not a {@code TrustedSubscriber}.
         * If it is a {@code TrustedSubscriber}, it returns
         * {@code ((TrustedSubscriber<?>) bs).needsExecutor()}.
         *
         * @param bs A BodySubscriber.
         * @return true if calling {@code bs::getBody} requires using
         *         an executor.
         */
        static boolean needsExecutor(BodySubscriber<?> bs) {
            if (bs instanceof TrustedSubscriber) {
                // Wildcard cast instead of a raw type: the type argument is
                // irrelevant here and this avoids a rawtypes warning.
                return ((TrustedSubscriber<?>) bs).needsExecutor();
            }
            // Any subscriber that is not one of ours is assumed to
            // potentially block in getBody().
            return true;
        }
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   110
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   111
    public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   112
        private final Consumer<Optional<byte[]>> consumer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   113
        private Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   114
        private final CompletableFuture<Void> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   115
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   116
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   117
        public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
48379
5382baab8371 8193698: Null handling in BodyPublisher, BodyHandler, and BodySubscriber convenience static factory methods
chegar
parents: 48083
diff changeset
   118
            this.consumer = Objects.requireNonNull(consumer);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   119
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   120
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   121
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   122
        public CompletionStage<Void> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   123
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   124
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   125
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   126
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   127
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   128
            Objects.requireNonNull(subscription);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   129
            if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   130
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   131
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   132
                this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   133
                subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   134
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   135
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   136
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   137
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   138
        public void onNext(List<ByteBuffer> items) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   139
            Objects.requireNonNull(items);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   140
            for (ByteBuffer item : items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   141
                byte[] buf = new byte[item.remaining()];
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   142
                item.get(buf);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   143
                consumer.accept(Optional.of(buf));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   144
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   145
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   146
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   147
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   148
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   149
        public void onError(Throwable throwable) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   150
            Objects.requireNonNull(throwable);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   151
            result.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   152
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   153
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   154
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   155
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   156
            consumer.accept(Optional.empty());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   157
            result.complete(null);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   158
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   159
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   160
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   161
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   162
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   163
     * A Subscriber that writes the flow of data to a given file.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   164
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   165
     * Privileged actions are performed within a limited doPrivileged that only
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   166
     * asserts the specific, write, file permissions that were checked during
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   167
     * the construction of this PathSubscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   168
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   169
    public static class PathSubscriber implements TrustedSubscriber<Path> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   170
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   171
        private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   172
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   173
        private final Path file;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   174
        private final OpenOption[] options;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   175
        private final FilePermission[] filePermissions;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   176
        private final CompletableFuture<Path> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   177
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   178
        private final AtomicBoolean subscribed = new AtomicBoolean();
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   179
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   180
        private volatile FileChannel out;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   181
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   182
        private static final String pathForSecurityCheck(Path path) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   183
            return path.toFile().getPath();
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   184
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   185
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   186
        /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   187
         * Factory for creating PathSubscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   188
         *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   189
         * Permission checks are performed here before construction of the
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   190
         * PathSubscriber. Permission checking and construction are deliberately
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   191
         * and tightly co-located.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   192
         */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   193
        public static PathSubscriber create(Path file,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   194
                                            List<OpenOption> options) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   195
            FilePermission filePermission = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   196
            SecurityManager sm = System.getSecurityManager();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   197
            if (sm != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   198
                String fn = pathForSecurityCheck(file);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   199
                FilePermission writePermission = new FilePermission(fn, "write");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   200
                sm.checkPermission(writePermission);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   201
                filePermission = writePermission;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   202
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   203
            return new PathSubscriber(file, options, filePermission);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   204
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   205
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   206
        /**
         * Creates a PathSubscriber for the given file and open options.
         *
         * Null entries in {@code filePermissions} are dropped. This matters
         * because {@code create} passes a single {@code FilePermission}
         * that is {@code null} when no security manager is installed;
         * varargs turns that into a one-element array holding {@code null},
         * which a plain {@code filePermissions == null} test does not catch.
         * NOTE(review): the permissions are presumably handed to a limited
         * doPrivileged elsewhere in this class (not visible here), which
         * rejects null elements - confirm.
         *
         * @param file            the file the body is written to
         * @param options         the options used to open the file
         * @param filePermissions the permissions checked at creation time;
         *                        may be null, empty, or contain nulls
         */
        // pp so handler implementations in the same package can construct
        /*package-private*/ PathSubscriber(Path file,
                                           List<OpenOption> options,
                                           FilePermission... filePermissions) {
            this.file = file;
            this.options = options.stream().toArray(OpenOption[]::new);
            this.filePermissions = filePermissions == null
                    ? EMPTY_FILE_PERMISSIONS
                    : Stream.of(filePermissions)
                            .filter(Objects::nonNull)
                            .toArray(FilePermission[]::new);
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   215
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   216
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   217
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   218
            Objects.requireNonNull(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   219
            if (!subscribed.compareAndSet(false, true)) {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   220
                subscription.cancel();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   221
                return;
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   222
            }
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   223
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   224
            this.subscription = subscription;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   225
            if (System.getSecurityManager() == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   226
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   227
                    out = FileChannel.open(file, options);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   228
                } catch (IOException ioe) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   229
                    result.completeExceptionally(ioe);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   230
                    return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   231
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   232
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   233
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   234
                    PrivilegedExceptionAction<FileChannel> pa =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   235
                            () -> FileChannel.open(file, options);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   236
                    out = AccessController.doPrivileged(pa, null, filePermissions);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   237
                } catch (PrivilegedActionException pae) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   238
                    Throwable t = pae.getCause() != null ? pae.getCause() : pae;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   239
                    result.completeExceptionally(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   240
                    subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   241
                    return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   242
                }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   243
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   244
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   245
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   246
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   247
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   248
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   249
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   250
                out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   251
            } catch (IOException ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   252
                Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   253
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   254
                result.completeExceptionally(ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   255
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   256
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   257
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   258
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   259
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   260
        public void onError(Throwable e) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   261
            result.completeExceptionally(e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   262
            Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   263
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   264
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   265
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   266
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   267
            Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   268
            result.complete(file);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   269
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   270
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   271
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   272
        public CompletionStage<Path> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   273
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   274
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   275
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   276
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   277
    public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   278
        private final Function<byte[], T> finisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   279
        private final CompletableFuture<T> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   280
        private final List<ByteBuffer> received = new ArrayList<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   281
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   282
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   283
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   284
        public ByteArraySubscriber(Function<byte[],T> finisher) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   285
            this.finisher = finisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   286
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   287
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   288
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   289
        public void onSubscribe(Flow.Subscription subscription) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   290
            if (this.subscription != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   291
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   292
                return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   293
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   294
            this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   295
            // We can handle whatever you've got
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   296
            subscription.request(Long.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   297
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   298
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   299
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   300
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   301
            // incoming buffers are allocated by http client internally,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   302
            // and won't be used anywhere except this place.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   303
            // So it's free simply to store them for further processing.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   304
            assert Utils.hasRemaining(items);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   305
            received.addAll(items);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   306
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   307
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   308
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   309
        public void onError(Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   310
            received.clear();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   311
            result.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   312
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   313
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   314
        static private byte[] join(List<ByteBuffer> bytes) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   315
            int size = Utils.remaining(bytes, Integer.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   316
            byte[] res = new byte[size];
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   317
            int from = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   318
            for (ByteBuffer b : bytes) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   319
                int l = b.remaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   320
                b.get(res, from, l);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   321
                from += l;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   322
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   323
            return res;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   324
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   325
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   326
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   327
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   328
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   329
                result.complete(finisher.apply(join(received)));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   330
                received.clear();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   331
            } catch (IllegalArgumentException e) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   332
                result.completeExceptionally(e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   333
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   334
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   335
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   336
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   337
        public CompletionStage<T> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   338
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   339
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   340
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   341
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   342
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   343
     * An InputStream built on top of the Flow API.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   344
     */
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   345
    public static class HttpResponseInputStream extends InputStream
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   346
        implements TrustedSubscriber<InputStream>
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   347
    {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   348
        final static int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   349
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   350
        // An immutable ByteBuffer sentinel to mark that the last byte was received.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   351
        private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   352
        private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   353
        private static final Logger debug =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   354
                Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   355
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   356
        // A queue of yet unprocessed ByteBuffers received from the flow API.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   357
        private final BlockingQueue<List<ByteBuffer>> buffers;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   358
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   359
        private volatile boolean closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   360
        private volatile Throwable failed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   361
        private volatile Iterator<ByteBuffer> currentListItr;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   362
        private volatile ByteBuffer currentBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   363
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   364
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   365
        /**
         * Creates a stream whose queue is sized from
         * {@code MAX_BUFFERS_IN_QUEUE}.
         */
        public HttpResponseInputStream() {
            this(MAX_BUFFERS_IN_QUEUE);
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   368
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   369
        HttpResponseInputStream(int maxBuffers) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   370
            int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   371
            // 1 additional slot needed for LAST_LIST added by onComplete
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   372
            this.buffers = new ArrayBlockingQueue<>(capacity + 1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   373
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   374
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   375
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   376
        public CompletionStage<InputStream> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   377
            // Returns the stream immediately, before the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   378
            // response body is received.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   379
            // This makes it possible for sendAsync().get().body()
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   380
            // to complete before the response body is received.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   381
            return CompletableFuture.completedStage(this);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   382
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   383
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   384
        // Returns the current byte buffer to read from.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   385
        // If the current buffer has no remaining data, this method will take the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   386
        // next buffer from the buffers queue, possibly blocking until
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   387
        // a new buffer is made available through the Flow API, or the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   388
        // end of the flow has been reached.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   389
        private ByteBuffer current() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   390
            while (currentBuffer == null || !currentBuffer.hasRemaining()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   391
                // Check whether the stream is closed or exhausted
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   392
                if (closed || failed != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   393
                    throw new IOException("closed", failed);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   394
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   395
                if (currentBuffer == LAST_BUFFER) break;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   396
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   397
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   398
                    if (currentListItr == null || !currentListItr.hasNext()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   399
                        // Take a new list of buffers from the queue, blocking
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   400
                        // if none is available yet...
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   401
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   402
                        if (debug.on()) debug.log("Taking list of Buffers");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   403
                        List<ByteBuffer> lb = buffers.take();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   404
                        currentListItr = lb.iterator();
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   405
                        if (debug.on()) debug.log("List of Buffers Taken");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   406
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   407
                        // Check whether an exception was encountered upstream
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   408
                        if (closed || failed != null)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   409
                            throw new IOException("closed", failed);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   410
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   411
                        // Check whether we're done.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   412
                        if (lb == LAST_LIST) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   413
                            currentListItr = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   414
                            currentBuffer = LAST_BUFFER;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   415
                            break;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   416
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   417
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   418
                        // Request another upstream item ( list of buffers )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   419
                        Flow.Subscription s = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   420
                        if (s != null) {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   421
                            if (debug.on()) debug.log("Increased demand by 1");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   422
                            s.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   423
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   424
                        assert currentListItr != null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   425
                        if (lb.isEmpty()) continue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   426
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   427
                    assert currentListItr != null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   428
                    assert currentListItr.hasNext();
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   429
                    if (debug.on()) debug.log("Next Buffer");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   430
                    currentBuffer = currentListItr.next();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   431
                } catch (InterruptedException ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   432
                    // continue
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   433
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   434
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   435
            assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   436
            return currentBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   437
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   438
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   439
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   440
        public int read(byte[] bytes, int off, int len) throws IOException {
57685
e4cc5231ce2d 8228970: AssertionError in ResponseSubscribers$HttpResponseInputStream
dfuchs
parents: 55402
diff changeset
   441
            Objects.checkFromIndexSize(off, len, bytes.length);
e4cc5231ce2d 8228970: AssertionError in ResponseSubscribers$HttpResponseInputStream
dfuchs
parents: 55402
diff changeset
   442
            if (len == 0) {
e4cc5231ce2d 8228970: AssertionError in ResponseSubscribers$HttpResponseInputStream
dfuchs
parents: 55402
diff changeset
   443
                return 0;
e4cc5231ce2d 8228970: AssertionError in ResponseSubscribers$HttpResponseInputStream
dfuchs
parents: 55402
diff changeset
   444
            }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   445
            // get the buffer to read from, possibly blocking if
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   446
            // none is available
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   447
            ByteBuffer buffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   448
            if ((buffer = current()) == LAST_BUFFER) return -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   449
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   450
            // don't attempt to read more than what is available
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   451
            // in the current buffer.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   452
            int read = Math.min(buffer.remaining(), len);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   453
            assert read > 0 && read <= buffer.remaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   454
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   455
            // buffer.get() will do the boundary check for us.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   456
            buffer.get(bytes, off, read);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   457
            return read;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   458
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   459
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   460
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   461
        public int read() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   462
            ByteBuffer buffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   463
            if ((buffer = current()) == LAST_BUFFER) return -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   464
            return buffer.get() & 0xFF;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   465
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   466
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   467
        @Override
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   468
        public int available() throws IOException {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   469
            // best effort: returns the number of remaining bytes in
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   470
            // the current buffer if any, or 1 if the current buffer
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   471
            // is null or empty but the queue or current buffer list
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   472
            // are not empty. Returns 0 otherwise.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   473
            if (closed) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   474
            int available = 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   475
            ByteBuffer current = currentBuffer;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   476
            if (current == LAST_BUFFER) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   477
            if (current != null) available = current.remaining();
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   478
            if (available != 0) return available;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   479
            Iterator<?> iterator = currentListItr;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   480
            if (iterator != null && iterator.hasNext()) return 1;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   481
            if (buffers.isEmpty()) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   482
            return 1;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   483
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   484
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   485
        @Override
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   486
        public void onSubscribe(Flow.Subscription s) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   487
            Objects.requireNonNull(s);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   488
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   489
                if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   490
                    s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   491
                } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   492
                    // check whether the stream is already closed.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   493
                    // if so, we should cancel the subscription
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   494
                    // immediately.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   495
                    boolean closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   496
                    synchronized (this) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   497
                        closed = this.closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   498
                        if (!closed) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   499
                            this.subscription = s;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   500
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   501
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   502
                    if (closed) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   503
                        s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   504
                        return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   505
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   506
                    assert buffers.remainingCapacity() > 1; // should contain at least 2
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   507
                    if (debug.on())
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   508
                        debug.log("onSubscribe: requesting "
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   509
                                  + Math.max(1, buffers.remainingCapacity() - 1));
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   510
                    s.request(Math.max(1, buffers.remainingCapacity() - 1));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   511
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   512
            } catch (Throwable t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   513
                failed = t;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   514
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   515
                    close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   516
                } catch (IOException x) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   517
                    // OK
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   518
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   519
                    onError(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   520
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   521
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   522
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   523
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   524
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   525
        public void onNext(List<ByteBuffer> t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   526
            Objects.requireNonNull(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   527
            try {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   528
                if (debug.on()) debug.log("next item received");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   529
                if (!buffers.offer(t)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   530
                    throw new IllegalStateException("queue is full");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   531
                }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   532
                if (debug.on()) debug.log("item offered");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   533
            } catch (Throwable ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   534
                failed = ex;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   535
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   536
                    close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   537
                } catch (IOException ex1) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   538
                    // OK
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   539
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   540
                    onError(ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   541
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   542
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   543
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   544
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   545
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   546
        public void onError(Throwable thrwbl) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   547
            subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   548
            failed = Objects.requireNonNull(thrwbl);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   549
            // The client process that reads the input stream might
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   550
            // be blocked in queue.take().
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   551
            // Tries to offer LAST_LIST to the queue. If the queue is
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   552
            // full we don't care if we can't insert this buffer, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   553
            // the client can't be blocked in queue.take() in that case.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   554
            // Adding LAST_LIST to the queue is harmless, as the client
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   555
            // should find failed != null before handling LAST_LIST.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   556
            buffers.offer(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   557
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   558
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   559
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   560
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   561
            subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   562
            onNext(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   563
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   564
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   565
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   566
        public void close() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   567
            Flow.Subscription s;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   568
            synchronized (this) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   569
                if (closed) return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   570
                closed = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   571
                s = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   572
                subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   573
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   574
            // s will be null if already completed
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   575
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   576
                if (s != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   577
                    s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   578
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   579
            } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   580
                buffers.offer(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   581
                super.close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   582
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   583
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   584
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   585
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   586
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   587
    public static BodySubscriber<Stream<String>> createLineStream() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   588
        return createLineStream(UTF_8);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   589
    }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   590
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   591
    public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   592
        Objects.requireNonNull(charset);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   593
        BodySubscriber<InputStream> s = new HttpResponseInputStream();
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   594
        // Creates a MappingSubscriber with a trusted finisher that is
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   595
        // trusted not to block.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   596
        return new MappingSubscriber<InputStream,Stream<String>>(s,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   597
            (InputStream stream) -> {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   598
                return new BufferedReader(new InputStreamReader(stream, charset))
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   599
                            .lines().onClose(() -> Utils.close(stream));
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   600
            }, true);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   601
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   602
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   603
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   604
     * Currently this consumes all of the data and ignores it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   605
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   606
    public static class NullSubscriber<T> implements TrustedSubscriber<T> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   607
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   608
        private final CompletableFuture<T> cf = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   609
        private final Optional<T> result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   610
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   611
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   612
        public NullSubscriber(Optional<T> result) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   613
            this.result = result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   614
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   615
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   616
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   617
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   618
            Objects.requireNonNull(subscription);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   619
            if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   620
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   621
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   622
                subscription.request(Long.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   623
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   624
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   625
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   626
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   627
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   628
            Objects.requireNonNull(items);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   629
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   630
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   631
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   632
        public void onError(Throwable throwable) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   633
            Objects.requireNonNull(throwable);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   634
            cf.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   635
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   636
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   637
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   638
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   639
            if (result.isPresent()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   640
                cf.complete(result.get());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   641
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   642
                cf.complete(null);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   643
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   644
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   645
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   646
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   647
        public CompletionStage<T> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   648
            return cf;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   649
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   650
    }
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   651
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   652
    /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   653
    public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   654
        implements TrustedSubscriber<R>
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   655
    {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   656
        private final CompletableFuture<R> cf = new MinimalFuture<>();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   657
        private final S subscriber;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   658
        private final Function<? super S,? extends R> finisher;
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   659
        private volatile Subscription subscription;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   660
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   661
        // The finisher isn't called until all bytes have been received,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   662
        // and so shouldn't need an executor. No need to override
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   663
        // TrustedSubscriber::needsExecutor
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   664
        public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   665
            this.subscriber = Objects.requireNonNull(subscriber);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   666
            this.finisher = Objects.requireNonNull(finisher);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   667
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   668
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   669
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   670
        public void onSubscribe(Subscription subscription) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   671
            Objects.requireNonNull(subscription);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   672
            if (this.subscription != null) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   673
                subscription.cancel();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   674
            } else {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   675
                this.subscription = subscription;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   676
                subscriber.onSubscribe(subscription);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   677
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   678
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   679
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   680
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   681
        public void onNext(List<ByteBuffer> item) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   682
            Objects.requireNonNull(item);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   683
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   684
                subscriber.onNext(item);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   685
            } catch (Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   686
                subscription.cancel();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   687
                onError(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   688
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   689
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   690
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   691
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   692
        public void onError(Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   693
            Objects.requireNonNull(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   694
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   695
                subscriber.onError(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   696
            } finally {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   697
                cf.completeExceptionally(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   698
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   699
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   700
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   701
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   702
        public void onComplete() {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   703
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   704
                subscriber.onComplete();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   705
            } finally {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   706
                try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   707
                    cf.complete(finisher.apply(subscriber));
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   708
                } catch (Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   709
                    cf.completeExceptionally(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   710
                }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   711
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   712
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   713
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   714
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   715
        public CompletionStage<R> getBody() {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   716
            return cf;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   717
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   718
    }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   719
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   720
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   721
     * A body subscriber which receives input from an upstream subscriber
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   722
     * and maps that subscriber's body type to a new type. The upstream subscriber
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   723
     * delegates all flow operations directly to this object. The
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   724
     * {@link CompletionStage} returned by {@link #getBody()}} takes the output
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   725
     * of the upstream {@code getBody()} and applies the mapper function to
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   726
     * obtain the new {@code CompletionStage} type.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   727
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   728
     * @param <T> the upstream body type
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   729
     * @param <U> this subscriber's body type
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   730
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   731
    public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   732
        private final BodySubscriber<T> upstream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   733
        private final Function<? super T,? extends U> mapper;
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   734
        private final boolean trusted;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   735
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   736
        public MappingSubscriber(BodySubscriber<T> upstream,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   737
                                 Function<? super T,? extends U> mapper) {
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   738
            this(upstream, mapper, false);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   739
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   740
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   741
        // creates a MappingSubscriber with a mapper that is trusted
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   742
        // to not block when called.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   743
        MappingSubscriber(BodySubscriber<T> upstream,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   744
                          Function<? super T,? extends U> mapper,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   745
                          boolean trusted) {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   746
            this.upstream = Objects.requireNonNull(upstream);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   747
            this.mapper = Objects.requireNonNull(mapper);
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   748
            this.trusted = trusted;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   749
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   750
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   751
        // There is no way to know whether a custom mapper function
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   752
        // might block or not - so we should return true unless the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   753
        // mapper is implemented and trusted by our own code not to
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   754
        // block.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   755
        @Override
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   756
        public boolean needsExecutor() {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   757
            return !trusted || TrustedSubscriber.needsExecutor(upstream);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   758
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   759
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   760
        // If upstream.getBody() is already completed (case of InputStream),
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   761
        // then calling upstream.getBody().thenApply(mapper) might block
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   762
        // if the mapper blocks. We should probably add a variant of
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   763
        // MappingSubscriber that calls thenApplyAsync instead, but this
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   764
        // needs a new public API point. See needsExecutor() above.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   765
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   766
        public CompletionStage<U> getBody() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   767
            return upstream.getBody().thenApply(mapper);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   768
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   769
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   770
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   771
        public void onSubscribe(Flow.Subscription subscription) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   772
            upstream.onSubscribe(subscription);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   773
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   774
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   775
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   776
        public void onNext(List<ByteBuffer> item) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   777
            upstream.onNext(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   778
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   779
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   780
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   781
        public void onError(Throwable throwable) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   782
            upstream.onError(throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   783
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   784
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   785
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   786
        public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   787
            upstream.onComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   788
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   789
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   790
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   791
    // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   792
    static class PublishingBodySubscriber
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   793
            implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   794
        private final MinimalFuture<Flow.Subscription>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   795
                subscriptionCF = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   796
        private final MinimalFuture<SubscriberRef>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   797
                subscribedCF = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   798
        private AtomicReference<SubscriberRef>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   799
                subscriberRef = new AtomicReference<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   800
        private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   801
                subscriptionCF.thenCompose(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   802
                        (s) -> MinimalFuture.completedFuture(this::subscribe));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   803
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   804
        // We use the completionCF to ensure that only one of
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   805
        // onError or onComplete is ever called.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   806
        private final MinimalFuture<Void> completionCF;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   807
        /**
         * Creates the subscriber and wires the completion path: once
         * {@code completionCF} completes (normally or exceptionally), a
         * callback is registered on {@code subscribedCF} that invokes
         * {@code complete(s, t)} with the subscriber reference and the
         * throwable, which is {@code null} on normal completion.
         */
        private PublishingBodySubscriber() {
            completionCF = new MinimalFuture<>();
            completionCF.whenComplete(
                    (r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   812
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   813
        // An object that holds a reference to a Flow.Subscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   814
        // The reference is cleared when the subscriber is completed - either
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   815
        // normally or exceptionally, or when the subscription is cancelled.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   816
        static final class SubscriberRef {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   817
            volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   818
            SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   819
                ref = subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   820
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   821
            Flow.Subscriber<? super List<ByteBuffer>> get() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   822
                return ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   823
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   824
            Flow.Subscriber<? super List<ByteBuffer>> clear() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   825
                Flow.Subscriber<? super List<ByteBuffer>> res = ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   826
                ref = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   827
                return res;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   828
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   829
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   830
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   831
        // A subscription that wraps an upstream subscription and
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   832
        // holds a reference to a subscriber. The subscriber reference
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   833
        // is cleared when the subscription is cancelled
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   834
        final static class SubscriptionRef implements Flow.Subscription {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   835
            final Flow.Subscription subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   836
            final SubscriberRef subscriberRef;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   837
            SubscriptionRef(Flow.Subscription subscription,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   838
                            SubscriberRef subscriberRef) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   839
                this.subscription = subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   840
                this.subscriberRef = subscriberRef;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   841
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   842
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   843
            public void request(long n) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   844
                if (subscriberRef.get() != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   845
                    subscription.request(n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   846
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   847
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   848
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   849
            public void cancel() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   850
                subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   851
                subscriberRef.clear();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   852
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   853
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   854
            void subscribe() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   855
                Subscriber<?> subscriber = subscriberRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   856
                if (subscriber != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   857
                    subscriber.onSubscribe(this);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   858
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   859
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   860
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   861
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   862
            public String toString() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   863
                return "SubscriptionRef/"
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   864
                        + subscription.getClass().getName()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   865
                        + "@"
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   866
                        + System.identityHashCode(subscription);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   867
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   868
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   869
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   870
        // This is a callback for the subscribedCF.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   871
        // Do not call directly!
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   872
        private void complete(SubscriberRef ref, Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   873
            assert ref != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   874
            Subscriber<?> s = ref.clear();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   875
            // maybe null if subscription was cancelled
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   876
            if (s == null) return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   877
            if (t == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   878
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   879
                    s.onComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   880
                } catch (Throwable x) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   881
                    s.onError(x);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   882
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   883
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   884
                s.onError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   885
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   886
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   887
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   888
        private void signalError(Throwable err) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   889
            if (err == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   890
                err = new NullPointerException("null throwable");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   891
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   892
            completionCF.completeExceptionally(err);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   893
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   894
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   895
        private void signalComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   896
            completionCF.complete(null);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   897
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   898
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   899
        private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   900
            Objects.requireNonNull(subscriber, "subscriber must not be null");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   901
            SubscriberRef ref = new SubscriberRef(subscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   902
            if (subscriberRef.compareAndSet(null, ref)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   903
                subscriptionCF.thenAccept((s) -> {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   904
                    SubscriptionRef subscription = new SubscriptionRef(s,ref);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   905
                    try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   906
                        subscription.subscribe();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   907
                        subscribedCF.complete(ref);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   908
                    } catch (Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   909
                        if (Log.errors()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   910
                            Log.logError("Failed to call onSubscribe: " +
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   911
                                    "cancelling subscription: " + t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   912
                            Log.logError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   913
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   914
                        subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   915
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   916
                });
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   917
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   918
                subscriber.onSubscribe(new Flow.Subscription() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   919
                    @Override public void request(long n) { }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   920
                    @Override public void cancel() { }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   921
                });
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   922
                subscriber.onError(new IllegalStateException(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   923
                        "This publisher has already one subscriber"));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   924
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   925
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   926
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   927
        private final AtomicBoolean subscribed = new AtomicBoolean();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   928
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   929
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   930
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   931
            Objects.requireNonNull(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   932
            if (!subscribed.compareAndSet(false, true)) {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   933
                subscription.cancel();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   934
            } else {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   935
                subscriptionCF.complete(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   936
            }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   937
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   938
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   939
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   940
        public void onNext(List<ByteBuffer> item) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   941
            Objects.requireNonNull(item);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   942
            try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   943
                // cannot be called before onSubscribe()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   944
                assert subscriptionCF.isDone();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   945
                SubscriberRef ref = subscriberRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   946
                // cannot be called before subscriber calls request(1)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   947
                assert ref != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   948
                Flow.Subscriber<? super List<ByteBuffer>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   949
                        subscriber = ref.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   950
                if (subscriber != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   951
                    // may be null if subscription was cancelled.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   952
                    subscriber.onNext(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   953
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   954
            } catch (Throwable err) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   955
                signalError(err);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   956
                subscriptionCF.thenAccept(s -> s.cancel());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   957
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   958
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   959
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   960
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   961
        public void onError(Throwable throwable) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   962
            // cannot be called before onSubscribe();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   963
            assert suppress(subscriptionCF.isDone(),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   964
                    "onError called before onSubscribe",
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   965
                    throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   966
            // onError can be called before request(1), and therefore can
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   967
            // be called before subscriberRef is set.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   968
            signalError(throwable);
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   969
            Objects.requireNonNull(throwable);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   970
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   971
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   972
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   973
        public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   974
            // cannot be called before onSubscribe()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   975
            if (!subscriptionCF.isDone()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   976
                signalError(new InternalError(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   977
                        "onComplete called before onSubscribed"));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   978
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   979
                // onComplete can be called before request(1),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   980
                // and therefore can be called before subscriberRef
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   981
                // is set.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   982
                signalComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   983
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   984
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   985
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   986
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   987
        public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   988
            return body;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   989
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   990
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   991
        private boolean suppress(boolean condition,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   992
                                 String assertion,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   993
                                 Throwable carrier) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   994
            if (!condition) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   995
                if (carrier != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   996
                    carrier.addSuppressed(new AssertionError(assertion));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   997
                } else if (Log.errors()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   998
                    Log.logError(new AssertionError(assertion));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   999
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1000
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1001
            return true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1002
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1003
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1004
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1005
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1006
    public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1007
    createPublisher() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1008
        return new PublishingBodySubscriber();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1009
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1010
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1011
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1012
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1013
     * Tries to determine whether bs::getBody must be invoked asynchronously,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1014
     * and if so, uses the provided executor to do it.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1015
     * If the executor is a {@link HttpClientImpl.DelegatingExecutor},
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1016
     * uses the executor's delegate.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1017
     * @param e    The executor to use if an executor is required.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1018
     * @param bs   The BodySubscriber (trusted or not)
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1019
     * @param <T>  The type of the response.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1020
     * @return A completion stage that completes when the completion
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1021
     *         stage returned by bs::getBody completes. This may, or
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1022
     *         may not, be the same completion stage.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1023
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1024
    public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1025
        if (TrustedSubscriber.needsExecutor(bs)) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1026
            // getBody must be called in the executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1027
            return getBodyAsync(e, bs, new MinimalFuture<>());
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1028
        } else {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1029
            // No executor needed
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1030
            return bs.getBody();
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1031
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1032
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1033
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1034
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1035
     * Invokes bs::getBody using the provided executor.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1036
     * If invoking bs::getBody requires an executor, and the given executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1037
     * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1038
     * delegate is used. If an error occurs anywhere then the given {code cf}
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1039
     * is completed exceptionally (this method does not throw).
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1040
     * @param e   The executor that should be used to call bs::getBody
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1041
     * @param bs  The BodySubscriber
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1042
     * @param cf  A completable future that this function will set up
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1043
     *            to complete when the completion stage returned by
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1044
     *            bs::getBody completes.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1045
     *            In case of any error while trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1046
     *            completion chain, {@code cf} will be completed
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1047
     *            exceptionally with that error.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1048
     * @param <T> The response type.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1049
     * @return The provided {@code cf}.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1050
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1051
    public static <T> CompletableFuture<T> getBodyAsync(Executor e,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1052
                                                      BodySubscriber<T> bs,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1053
                                                      CompletableFuture<T> cf) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1054
        return getBodyAsync(e, bs, cf, cf::completeExceptionally);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1055
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1056
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1057
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1058
     * Invokes bs::getBody using the provided executor.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1059
     * If invoking bs::getBody requires an executor, and the given executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1060
     * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1061
     * delegate is used.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1062
     * The provided {@code cf} is completed with the result (exceptional
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1063
     * or not) of the completion stage returned by bs::getBody.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1064
     * If an error occurs when trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1065
     * completion chain, the provided {@code errorHandler} is invoked,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1066
     * but {@code cf} is not necessarily affected.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1067
     * This method does not throw.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1068
     * @param e   The executor that should be used to call bs::getBody
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1069
     * @param bs  The BodySubscriber
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1070
     * @param cf  A completable future that this function will set up
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1071
     *            to complete when the completion stage returned by
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1072
     *            bs::getBody completes.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1073
     *            In case of any error while trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1074
     *            completion chain, {@code cf} will be completed
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1075
     *            exceptionally with that error.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1076
     * @param errorHandler The handler to invoke if an error is raised
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1077
     *                     while trying to set up the completion chain.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1078
     * @param <T> The response type.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1079
     * @return The provided {@code cf}. If the {@code errorHandler} is
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1080
     * invoked, it is the responsibility of the {@code errorHandler} to
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1081
     * complete the {@code cf}, if needed.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1082
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1083
    public static <T> CompletableFuture<T> getBodyAsync(Executor e,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1084
                                                      BodySubscriber<T> bs,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1085
                                                      CompletableFuture<T> cf,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1086
                                                      Consumer<Throwable> errorHandler) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1087
        assert errorHandler != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1088
        try {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1089
            assert e != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1090
            assert cf != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1091
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1092
            if (TrustedSubscriber.needsExecutor(bs)) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1093
                e = (e instanceof HttpClientImpl.DelegatingExecutor)
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1094
                        ? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1095
            }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1096
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1097
            e.execute(() -> {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1098
                try {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1099
                    bs.getBody().whenComplete((r, t) -> {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1100
                        if (t != null) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1101
                            cf.completeExceptionally(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1102
                        } else {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1103
                            cf.complete(r);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1104
                        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1105
                    });
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1106
                } catch (Throwable t) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1107
                    errorHandler.accept(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1108
                }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1109
            });
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1110
            return cf;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1111
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1112
        } catch (Throwable t) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1113
            errorHandler.accept(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1114
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1115
        return cf;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1116
    }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
  1117
}