src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java
author chegar
Tue, 18 Jun 2019 14:52:36 +0100
changeset 55402 b78af6d8a252
parent 53467 97cf88608d76
child 57685 e4cc5231ce2d
permissions -rw-r--r--
8225583: Examine the HttpResponse.BodySubscribers for null handling Reviewed-by: dfuchs, prappo
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     1
/*
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
     2
 * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     4
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    10
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    15
 * accompanied this code).
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    16
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    20
 *
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    23
 * questions.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    24
 */
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    25
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    26
package jdk.internal.net.http;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    27
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    28
import java.io.BufferedReader;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    29
import java.io.FilePermission;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    30
import java.io.IOException;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    31
import java.io.InputStream;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    32
import java.io.InputStreamReader;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    33
import java.nio.ByteBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    34
import java.nio.channels.FileChannel;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    35
import java.nio.charset.Charset;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    36
import java.nio.file.OpenOption;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    37
import java.nio.file.Path;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    38
import java.security.AccessController;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    39
import java.security.PrivilegedActionException;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    40
import java.security.PrivilegedExceptionAction;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    41
import java.util.ArrayList;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    42
import java.util.Iterator;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    43
import java.util.List;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    44
import java.util.Objects;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    45
import java.util.Optional;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    46
import java.util.concurrent.ArrayBlockingQueue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    47
import java.util.concurrent.BlockingQueue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    48
import java.util.concurrent.CompletableFuture;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    49
import java.util.concurrent.CompletionStage;
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
    50
import java.util.concurrent.Executor;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    51
import java.util.concurrent.Flow;
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
    52
import java.util.concurrent.Flow.Subscriber;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
    53
import java.util.concurrent.Flow.Subscription;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    54
import java.util.concurrent.atomic.AtomicBoolean;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    55
import java.util.concurrent.atomic.AtomicReference;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    56
import java.util.function.Consumer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    57
import java.util.function.Function;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    58
import java.util.stream.Stream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    59
import java.net.http.HttpResponse.BodySubscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    60
import jdk.internal.net.http.common.Log;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    61
import jdk.internal.net.http.common.Logger;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    62
import jdk.internal.net.http.common.MinimalFuture;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    63
import jdk.internal.net.http.common.Utils;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    64
import static java.nio.charset.StandardCharsets.UTF_8;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    65
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
    66
public class ResponseSubscribers {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
    67
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
    68
    /**
     * This interface is used by our BodySubscriber implementations to
     * declare whether calling getBody() inline is safe, or whether
     * it needs to be called asynchronously in an executor thread.
     * Calling getBody() inline is usually safe except when it
     * might block - which can be the case if the BodySubscriber
     * is provided by custom code, or if it uses a finisher that
     * might be called and might block before the last bit is
     * received (for instance, if a mapping subscriber is used with
     * a mapper function that maps an InputStream to a GZIPInputStream,
     * as the constructor of GZIPInputStream calls read()).
     * @param <T> The response type.
     */
    public interface TrustedSubscriber<T> extends BodySubscriber<T> {
        /**
         * Returns true if getBody() should be called asynchronously.
         * @implSpec The default implementation of this method returns
         *           false.
         * @return true if getBody() should be called asynchronously.
         */
        default boolean needsExecutor() { return false;}

