src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java
author chegar
Tue, 18 Jun 2019 14:52:36 +0100
changeset 55402 b78af6d8a252
parent 49765 ee6f7a61f3a5
permissions -rw-r--r--
8225583: Examine the HttpResponse.BodySubscribers for null handling Reviewed-by: dfuchs, prappo
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     1
/*
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
     2
 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     4
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    10
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    15
 * accompanied this code).
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    16
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    20
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    23
 * questions.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    24
 */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    25
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    26
package jdk.internal.net.http;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    27
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    28
import java.nio.ByteBuffer;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    29
import java.nio.CharBuffer;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    30
import java.nio.charset.CharacterCodingException;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    31
import java.nio.charset.Charset;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    32
import java.nio.charset.CharsetDecoder;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    33
import java.nio.charset.CoderResult;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    34
import java.nio.charset.CodingErrorAction;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    35
import java.util.List;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    36
import java.util.Objects;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    37
import java.util.concurrent.CompletableFuture;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    38
import java.util.concurrent.CompletionStage;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    39
import java.util.concurrent.ConcurrentLinkedDeque;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    40
import java.util.concurrent.Flow;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    41
import java.util.concurrent.Flow.Subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    42
import java.util.concurrent.Flow.Subscription;
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    43
import java.util.concurrent.atomic.AtomicBoolean;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    44
import java.util.concurrent.atomic.AtomicLong;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    45
import java.util.concurrent.atomic.AtomicReference;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    46
import java.util.function.Function;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    47
import jdk.internal.net.http.common.Demand;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    48
import java.net.http.HttpResponse.BodySubscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    49
import jdk.internal.net.http.common.MinimalFuture;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    50
import jdk.internal.net.http.common.SequentialScheduler;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    51
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    52
/** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    53
public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    54
        implements BodySubscriber<R> {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    55
    private final CompletableFuture<R> cf = new MinimalFuture<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    56
    private final S subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    57
    private final Function<? super S, ? extends R> finisher;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    58
    private final Charset charset;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    59
    private final String eol;
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    60
    private final AtomicBoolean subscribed = new AtomicBoolean();
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    61
    private volatile LineSubscription downstream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    62
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    63
    private LineSubscriberAdapter(S subscriber,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    64
                                  Function<? super S, ? extends R> finisher,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    65
                                  Charset charset,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    66
                                  String eol) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    67
        if (eol != null && eol.isEmpty())
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    68
            throw new IllegalArgumentException("empty line separator");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    69
        this.subscriber = Objects.requireNonNull(subscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    70
        this.finisher = Objects.requireNonNull(finisher);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    71
        this.charset = Objects.requireNonNull(charset);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    72
        this.eol = eol;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    73
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    74
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    75
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    76
    public void onSubscribe(Subscription subscription) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    77
        Objects.requireNonNull(subscription);
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    78
        if (!subscribed.compareAndSet(false, true)) {
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    79
            subscription.cancel();
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    80
            return;
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    81
        }
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    82
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    83
        downstream = LineSubscription.create(subscription,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    84
                                             charset,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    85
                                             eol,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    86
                                             subscriber,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    87
                                             cf);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    88
        subscriber.onSubscribe(downstream);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    89
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    90
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    91
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    92
    public void onNext(List<ByteBuffer> item) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
    93
        Objects.requireNonNull(item);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    94
        try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    95
            downstream.submit(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    96
        } catch (Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    97
            onError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    98
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    99
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   100
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   101
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   102
    public void onError(Throwable throwable) {
55402
b78af6d8a252 8225583: Examine the HttpResponse.BodySubscribers for null handling
chegar
parents: 49765
diff changeset
   103
        Objects.requireNonNull(throwable);
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   104
        try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   105
            downstream.signalError(throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   106
        } finally {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   107
            cf.completeExceptionally(throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   108
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   109
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   110
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   111
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   112
    public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   113
        try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   114
            downstream.signalComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   115
        } finally {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   116
            cf.complete(finisher.apply(subscriber));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   117
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   118
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   119
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   120
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   121
    public CompletionStage<R> getBody() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   122
        return cf;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   123
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   124
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   125
    public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   126
    create(S subscriber, Function<? super S, ? extends R> finisher, Charset charset, String eol)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   127
    {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   128
        if (eol != null && eol.isEmpty())
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   129
            throw new IllegalArgumentException("empty line separator");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   130
        return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   131
                Objects.requireNonNull(finisher),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   132
                Objects.requireNonNull(charset),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   133
                eol);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   134
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   135
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   136
    static final class LineSubscription implements Flow.Subscription {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   137
        final Flow.Subscription upstreamSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   138
        final CharsetDecoder decoder;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   139
        final String newline;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   140
        final Demand downstreamDemand;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   141
        final ConcurrentLinkedDeque<ByteBuffer> queue;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   142
        final SequentialScheduler scheduler;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   143
        final Flow.Subscriber<? super String> upstream;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   144
        final CompletableFuture<?> cf;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   145
        private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   146
        private final AtomicLong demanded = new AtomicLong();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   147
        private volatile boolean completed;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   148
        private volatile boolean cancelled;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   149
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   150
        private final char[] chars = new char[1024];
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   151
        private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   152
        private final CharBuffer buffer = CharBuffer.wrap(chars);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   153
        private final StringBuilder builder = new StringBuilder();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   154
        private String nextLine;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   155
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   156
        private LineSubscription(Flow.Subscription s,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   157
                                 CharsetDecoder dec,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   158
                                 String separator,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   159
                                 Flow.Subscriber<? super String> subscriber,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   160
                                 CompletableFuture<?> completion) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   161
            downstreamDemand = new Demand();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   162
            queue = new ConcurrentLinkedDeque<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   163
            upstreamSubscription = Objects.requireNonNull(s);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   164
            decoder = Objects.requireNonNull(dec);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   165
            newline = separator;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   166
            upstream = Objects.requireNonNull(subscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   167
            cf = Objects.requireNonNull(completion);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   168
            scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   169
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   170
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   171
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   172
        public void request(long n) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   173
            if (cancelled) return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   174
            if (downstreamDemand.increase(n)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   175
                scheduler.runOrSchedule();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   176
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   177
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   178
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   179
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   180
        public void cancel() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   181
            cancelled = true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   182
            upstreamSubscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   183
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   184
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   185
        public void submit(List<ByteBuffer> list) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   186
            queue.addAll(list);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   187
            demanded.decrementAndGet();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   188
            scheduler.runOrSchedule();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   189
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   190
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   191
        public void signalComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   192
            completed = true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   193
            scheduler.runOrSchedule();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   194
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   195
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   196
        public void signalError(Throwable error) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   197
            if (errorRef.compareAndSet(null,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   198
                    Objects.requireNonNull(error))) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   199
                scheduler.runOrSchedule();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   200
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   201
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   202
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   203
        // This method looks at whether some bytes where left over (in leftover)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   204
        // from decoding the previous buffer when the previous buffer was in
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   205
        // underflow. If so, it takes bytes one by one from the new buffer 'in'
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   206
        // and combines them with the leftover bytes until 'in' is exhausted or a
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   207
        // character was produced in 'out', resolving the previous underflow.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   208
        // Returns true if the buffer is still in underflow, false otherwise.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   209
        // However, in both situation some chars might have been produced in 'out'.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   210
        private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   211
                throws CharacterCodingException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   212
            int limit = leftover.position();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   213
            if (limit == 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   214
                // no leftover
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   215
                return false;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   216
            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   217
                CoderResult res = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   218
                while (in.hasRemaining()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   219
                    leftover.position(limit);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   220
                    leftover.limit(++limit);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   221
                    leftover.put(in.get());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   222
                    leftover.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   223
                    res = decoder.decode(leftover, out,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   224
                            endOfInput && !in.hasRemaining());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   225
                    int remaining = leftover.remaining();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   226
                    if (remaining > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   227
                        assert leftover.position() == 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   228
                        leftover.position(remaining);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   229
                    } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   230
                        leftover.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   231
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   232
                    leftover.limit(leftover.capacity());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   233
                    if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   234
                        continue;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   235
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   236
                    if (res.isError()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   237
                        res.throwException();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   238
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   239
                    assert !res.isOverflow();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   240
                    return false;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   241
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   242
                return !endOfInput;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   243
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   244
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   245
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   246
        // extract characters from start to end and remove them from
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   247
        // the StringBuilder
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   248
        private static String take(StringBuilder b, int start, int end) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   249
            assert start == 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   250
            String line;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   251
            if (end == start) return "";
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   252
            line = b.substring(start, end);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   253
            b.delete(start, end);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   254
            return line;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   255
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   256
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   257
        // finds end of line, returns -1 if not found, or the position after
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   258
        // the line delimiter if found, removing the delimiter in the process.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   259
        private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   260
            int len = b.length();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   261
            if (eol != null) { // delimiter explicitly specified
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   262
                int i = b.indexOf(eol);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   263
                if (i >= 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   264
                    // remove the delimiter and returns the position
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   265
                    // of the char after it.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   266
                    b.delete(i, i + eol.length());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   267
                    return i;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   268
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   269
            } else { // no delimiter specified, behaves as BufferedReader::readLine
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   270
                boolean crfound = false;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   271
                for (int i = 0; i < len; i++) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   272
                    char c = b.charAt(i);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   273
                    if (c == '\n') {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   274
                        // '\n' or '\r\n' found.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   275
                        // remove the delimiter and returns the position
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   276
                        // of the char after it.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   277
                        b.delete(crfound ? i - 1 : i, i + 1);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   278
                        return crfound ? i - 1 : i;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   279
                    } else if (crfound) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   280
                        // previous char was '\r', c != '\n'
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   281
                        assert i != 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   282
                        // remove the delimiter and returns the position
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   283
                        // of the char after it.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   284
                        b.delete(i - 1, i);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   285
                        return i - 1;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   286
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   287
                    crfound = c == '\r';
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   288
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   289
                if (crfound && endOfInput) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   290
                    // remove the delimiter and returns the position
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   291
                    // of the char after it.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   292
                    b.delete(len - 1, len);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   293
                    return len - 1;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   294
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   295
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   296
            return endOfInput && len > 0 ? len : -1;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   297
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   298
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   299
        // Looks at whether the StringBuilder contains a line.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   300
        // Returns null if more character are needed.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   301
        private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   302
            int next = endOfLine(b, eol, endOfInput);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   303
            return (next > -1) ? take(b, 0, next) : null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   304
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   305
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   306
        // Attempts to read the next line. Returns the next line if
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   307
        // the delimiter was found, null otherwise. The delimiters are
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   308
        // consumed.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   309
        private String nextLine()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   310
                throws CharacterCodingException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   311
            assert nextLine == null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   312
            LINES:
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   313
            while (nextLine == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   314
                boolean endOfInput = completed && queue.isEmpty();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   315
                nextLine = nextLine(builder, newline,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   316
                        endOfInput && leftover.position() == 0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   317
                if (nextLine != null) return nextLine;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   318
                ByteBuffer b;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   319
                BUFFERS:
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   320
                while ((b = queue.peek()) != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   321
                    if (!b.hasRemaining()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   322
                        queue.poll();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   323
                        continue BUFFERS;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   324
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   325
                    BYTES:
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   326
                    while (b.hasRemaining()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   327
                        buffer.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   328
                        buffer.limit(buffer.capacity());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   329
                        boolean endofInput = completed && queue.size() <= 1;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   330
                        if (isUnderFlow(b, buffer, endofInput)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   331
                            assert !b.hasRemaining();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   332
                            if (buffer.position() > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   333
                                buffer.flip();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   334
                                builder.append(buffer);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   335
                            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   336
                            continue BUFFERS;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   337
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   338
                        CoderResult res = decoder.decode(b, buffer, endofInput);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   339
                        if (res.isError()) res.throwException();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   340
                        if (buffer.position() > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   341
                            buffer.flip();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   342
                            builder.append(buffer);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   343
                            continue LINES;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   344
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   345
                        if (res.isUnderflow() && b.hasRemaining()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   346
                            //System.out.println("underflow: adding " + b.remaining() + " bytes");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   347
                            leftover.put(b);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   348
                            assert !b.hasRemaining();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   349
                            continue BUFFERS;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   350
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   351
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   352
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   353
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   354
                assert queue.isEmpty();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   355
                if (endOfInput) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   356
                    // Time to cleanup: there may be some undecoded leftover bytes
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   357
                    // We need to flush them out.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   358
                    // The decoder has been configured to replace malformed/unmappable
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   359
                    // chars with some replacement, in order to behave like
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   360
                    // InputStreamReader.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   361
                    leftover.flip();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   362
                    buffer.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   363
                    buffer.limit(buffer.capacity());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   364
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   365
                    // decode() must be called just before flush, even if there
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   366
                    // is nothing to decode. We must do this even if leftover
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   367
                    // has no remaining bytes.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   368
                    CoderResult res = decoder.decode(leftover, buffer, endOfInput);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   369
                    if (buffer.position() > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   370
                        buffer.flip();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   371
                        builder.append(buffer);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   372
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   373
                    if (res.isError()) res.throwException();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   374
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   375
                    // Now call decoder.flush()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   376
                    buffer.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   377
                    buffer.limit(buffer.capacity());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   378
                    res = decoder.flush(buffer);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   379
                    if (buffer.position() > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   380
                        buffer.flip();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   381
                        builder.append(buffer);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   382
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   383
                    if (res.isError()) res.throwException();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   384
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   385
                    // It's possible that we reach here twice - just for the
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   386
                    // purpose of checking that no bytes were left over, so
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   387
                    // we reset leftover/decoder to make the function reentrant.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   388
                    leftover.position(0);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   389
                    leftover.limit(leftover.capacity());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   390
                    decoder.reset();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   391
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   392
                    // if some chars were produced then this call will
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   393
                    // return them.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   394
                    return nextLine = nextLine(builder, newline, endOfInput);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   395
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   396
                return null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   397
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   398
            return null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   399
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   400
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   401
        // The main sequential scheduler loop.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   402
        private void loop() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   403
            try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   404
                while (!cancelled) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   405
                    Throwable error = errorRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   406
                    if (error != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   407
                        cancelled = true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   408
                        scheduler.stop();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   409
                        upstream.onError(error);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   410
                        cf.completeExceptionally(error);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   411
                        return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   412
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   413
                    if (nextLine == null) nextLine = nextLine();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   414
                    if (nextLine == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   415
                        if (completed) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   416
                            scheduler.stop();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   417
                            if (leftover.position() != 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   418
                                // Underflow: not all bytes could be
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   419
                                // decoded, but no more bytes will be coming.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   420
                                // This should not happen as we should already
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   421
                                // have got a MalformedInputException, or
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   422
                                // replaced the unmappable chars.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   423
                                errorRef.compareAndSet(null,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   424
                                        new IllegalStateException(
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   425
                                                "premature end of input ("
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   426
                                                        + leftover.position()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   427
                                                        + " undecoded bytes)"));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   428
                                continue;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   429
                            } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   430
                                upstream.onComplete();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   431
                            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   432
                            return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   433
                        } else if (demanded.get() == 0
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   434
                                && !downstreamDemand.isFulfilled()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   435
                            long incr = Math.max(1, downstreamDemand.get());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   436
                            demanded.addAndGet(incr);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   437
                            upstreamSubscription.request(incr);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   438
                            continue;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   439
                        } else return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   440
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   441
                    assert nextLine != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   442
                    assert newline != null && !nextLine.endsWith(newline)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   443
                            || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   444
                    if (downstreamDemand.tryDecrement()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   445
                        String forward = nextLine;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   446
                        nextLine = null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   447
                        upstream.onNext(forward);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   448
                    } else return; // no demand: come back later
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   449
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   450
            } catch (Throwable t) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   451
                try {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   452
                    upstreamSubscription.cancel();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   453
                } finally {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   454
                    signalError(t);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   455
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   456
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   457
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   458
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   459
        static LineSubscription create(Flow.Subscription s,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   460
                                       Charset charset,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   461
                                       String lineSeparator,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   462
                                       Flow.Subscriber<? super String> upstream,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   463
                                       CompletableFuture<?> cf) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   464
            return new LineSubscription(Objects.requireNonNull(s),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   465
                    Objects.requireNonNull(charset).newDecoder()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   466
                            // use the same decoder configuration than
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   467
                            // java.io.InputStreamReader
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   468
                            .onMalformedInput(CodingErrorAction.REPLACE)
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   469
                            .onUnmappableCharacter(CodingErrorAction.REPLACE),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   470
                    lineSeparator,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   471
                    Objects.requireNonNull(upstream),
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   472
                    Objects.requireNonNull(cf));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   473
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   474
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   475
}
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   476