src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java
author jwilhelm
Thu, 12 Sep 2019 03:21:11 +0200
changeset 58094 0f6c749acd15
parent 55426 4efe251009b4
permissions -rw-r--r--
Added tag jdk-14+14 for changeset cddef3bde924
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     1
/*
55426
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
     2
 * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     4
 *
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    10
 *
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    15
 * accompanied this code).
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    16
 *
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    20
 *
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    23
 * questions.
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    24
 */
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    25
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    26
package jdk.internal.net.http;
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    27
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    28
import java.util.Iterator;
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    29
import java.util.concurrent.Flow;
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    30
import jdk.internal.net.http.common.Demand;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents: 48083
diff changeset
    31
import jdk.internal.net.http.common.SequentialScheduler;
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    32
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    33
/**
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    34
 * A Publisher that publishes items obtained from the given Iterable. Each new
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    35
 * subscription gets a new Iterator.
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    36
 */
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    37
class PullPublisher<T> implements Flow.Publisher<T> {
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    38
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    39
    // Only one of `iterable` and `throwable` can be non-null. throwable is
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    40
    // non-null when an error has been encountered, by the creator of
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    41
    // PullPublisher, while subscribing the subscriber, but before subscribe has
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    42
    // completed.
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    43
    private final Iterable<T> iterable;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    44
    private final Throwable throwable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    45
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    46
    PullPublisher(Iterable<T> iterable, Throwable throwable) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    47
        this.iterable = iterable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    48
        this.throwable = throwable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    49
    }
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    50
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    51
    PullPublisher(Iterable<T> iterable) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    52
        this(iterable, null);
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    53
    }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    54
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    55
    @Override
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    56
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    57
        Subscription sub;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    58
        if (throwable != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    59
            assert iterable == null : "non-null iterable: " + iterable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    60
            sub = new Subscription(subscriber, null, throwable);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    61
        } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    62
            assert throwable == null : "non-null exception: " + throwable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    63
            sub = new Subscription(subscriber, iterable.iterator(), null);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    64
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    65
        subscriber.onSubscribe(sub);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    66
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    67
        if (throwable != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    68
            sub.pullScheduler.runOrSchedule();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    69
        }
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    70
    }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    71
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    72
    private class Subscription implements Flow.Subscription {
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    73
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    74
        private final Flow.Subscriber<? super T> subscriber;
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    75
        private final Iterator<T> iter;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    76
        private volatile boolean completed;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    77
        private volatile boolean cancelled;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    78
        private volatile Throwable error;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    79
        final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    80
        private final Demand demand = new Demand();
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    81
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    82
        Subscription(Flow.Subscriber<? super T> subscriber,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    83
                     Iterator<T> iter,
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    84
                     Throwable throwable) {
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    85
            this.subscriber = subscriber;
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
    86
            this.iter = iter;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    87
            this.error = throwable;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    88
        }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    89
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    90
        final class PullTask extends SequentialScheduler.CompleteRestartableTask {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    91
            @Override
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    92
            protected void run() {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    93
                if (completed || cancelled) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    94
                    return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    95
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    96
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    97
                Throwable t = error;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    98
                if (t != null) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
    99
                    completed = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   100
                    pullScheduler.stop();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   101
                    subscriber.onError(t);
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   102
                    return;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   103
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   104
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   105
                while (demand.tryDecrement() && !cancelled) {
55426
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   106
                    T next;
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   107
                    try {
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   108
                        if (!iter.hasNext()) {
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   109
                            break;
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   110
                        }
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   111
                        next = iter.next();
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   112
                    } catch (Throwable t1) {
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   113
                        completed = true;
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   114
                        pullScheduler.stop();
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   115
                        subscriber.onError(t1);
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   116
                        return;
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   117
                    }
55426
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   118
                    subscriber.onNext(next);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   119
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   120
                if (!iter.hasNext() && !cancelled) {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   121
                    completed = true;
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   122
                    pullScheduler.stop();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   123
                    subscriber.onComplete();
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   124
                }
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   125
            }
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   126
        }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   127
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   128
        @Override
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   129
        public void request(long n) {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   130
            if (cancelled)
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   131
                return;  // no-op
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   132
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   133
            if (n <= 0) {
55426
4efe251009b4 8226303: Examine the HttpRequest.BodyPublishers for exception handling
prappo
parents: 49765
diff changeset
   134
                error = new IllegalArgumentException("non-positive subscription request: " + n);
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   135
            } else {
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   136
                demand.increase(n);
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   137
            }
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   138
            pullScheduler.runOrSchedule();
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   139
        }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   140
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   141
        @Override
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   142
        public void cancel() {
48083
b1c1b4ef4be2 8191494: Refresh incubating HTTP Client
chegar
parents: 47216
diff changeset
   143
            cancelled = true;
42460
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   144
        }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   145
    }
7133f144981a 8170648: Move java.net.http package out of Java SE to incubator namespace
michaelm
parents:
diff changeset
   146
}