        /**
         * Returns true if calling {@code bs::getBody} might block
         * and requires an executor.
         *
         * @implNote
         * In particular this method returns
         * true if {@code bs} is not a {@code TrustedSubscriber}
         * (which includes the case where {@code bs} is {@code null}).
         * If it is a {@code TrustedSubscriber}, it returns
         * {@code ((TrustedSubscriber<?>) bs).needsExecutor()}.
         *
         * @param bs A BodySubscriber.
         * @return true if calling {@code bs::getBody} requires using
         *         an executor.
         */
        static boolean needsExecutor(BodySubscriber<?> bs) {
            if (bs instanceof TrustedSubscriber) {
                // Cast to the wildcard type rather than the raw type: the
                // type argument is irrelevant here and this avoids a
                // rawtypes warning.
                return ((TrustedSubscriber<?>) bs).needsExecutor();
            }
            // Anything that is not one of our own subscribers is assumed
            // to possibly block.
            return true;
        }
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   110
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   111
    public static class ConsumerSubscriber implements TrustedSubscriber<Void> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   112
        private final Consumer<Optional<byte[]>> consumer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   113
        private Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   114
        private final CompletableFuture<Void> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   115
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   116
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   117
        public ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
48379
5382baab8371 8193698: Null handling in BodyPublisher, BodyHandler, and BodySubscriber convenience static factory methods
chegar
parents: 48083
diff changeset
   118
            this.consumer = Objects.requireNonNull(consumer);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   119
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   120
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   121
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   122
        public CompletionStage<Void> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   123
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   124
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   125
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   126
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   127
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   128
            Objects.requireNonNull(subscription);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   129
            if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   130
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   131
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   132
                this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   133
                subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   134
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   135
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   136
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   137
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   138
        public void onNext(List<ByteBuffer> items) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   139
            Objects.requireNonNull(items);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   140
            for (ByteBuffer item : items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   141
                byte[] buf = new byte[item.remaining()];
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   142
                item.get(buf);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   143
                consumer.accept(Optional.of(buf));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   144
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   145
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   146
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   147
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   148
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   149
        public void onError(Throwable throwable) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   150
            Objects.requireNonNull(throwable);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   151
            result.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   152
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   153
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   154
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   155
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   156
            consumer.accept(Optional.empty());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   157
            result.complete(null);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   158
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   159
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   160
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   161
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   162
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   163
     * A Subscriber that writes the flow of data to a given file.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   164
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   165
     * Privileged actions are performed within a limited doPrivileged that only
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   166
     * asserts the specific, write, file permissions that were checked during
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   167
     * the construction of this PathSubscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   168
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   169
    public static class PathSubscriber implements TrustedSubscriber<Path> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   170
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   171
        private static final FilePermission[] EMPTY_FILE_PERMISSIONS = new FilePermission[0];
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   172
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   173
        private final Path file;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   174
        private final OpenOption[] options;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   175
        private final FilePermission[] filePermissions;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   176
        private final CompletableFuture<Path> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   177
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   178
        private final AtomicBoolean subscribed = new AtomicBoolean();
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   179
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   180
        private volatile FileChannel out;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   181
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   182
        private static final String pathForSecurityCheck(Path path) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   183
            return path.toFile().getPath();
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   184
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   185
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   186
        /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   187
         * Factory for creating PathSubscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   188
         *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   189
         * Permission checks are performed here before construction of the
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   190
         * PathSubscriber. Permission checking and construction are deliberately
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   191
         * and tightly co-located.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   192
         */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   193
        public static PathSubscriber create(Path file,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   194
                                            List<OpenOption> options) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   195
            FilePermission filePermission = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   196
            SecurityManager sm = System.getSecurityManager();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   197
            if (sm != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   198
                String fn = pathForSecurityCheck(file);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   199
                FilePermission writePermission = new FilePermission(fn, "write");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   200
                sm.checkPermission(writePermission);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   201
                filePermission = writePermission;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   202
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   203
            return new PathSubscriber(file, options, filePermission);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   204
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   205
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   206
        // Package-private so that handler implementations living in this
        // package can construct a PathSubscriber directly. The open options
        // are copied into an array; a null permissions array is replaced by
        // the shared empty array.
        /*package-private*/ PathSubscriber(Path file,
                                           List<OpenOption> options,
                                           FilePermission... filePermissions) {
            this.file = file;
            this.options = options.toArray(new OpenOption[0]);
            if (filePermissions == null) {
                this.filePermissions = EMPTY_FILE_PERMISSIONS;
            } else {
                this.filePermissions = filePermissions;
            }
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   215
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   216
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   217
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   218
            Objects.requireNonNull(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   219
            if (!subscribed.compareAndSet(false, true)) {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   220
                subscription.cancel();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   221
                return;
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   222
            }
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   223
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   224
            this.subscription = subscription;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   225
            if (System.getSecurityManager() == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   226
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   227
                    out = FileChannel.open(file, options);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   228
                } catch (IOException ioe) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   229
                    result.completeExceptionally(ioe);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   230
                    return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   231
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   232
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   233
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   234
                    PrivilegedExceptionAction<FileChannel> pa =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   235
                            () -> FileChannel.open(file, options);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   236
                    out = AccessController.doPrivileged(pa, null, filePermissions);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   237
                } catch (PrivilegedActionException pae) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   238
                    Throwable t = pae.getCause() != null ? pae.getCause() : pae;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   239
                    result.completeExceptionally(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   240
                    subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   241
                    return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   242
                }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   243
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   244
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   245
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   246
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   247
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   248
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   249
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   250
                out.write(items.toArray(Utils.EMPTY_BB_ARRAY));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   251
            } catch (IOException ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   252
                Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   253
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   254
                result.completeExceptionally(ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   255
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   256
            subscription.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   257
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   258
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   259
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   260
        public void onError(Throwable e) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   261
            result.completeExceptionally(e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   262
            Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   263
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   264
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   265
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   266
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   267
            Utils.close(out);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   268
            result.complete(file);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   269
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   270
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   271
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   272
        public CompletionStage<Path> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   273
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   274
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   275
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   276
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   277
    public static class ByteArraySubscriber<T> implements TrustedSubscriber<T> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   278
        private final Function<byte[], T> finisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   279
        private final CompletableFuture<T> result = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   280
        private final List<ByteBuffer> received = new ArrayList<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   281
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   282
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   283
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   284
        public ByteArraySubscriber(Function<byte[],T> finisher) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   285
            this.finisher = finisher;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   286
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   287
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   288
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   289
        public void onSubscribe(Flow.Subscription subscription) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   290
            if (this.subscription != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   291
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   292
                return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   293
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   294
            this.subscription = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   295
            // We can handle whatever you've got
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   296
            subscription.request(Long.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   297
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   298
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   299
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   300
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   301
            // incoming buffers are allocated by http client internally,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   302
            // and won't be used anywhere except this place.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   303
            // So it's free simply to store them for further processing.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   304
            assert Utils.hasRemaining(items);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   305
            received.addAll(items);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   306
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   307
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   308
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   309
        public void onError(Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   310
            received.clear();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   311
            result.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   312
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   313
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   314
        static private byte[] join(List<ByteBuffer> bytes) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   315
            int size = Utils.remaining(bytes, Integer.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   316
            byte[] res = new byte[size];
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   317
            int from = 0;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   318
            for (ByteBuffer b : bytes) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   319
                int l = b.remaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   320
                b.get(res, from, l);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   321
                from += l;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   322
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   323
            return res;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   324
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   325
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   326
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   327
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   328
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   329
                result.complete(finisher.apply(join(received)));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   330
                received.clear();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   331
            } catch (IllegalArgumentException e) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   332
                result.completeExceptionally(e);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   333
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   334
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   335
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   336
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   337
        public CompletionStage<T> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   338
            return result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   339
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   340
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   341
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   342
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   343
     * An InputStream built on top of the Flow API.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   344
     */
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   345
    public static class HttpResponseInputStream extends InputStream
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   346
        implements TrustedSubscriber<InputStream>
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   347
    {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   348
        final static int MAX_BUFFERS_IN_QUEUE = 1;  // lock-step with the producer
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   349
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   350
        // An immutable ByteBuffer sentinel to mark that the last byte was received.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   351
        private static final ByteBuffer LAST_BUFFER = ByteBuffer.wrap(new byte[0]);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   352
        private static final List<ByteBuffer> LAST_LIST = List.of(LAST_BUFFER);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   353
        private static final Logger debug =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   354
                Utils.getDebugLogger("HttpResponseInputStream"::toString, Utils.DEBUG);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   355
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   356
        // A queue of yet unprocessed ByteBuffers received from the flow API.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   357
        private final BlockingQueue<List<ByteBuffer>> buffers;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   358
        private volatile Flow.Subscription subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   359
        private volatile boolean closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   360
        private volatile Throwable failed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   361
        private volatile Iterator<ByteBuffer> currentListItr;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   362
        private volatile ByteBuffer currentBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   363
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   364
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   365
        /**
         * Creates a stream whose queue is sized with the default
         * {@code MAX_BUFFERS_IN_QUEUE}.
         */
        public HttpResponseInputStream() {
            this(MAX_BUFFERS_IN_QUEUE);
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   368
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   369
        HttpResponseInputStream(int maxBuffers) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   370
            int capacity = (maxBuffers <= 0 ? MAX_BUFFERS_IN_QUEUE : maxBuffers);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   371
            // 1 additional slot needed for LAST_LIST added by onComplete
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   372
            this.buffers = new ArrayBlockingQueue<>(capacity + 1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   373
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   374
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   375
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   376
        public CompletionStage<InputStream> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   377
            // Returns the stream immediately, before the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   378
            // response body is received.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   379
            // This makes it possible for sendAsync().get().body()
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   380
            // to complete before the response body is received.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   381
            return CompletableFuture.completedStage(this);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   382
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   383
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   384
        // Returns the current byte buffer to read from.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   385
        // If the current buffer has no remaining data, this method will take the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   386
        // next buffer from the buffers queue, possibly blocking until
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   387
        // a new buffer is made available through the Flow API, or the
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   388
        // end of the flow has been reached.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   389
        private ByteBuffer current() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   390
            while (currentBuffer == null || !currentBuffer.hasRemaining()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   391
                // Check whether the stream is closed or exhausted
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   392
                if (closed || failed != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   393
                    throw new IOException("closed", failed);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   394
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   395
                if (currentBuffer == LAST_BUFFER) break;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   396
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   397
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   398
                    if (currentListItr == null || !currentListItr.hasNext()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   399
                        // Take a new list of buffers from the queue, blocking
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   400
                        // if none is available yet...
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   401
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   402
                        if (debug.on()) debug.log("Taking list of Buffers");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   403
                        List<ByteBuffer> lb = buffers.take();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   404
                        currentListItr = lb.iterator();
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   405
                        if (debug.on()) debug.log("List of Buffers Taken");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   406
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   407
                        // Check whether an exception was encountered upstream
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   408
                        if (closed || failed != null)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   409
                            throw new IOException("closed", failed);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   410
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   411
                        // Check whether we're done.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   412
                        if (lb == LAST_LIST) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   413
                            currentListItr = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   414
                            currentBuffer = LAST_BUFFER;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   415
                            break;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   416
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   417
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   418
                        // Request another upstream item ( list of buffers )
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   419
                        Flow.Subscription s = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   420
                        if (s != null) {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   421
                            if (debug.on()) debug.log("Increased demand by 1");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   422
                            s.request(1);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   423
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   424
                        assert currentListItr != null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   425
                        if (lb.isEmpty()) continue;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   426
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   427
                    assert currentListItr != null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   428
                    assert currentListItr.hasNext();
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   429
                    if (debug.on()) debug.log("Next Buffer");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   430
                    currentBuffer = currentListItr.next();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   431
                } catch (InterruptedException ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   432
                    // continue
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   433
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   434
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   435
            assert currentBuffer == LAST_BUFFER || currentBuffer.hasRemaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   436
            return currentBuffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   437
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   438
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   439
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   440
        public int read(byte[] bytes, int off, int len) throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   441
            // get the buffer to read from, possibly blocking if
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   442
            // none is available
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   443
            ByteBuffer buffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   444
            if ((buffer = current()) == LAST_BUFFER) return -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   445
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   446
            // don't attempt to read more than what is available
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   447
            // in the current buffer.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   448
            int read = Math.min(buffer.remaining(), len);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   449
            assert read > 0 && read <= buffer.remaining();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   450
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   451
            // buffer.get() will do the boundary check for us.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   452
            buffer.get(bytes, off, read);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   453
            return read;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   454
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   455
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   456
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   457
        public int read() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   458
            ByteBuffer buffer;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   459
            if ((buffer = current()) == LAST_BUFFER) return -1;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   460
            return buffer.get() & 0xFF;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   461
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   462
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   463
        @Override
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   464
        public int available() throws IOException {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   465
            // best effort: returns the number of remaining bytes in
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   466
            // the current buffer if any, or 1 if the current buffer
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   467
            // is null or empty but the queue or current buffer list
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   468
            // are not empty. Returns 0 otherwise.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   469
            if (closed) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   470
            int available = 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   471
            ByteBuffer current = currentBuffer;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   472
            if (current == LAST_BUFFER) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   473
            if (current != null) available = current.remaining();
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   474
            if (available != 0) return available;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   475
            Iterator<?> iterator = currentListItr;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   476
            if (iterator != null && iterator.hasNext()) return 1;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   477
            if (buffers.isEmpty()) return 0;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   478
            return 1;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   479
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   480
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   481
        @Override
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   482
        public void onSubscribe(Flow.Subscription s) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   483
            Objects.requireNonNull(s);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   484
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   485
                if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   486
                    s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   487
                } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   488
                    // check whether the stream is already closed.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   489
                    // if so, we should cancel the subscription
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   490
                    // immediately.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   491
                    boolean closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   492
                    synchronized (this) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   493
                        closed = this.closed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   494
                        if (!closed) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   495
                            this.subscription = s;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   496
                        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   497
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   498
                    if (closed) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   499
                        s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   500
                        return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   501
                    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   502
                    assert buffers.remainingCapacity() > 1; // should contain at least 2
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   503
                    if (debug.on())
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   504
                        debug.log("onSubscribe: requesting "
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   505
                                  + Math.max(1, buffers.remainingCapacity() - 1));
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   506
                    s.request(Math.max(1, buffers.remainingCapacity() - 1));
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   507
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   508
            } catch (Throwable t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   509
                failed = t;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   510
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   511
                    close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   512
                } catch (IOException x) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   513
                    // OK
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   514
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   515
                    onError(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   516
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   517
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   518
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   519
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   520
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   521
        public void onNext(List<ByteBuffer> t) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   522
            Objects.requireNonNull(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   523
            try {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   524
                if (debug.on()) debug.log("next item received");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   525
                if (!buffers.offer(t)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   526
                    throw new IllegalStateException("queue is full");
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   527
                }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   528
                if (debug.on()) debug.log("item offered");
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   529
            } catch (Throwable ex) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   530
                failed = ex;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   531
                try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   532
                    close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   533
                } catch (IOException ex1) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   534
                    // OK
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   535
                } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   536
                    onError(ex);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   537
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   538
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   539
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   540
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   541
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   542
        public void onError(Throwable thrwbl) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   543
            subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   544
            failed = Objects.requireNonNull(thrwbl);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   545
            // The client process that reads the input stream might
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   546
            // be blocked in queue.take().
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   547
            // Tries to offer LAST_LIST to the queue. If the queue is
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   548
            // full we don't care if we can't insert this buffer, as
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   549
            // the client can't be blocked in queue.take() in that case.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   550
            // Adding LAST_LIST to the queue is harmless, as the client
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   551
            // should find failed != null before handling LAST_LIST.
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   552
            buffers.offer(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   553
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   554
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   555
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   556
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   557
            subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   558
            onNext(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   559
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   560
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   561
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   562
        public void close() throws IOException {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   563
            Flow.Subscription s;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   564
            synchronized (this) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   565
                if (closed) return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   566
                closed = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   567
                s = subscription;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   568
                subscription = null;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   569
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   570
            // s will be null if already completed
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   571
            try {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   572
                if (s != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   573
                    s.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   574
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   575
            } finally {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   576
                buffers.offer(LAST_LIST);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   577
                super.close();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   578
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   579
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   580
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   581
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   582
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   583
    public static BodySubscriber<Stream<String>> createLineStream() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   584
        return createLineStream(UTF_8);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   585
    }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   586
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   587
    public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   588
        Objects.requireNonNull(charset);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   589
        BodySubscriber<InputStream> s = new HttpResponseInputStream();
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   590
        // Creates a MappingSubscriber with a trusted finisher that is
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   591
        // trusted not to block.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   592
        return new MappingSubscriber<InputStream,Stream<String>>(s,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   593
            (InputStream stream) -> {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   594
                return new BufferedReader(new InputStreamReader(stream, charset))
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   595
                            .lines().onClose(() -> Utils.close(stream));
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   596
            }, true);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   597
    }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   598
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   599
    /**
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   600
     * Currently this consumes all of the data and ignores it
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   601
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   602
    public static class NullSubscriber<T> implements TrustedSubscriber<T> {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   603
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   604
        private final CompletableFuture<T> cf = new MinimalFuture<>();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   605
        private final Optional<T> result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   606
        private final AtomicBoolean subscribed = new AtomicBoolean();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   607
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   608
        public NullSubscriber(Optional<T> result) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   609
            this.result = result;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   610
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   611
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   612
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   613
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   614
            Objects.requireNonNull(subscription);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   615
            if (!subscribed.compareAndSet(false, true)) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   616
                subscription.cancel();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   617
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   618
                subscription.request(Long.MAX_VALUE);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   619
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   620
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   621
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   622
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   623
        public void onNext(List<ByteBuffer> items) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   624
            Objects.requireNonNull(items);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   625
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   626
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   627
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   628
        public void onError(Throwable throwable) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   629
            Objects.requireNonNull(throwable);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   630
            cf.completeExceptionally(throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   631
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   632
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   633
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   634
        public void onComplete() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   635
            if (result.isPresent()) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   636
                cf.complete(result.get());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   637
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   638
                cf.complete(null);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   639
            }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   640
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   641
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   642
        @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   643
        public CompletionStage<T> getBody() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   644
            return cf;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   645
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
   646
    }
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   647
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   648
    /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   649
    public static final class SubscriberAdapter<S extends Subscriber<? super List<ByteBuffer>>,R>
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   650
        implements TrustedSubscriber<R>
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   651
    {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   652
        private final CompletableFuture<R> cf = new MinimalFuture<>();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   653
        private final S subscriber;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   654
        private final Function<? super S,? extends R> finisher;
48408
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   655
        private volatile Subscription subscription;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   656
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   657
        // The finisher isn't called until all bytes have been received,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   658
        // and so shouldn't need an executor. No need to override
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   659
        // TrustedSubscriber::needsExecutor
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   660
        /**
         * Creates an adapter around the given subscriber.
         *
         * @param subscriber the subscriber that signals are forwarded to
         * @param finisher   the function stored as this adapter's finisher
         * @throws NullPointerException if either argument is null
         */
        public SubscriberAdapter(S subscriber, Function<? super S,? extends R> finisher) {
            // validate in declaration order before touching any field
            Objects.requireNonNull(subscriber);
            Objects.requireNonNull(finisher);
            this.subscriber = subscriber;
            this.finisher = finisher;
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   664
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   665
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   666
        public void onSubscribe(Subscription subscription) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   667
            Objects.requireNonNull(subscription);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   668
            if (this.subscription != null) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   669
                subscription.cancel();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   670
            } else {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   671
                this.subscription = subscription;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   672
                subscriber.onSubscribe(subscription);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   673
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   674
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   675
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   676
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   677
        public void onNext(List<ByteBuffer> item) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   678
            Objects.requireNonNull(item);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   679
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   680
                subscriber.onNext(item);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   681
            } catch (Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   682
                subscription.cancel();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   683
                onError(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   684
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   685
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   686
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   687
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   688
        public void onError(Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   689
            Objects.requireNonNull(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   690
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   691
                subscriber.onError(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   692
            } finally {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   693
                cf.completeExceptionally(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   694
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   695
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   696
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   697
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   698
        public void onComplete() {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   699
            try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   700
                subscriber.onComplete();
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   701
            } finally {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   702
                try {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   703
                    cf.complete(finisher.apply(subscriber));
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   704
                } catch (Throwable throwable) {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   705
                    cf.completeExceptionally(throwable);
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   706
                }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   707
            }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   708
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   709
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   710
        @Override
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   711
        public CompletionStage<R> getBody() {
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   712
            return cf;
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   713
        }
4f830b447edf 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher
chegar
parents: 48379
diff changeset
   714
    }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   715
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   716
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   717
     * A body subscriber which receives input from an upstream subscriber
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   718
     * and maps that subscriber's body type to a new type. The upstream subscriber
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   719
     * delegates all flow operations directly to this object. The
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   720
     * {@link CompletionStage} returned by {@link #getBody()}} takes the output
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   721
     * of the upstream {@code getBody()} and applies the mapper function to
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   722
     * obtain the new {@code CompletionStage} type.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   723
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   724
     * @param <T> the upstream body type
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   725
     * @param <U> this subscriber's body type
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   726
     */
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   727
    public static class MappingSubscriber<T,U> implements TrustedSubscriber<U> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   728
        private final BodySubscriber<T> upstream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   729
        private final Function<? super T,? extends U> mapper;
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   730
        private final boolean trusted;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   731
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   732
        public MappingSubscriber(BodySubscriber<T> upstream,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   733
                                 Function<? super T,? extends U> mapper) {
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   734
            this(upstream, mapper, false);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   735
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   736
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   737
        // creates a MappingSubscriber with a mapper that is trusted
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   738
        // to not block when called.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   739
        MappingSubscriber(BodySubscriber<T> upstream,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   740
                          Function<? super T,? extends U> mapper,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   741
                          boolean trusted) {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   742
            this.upstream = Objects.requireNonNull(upstream);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   743
            this.mapper = Objects.requireNonNull(mapper);
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   744
            this.trusted = trusted;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   745
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   746
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   747
        // There is no way to know whether a custom mapper function
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   748
        // might block or not - so we should return true unless the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   749
        // mapper is implemented and trusted by our own code not to
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   750
        // block.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   751
        @Override
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   752
        public boolean needsExecutor() {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   753
            return !trusted || TrustedSubscriber.needsExecutor(upstream);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   754
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   755
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   756
        // If upstream.getBody() is already completed (case of InputStream),
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   757
        // then calling upstream.getBody().thenApply(mapper) might block
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   758
        // if the mapper blocks. We should probably add a variant of
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   759
        // MappingSubscriber that calls thenApplyAsync instead, but this
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   760
        // needs a new public API point. See needsExecutor() above.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   761
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   762
        public CompletionStage<U> getBody() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   763
            return upstream.getBody().thenApply(mapper);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   764
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   766
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   767
        public void onSubscribe(Flow.Subscription subscription) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   768
            upstream.onSubscribe(subscription);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   769
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   770
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   771
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   772
        public void onNext(List<ByteBuffer> item) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   773
            upstream.onNext(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   774
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   775
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   776
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   777
        public void onError(Throwable throwable) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   778
            upstream.onError(throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   779
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   780
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   781
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   782
        public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   783
            upstream.onComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   784
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   785
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   786
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   787
    // A BodySubscriber that returns a Publisher<List<ByteBuffer>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   788
    static class PublishingBodySubscriber
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
   789
            implements TrustedSubscriber<Flow.Publisher<List<ByteBuffer>>> {
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   790
        private final MinimalFuture<Flow.Subscription>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   791
                subscriptionCF = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   792
        private final MinimalFuture<SubscriberRef>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   793
                subscribedCF = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   794
        private AtomicReference<SubscriberRef>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   795
                subscriberRef = new AtomicReference<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   796
        private final CompletionStage<Flow.Publisher<List<ByteBuffer>>> body =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   797
                subscriptionCF.thenCompose(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   798
                        (s) -> MinimalFuture.completedFuture(this::subscribe));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   799
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   800
        // We use the completionCF to ensure that only one of
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   801
        // onError or onComplete is ever called.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   802
        private final MinimalFuture<Void> completionCF;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   803
        private PublishingBodySubscriber() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   804
            completionCF = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   805
            completionCF.whenComplete(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   806
                    (r,t) -> subscribedCF.thenAccept( s -> complete(s, t)));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   807
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   808
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   809
        // An object that holds a reference to a Flow.Subscriber.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   810
        // The reference is cleared when the subscriber is completed - either
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   811
        // normally or exceptionally, or when the subscription is cancelled.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   812
        static final class SubscriberRef {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   813
            volatile Flow.Subscriber<? super List<ByteBuffer>> ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   814
            SubscriberRef(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   815
                ref = subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   816
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   817
            Flow.Subscriber<? super List<ByteBuffer>> get() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   818
                return ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   819
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   820
            Flow.Subscriber<? super List<ByteBuffer>> clear() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   821
                Flow.Subscriber<? super List<ByteBuffer>> res = ref;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   822
                ref = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   823
                return res;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   824
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   825
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   826
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   827
        // A subscription that wraps an upstream subscription and
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   828
        // holds a reference to a subscriber. The subscriber reference
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   829
        // is cleared when the subscription is cancelled
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   830
        final static class SubscriptionRef implements Flow.Subscription {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   831
            final Flow.Subscription subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   832
            final SubscriberRef subscriberRef;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   833
            SubscriptionRef(Flow.Subscription subscription,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   834
                            SubscriberRef subscriberRef) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   835
                this.subscription = subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   836
                this.subscriberRef = subscriberRef;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   837
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   838
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   839
            public void request(long n) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   840
                if (subscriberRef.get() != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   841
                    subscription.request(n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   842
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   843
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   844
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   845
            public void cancel() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   846
                subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   847
                subscriberRef.clear();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   848
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   849
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   850
            void subscribe() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   851
                Subscriber<?> subscriber = subscriberRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   852
                if (subscriber != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   853
                    subscriber.onSubscribe(this);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   854
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   855
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   856
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   857
            @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   858
            public String toString() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   859
                return "SubscriptionRef/"
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   860
                        + subscription.getClass().getName()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   861
                        + "@"
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   862
                        + System.identityHashCode(subscription);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   863
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   864
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   865
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   866
        // This is a callback for the subscribedCF.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   867
        // Do not call directly!
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   868
        private void complete(SubscriberRef ref, Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   869
            assert ref != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   870
            Subscriber<?> s = ref.clear();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   871
            // maybe null if subscription was cancelled
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   872
            if (s == null) return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   873
            if (t == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   874
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   875
                    s.onComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   876
                } catch (Throwable x) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   877
                    s.onError(x);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   878
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   879
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   880
                s.onError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   881
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   882
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   883
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   884
        private void signalError(Throwable err) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   885
            if (err == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   886
                err = new NullPointerException("null throwable");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   887
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   888
            completionCF.completeExceptionally(err);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   889
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   890
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   891
        private void signalComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   892
            completionCF.complete(null);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   893
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   894
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   895
        private void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   896
            Objects.requireNonNull(subscriber, "subscriber must not be null");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   897
            SubscriberRef ref = new SubscriberRef(subscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   898
            if (subscriberRef.compareAndSet(null, ref)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   899
                subscriptionCF.thenAccept((s) -> {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   900
                    SubscriptionRef subscription = new SubscriptionRef(s,ref);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   901
                    try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   902
                        subscription.subscribe();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   903
                        subscribedCF.complete(ref);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   904
                    } catch (Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   905
                        if (Log.errors()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   906
                            Log.logError("Failed to call onSubscribe: " +
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   907
                                    "cancelling subscription: " + t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   908
                            Log.logError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   909
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   910
                        subscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   911
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   912
                });
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   913
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   914
                subscriber.onSubscribe(new Flow.Subscription() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   915
                    @Override public void request(long n) { }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   916
                    @Override public void cancel() { }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   917
                });
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   918
                subscriber.onError(new IllegalStateException(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   919
                        "This publisher has already one subscriber"));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   920
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   921
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   922
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   923
        private final AtomicBoolean subscribed = new AtomicBoolean();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   924
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   925
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   926
        public void onSubscribe(Flow.Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   927
            Objects.requireNonNull(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   928
            if (!subscribed.compareAndSet(false, true)) {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   929
                subscription.cancel();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   930
            } else {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   931
                subscriptionCF.complete(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   932
            }
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   933
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   934
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   935
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   936
        public void onNext(List<ByteBuffer> item) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   937
            Objects.requireNonNull(item);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   938
            try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   939
                // cannot be called before onSubscribe()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   940
                assert subscriptionCF.isDone();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   941
                SubscriberRef ref = subscriberRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   942
                // cannot be called before subscriber calls request(1)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   943
                assert ref != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   944
                Flow.Subscriber<? super List<ByteBuffer>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   945
                        subscriber = ref.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   946
                if (subscriber != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   947
                    // may be null if subscription was cancelled.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   948
                    subscriber.onNext(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   949
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   950
            } catch (Throwable err) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   951
                signalError(err);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   952
                subscriptionCF.thenAccept(s -> s.cancel());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   953
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   954
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   955
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   956
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   957
        public void onError(Throwable throwable) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   958
            // cannot be called before onSubscribe();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   959
            assert suppress(subscriptionCF.isDone(),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   960
                    "onError called before onSubscribe",
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   961
                    throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   962
            // onError can be called before request(1), and therefore can
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   963
            // be called before subscriberRef is set.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   964
            signalError(throwable);
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 53467
diff changeset
   965
            Objects.requireNonNull(throwable);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   966
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   967
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   968
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   969
        public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   970
            // cannot be called before onSubscribe()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   971
            if (!subscriptionCF.isDone()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   972
                signalError(new InternalError(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   973
                        "onComplete called before onSubscribed"));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   974
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   975
                // onComplete can be called before request(1),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   976
                // and therefore can be called before subscriberRef
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   977
                // is set.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   978
                signalComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   979
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   980
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   981
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   982
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   983
        public CompletionStage<Flow.Publisher<List<ByteBuffer>>> getBody() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   984
            return body;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   985
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   986
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   987
        private boolean suppress(boolean condition,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   988
                                 String assertion,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   989
                                 Throwable carrier) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   990
            if (!condition) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   991
                if (carrier != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   992
                    carrier.addSuppressed(new AssertionError(assertion));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   993
                } else if (Log.errors()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   994
                    Log.logError(new AssertionError(assertion));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   995
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   996
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   997
            return true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   998
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
   999
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1000
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1001
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1002
    public static BodySubscriber<Flow.Publisher<List<ByteBuffer>>>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1003
    createPublisher() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1004
        return new PublishingBodySubscriber();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1005
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48408
diff changeset
  1006
53467
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1007
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1008
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1009
     * Tries to determine whether bs::getBody must be invoked asynchronously,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1010
     * and if so, uses the provided executor to do it.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1011
     * If the executor is a {@link HttpClientImpl.DelegatingExecutor},
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1012
     * uses the executor's delegate.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1013
     * @param e    The executor to use if an executor is required.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1014
     * @param bs   The BodySubscriber (trusted or not)
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1015
     * @param <T>  The type of the response.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1016
     * @return A completion stage that completes when the completion
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1017
     *         stage returned by bs::getBody completes. This may, or
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1018
     *         may not, be the same completion stage.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1019
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1020
    public static <T> CompletionStage<T> getBodyAsync(Executor e, BodySubscriber<T> bs) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1021
        if (TrustedSubscriber.needsExecutor(bs)) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1022
            // getBody must be called in the executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1023
            return getBodyAsync(e, bs, new MinimalFuture<>());
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1024
        } else {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1025
            // No executor needed
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1026
            return bs.getBody();
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1027
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1028
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1029
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1030
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1031
     * Invokes bs::getBody using the provided executor.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1032
     * If invoking bs::getBody requires an executor, and the given executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1033
     * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1034
     * delegate is used. If an error occurs anywhere then the given {code cf}
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1035
     * is completed exceptionally (this method does not throw).
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1036
     * @param e   The executor that should be used to call bs::getBody
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1037
     * @param bs  The BodySubscriber
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1038
     * @param cf  A completable future that this function will set up
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1039
     *            to complete when the completion stage returned by
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1040
     *            bs::getBody completes.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1041
     *            In case of any error while trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1042
     *            completion chain, {@code cf} will be completed
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1043
     *            exceptionally with that error.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1044
     * @param <T> The response type.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1045
     * @return The provided {@code cf}.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1046
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1047
    public static <T> CompletableFuture<T> getBodyAsync(Executor e,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1048
                                                      BodySubscriber<T> bs,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1049
                                                      CompletableFuture<T> cf) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1050
        return getBodyAsync(e, bs, cf, cf::completeExceptionally);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1051
    }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1052
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1053
    /**
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1054
     * Invokes bs::getBody using the provided executor.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1055
     * If invoking bs::getBody requires an executor, and the given executor
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1056
     * is a {@link HttpClientImpl.DelegatingExecutor}, then the executor's
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1057
     * delegate is used.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1058
     * The provided {@code cf} is completed with the result (exceptional
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1059
     * or not) of the completion stage returned by bs::getBody.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1060
     * If an error occurs when trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1061
     * completion chain, the provided {@code errorHandler} is invoked,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1062
     * but {@code cf} is not necessarily affected.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1063
     * This method does not throw.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1064
     * @param e   The executor that should be used to call bs::getBody
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1065
     * @param bs  The BodySubscriber
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1066
     * @param cf  A completable future that this function will set up
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1067
     *            to complete when the completion stage returned by
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1068
     *            bs::getBody completes.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1069
     *            In case of any error while trying to set up the
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1070
     *            completion chain, the {@code errorHandler} is invoked
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1071
     *            and {@code cf} is left for that handler to complete.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1072
     * @param errorHandler The handler to invoke if an error is raised
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1073
     *                     while trying to set up the completion chain.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1074
     * @param <T> The response type.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1075
     * @return The provided {@code cf}. If the {@code errorHandler} is
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1076
     * invoked, it is the responsibility of the {@code errorHandler} to
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1077
     * complete the {@code cf}, if needed.
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1078
     */
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1079
    public static <T> CompletableFuture<T> getBodyAsync(Executor e,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1080
                                                      BodySubscriber<T> bs,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1081
                                                      CompletableFuture<T> cf,
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1082
                                                      Consumer<Throwable> errorHandler) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1083
        assert errorHandler != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1084
        try {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1085
            assert e != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1086
            assert cf != null;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1087
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1088
            if (TrustedSubscriber.needsExecutor(bs)) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1089
                e = (e instanceof HttpClientImpl.DelegatingExecutor)
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1090
                        ? ((HttpClientImpl.DelegatingExecutor) e).delegate() : e;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1091
            }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1092
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1093
            e.execute(() -> {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1094
                try {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1095
                    bs.getBody().whenComplete((r, t) -> {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1096
                        if (t != null) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1097
                            cf.completeExceptionally(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1098
                        } else {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1099
                            cf.complete(r);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1100
                        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1101
                    });
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1102
                } catch (Throwable t) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1103
                    errorHandler.accept(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1104
                }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1105
            });
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1106
            return cf;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1107
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1108
        } catch (Throwable t) {
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1109
            errorHandler.accept(t);
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1110
        }
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1111
        return cf;
97cf88608d76 8217264: HttpClient: Blocking operations in mapper function do not work as documented
dfuchs
parents: 49765
diff changeset
  1112
    }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents:
diff changeset
  1113
}