jdk/src/java.base/share/classes/java/util/concurrent/Flow.java
author dl
Wed, 21 Dec 2016 14:26:52 -0800
changeset 42927 1d31e540bfcb
parent 40817 4f5fb115676d
child 44125 dbd27e1dfe6f
permissions -rw-r--r--
8170484: Miscellaneous changes imported from jsr166 CVS 2016-12 Reviewed-by: martin, smarks, psandoz
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     1
/*
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     3
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     4
 * This code is free software; you can redistribute it and/or modify it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     5
 * under the terms of the GNU General Public License version 2 only, as
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     6
 * published by the Free Software Foundation.  Oracle designates this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     7
 * particular file as subject to the "Classpath" exception as provided
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     8
 * by Oracle in the LICENSE file that accompanied this code.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     9
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    13
 * version 2 for more details (a copy is included in the LICENSE file that
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    14
 * accompanied this code).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    15
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    16
 * You should have received a copy of the GNU General Public License version
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    19
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    21
 * or visit www.oracle.com if you need additional information or have any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    22
 * questions.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    23
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    24
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    25
/*
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    26
 * This file is available under and governed by the GNU General Public
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    27
 * License version 2 only, as published by the Free Software Foundation.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    28
 * However, the following notice accompanied the original version of this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    29
 * file:
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    30
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    32
 * Expert Group and released to the public domain, as explained at
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    34
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    35
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    36
package java.util.concurrent;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    37
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    38
/**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    39
 * Interrelated interfaces and static methods for establishing
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    40
 * flow-controlled components in which {@link Publisher Publishers}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    41
 * produce items consumed by one or more {@link Subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    42
 * Subscribers}, each managed by a {@link Subscription
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    43
 * Subscription}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    44
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    45
 * <p>These interfaces correspond to the <a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    46
 * href="http://www.reactive-streams.org/"> reactive-streams</a>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    47
 * specification.  They apply in both concurrent and distributed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    48
 * asynchronous settings: All (seven) methods are defined in {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    49
 * void} "one-way" message style. Communication relies on a simple form
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    50
 * of flow control (method {@link Subscription#request}) that can be
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    51
 * used to avoid resource management problems that may otherwise occur
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    52
 * in "push" based systems.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    53
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    54
 * <p><b>Examples.</b> A {@link Publisher} usually defines its own
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    55
 * {@link Subscription} implementation; constructing one in method
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    56
 * {@code subscribe} and issuing it to the calling {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    57
 * Subscriber}. It publishes items to the subscriber asynchronously,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    58
 * normally using an {@link Executor}.  For example, here is a very
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    59
 * simple publisher that only issues (when requested) a single {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    60
 * TRUE} item to a single subscriber.  Because the subscriber receives
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    61
 * only a single item, this class does not use buffering and ordering
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    62
 * control required in most implementations (for example {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    63
 * SubmissionPublisher}).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    64
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    65
 * <pre> {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    66
 * class OneShotPublisher implements Publisher<Boolean> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    67
 *   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    68
 *   private boolean subscribed; // true after first subscribe
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    69
 *   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    70
 *     if (subscribed)
40817
4f5fb115676d 8164169: Miscellaneous changes imported from jsr166 CVS 2016-09
dl
parents: 35302
diff changeset
    71
 *       subscriber.onError(new IllegalStateException()); // only one allowed
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    72
 *     else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    73
 *       subscribed = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    74
 *       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    75
 *     }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    76
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    77
 *   static class OneShotSubscription implements Subscription {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    78
 *     private final Subscriber<? super Boolean> subscriber;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    79
 *     private final ExecutorService executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    80
 *     private Future<?> future; // to allow cancellation
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    81
 *     private boolean completed;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    82
 *     OneShotSubscription(Subscriber<? super Boolean> subscriber,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    83
 *                         ExecutorService executor) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    84
 *       this.subscriber = subscriber;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    85
 *       this.executor = executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    86
 *     }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    87
 *     public synchronized void request(long n) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    88
 *       if (n != 0 && !completed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    89
 *         completed = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    90
 *         if (n < 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    91
 *           IllegalArgumentException ex = new IllegalArgumentException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    92
 *           executor.execute(() -> subscriber.onError(ex));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    93
 *         } else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    94
 *           future = executor.submit(() -> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    95
 *             subscriber.onNext(Boolean.TRUE);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    96
 *             subscriber.onComplete();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    97
 *           });
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    98
 *         }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    99
 *       }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   100
 *     }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   101
 *     public synchronized void cancel() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   102
 *       completed = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   103
 *       if (future != null) future.cancel(false);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   104
 *     }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   105
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   106
 * }}</pre>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   107
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   108
 * <p>A {@link Subscriber} arranges that items be requested and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   109
 * processed.  Items (invocations of {@link Subscriber#onNext}) are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   110
 * not issued unless requested, but multiple items may be requested.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   111
 * Many Subscriber implementations can arrange this in the style of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   112
 * the following example, where a buffer size of 1 single-steps, and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   113
 * larger sizes usually allow for more efficient overlapped processing
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   114
 * with less communication; for example with a value of 64, this keeps
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   115
 * total outstanding requests between 32 and 64.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   116
 * Because Subscriber method invocations for a given {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   117
 * Subscription} are strictly ordered, there is no need for these
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   118
 * methods to use locks or volatiles unless a Subscriber maintains
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   119
 * multiple Subscriptions (in which case it is better to instead
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   120
 * define multiple Subscribers, each with its own Subscription).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   121
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   122
 * <pre> {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   123
 * class SampleSubscriber<T> implements Subscriber<T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   124
 *   final Consumer<? super T> consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   125
 *   Subscription subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   126
 *   final long bufferSize;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   127
 *   long count;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   128
 *   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   129
 *     this.bufferSize = bufferSize;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   130
 *     this.consumer = consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   131
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   132
 *   public void onSubscribe(Subscription subscription) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   133
 *     long initialRequestSize = bufferSize;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   134
 *     count = bufferSize - bufferSize / 2; // re-request when half consumed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   135
 *     (this.subscription = subscription).request(initialRequestSize);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   136
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   137
 *   public void onNext(T item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   138
 *     if (--count <= 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   139
 *       subscription.request(count = bufferSize - bufferSize / 2);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   140
 *     consumer.accept(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   141
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   142
 *   public void onError(Throwable ex) { ex.printStackTrace(); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   143
 *   public void onComplete() {}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   144
 * }}</pre>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   145
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   146
 * <p>The default value of {@link #defaultBufferSize} may provide a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   147
 * useful starting point for choosing request sizes and capacities in
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   148
 * Flow components based on expected rates, resources, and usages.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   149
 * Or, when flow control is never needed, a subscriber may initially
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   150
 * request an effectively unbounded number of items, as in:
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   151
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   152
 * <pre> {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   153
 * class UnboundedSubscriber<T> implements Subscriber<T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   154
 *   public void onSubscribe(Subscription subscription) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   155
 *     subscription.request(Long.MAX_VALUE); // effectively unbounded
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   156
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   157
 *   public void onNext(T item) { use(item); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   158
 *   public void onError(Throwable ex) { ex.printStackTrace(); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   159
 *   public void onComplete() {}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   160
 *   void use(T item) { ... }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   161
 * }}</pre>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   162
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   163
 * @author Doug Lea
35302
e4d2275861c3 8136494: Update "@since 1.9" to "@since 9" to match java.version.specification
iris
parents: 32989
diff changeset
   164
 * @since 9
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   165
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   166
public final class Flow {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   167
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   168
    private Flow() {} // uninstantiable
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   169
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   170
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   171
     * A producer of items (and related control messages) received by
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   172
     * Subscribers.  Each current {@link Subscriber} receives the same
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   173
     * items (via method {@code onNext}) in the same order, unless
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   174
     * drops or errors are encountered. If a Publisher encounters an
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   175
     * error that does not allow items to be issued to a Subscriber,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   176
     * that Subscriber receives {@code onError}, and then receives no
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   177
     * further messages.  Otherwise, when it is known that no further
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   178
     * messages will be issued to it, a subscriber receives {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   179
     * onComplete}.  Publishers ensure that Subscriber method
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   180
     * invocations for each subscription are strictly ordered in <a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   181
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   182
     * order.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   183
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   184
     * <p>Publishers may vary in policy about whether drops (failures
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   185
     * to issue an item because of resource limitations) are treated
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   186
     * as unrecoverable errors.  Publishers may also vary about
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   187
     * whether Subscribers receive items that were produced or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   188
     * available before they subscribed.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   189
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   190
     * @param <T> the published item type
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   191
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   192
    @FunctionalInterface
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   193
    public static interface Publisher<T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   194
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   195
         * Adds the given Subscriber if possible.  If already
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   196
         * subscribed, or the attempt to subscribe fails due to policy
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   197
         * violations or errors, the Subscriber's {@code onError}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   198
         * method is invoked with an {@link IllegalStateException}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   199
         * Otherwise, the Subscriber's {@code onSubscribe} method is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   200
         * invoked with a new {@link Subscription}.  Subscribers may
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   201
         * enable receiving items by invoking the {@code request}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   202
         * method of this Subscription, and may unsubscribe by
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   203
         * invoking its {@code cancel} method.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   204
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   205
         * @param subscriber the subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   206
         * @throws NullPointerException if subscriber is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   207
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   208
        public void subscribe(Subscriber<? super T> subscriber);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   209
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   210
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   211
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   212
     * A receiver of messages.  The methods in this interface are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   213
     * invoked in strict sequential order for each {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   214
     * Subscription}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   215
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   216
     * @param <T> the subscribed item type
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   217
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   218
    public static interface Subscriber<T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   219
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   220
         * Method invoked prior to invoking any other Subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   221
         * methods for the given Subscription. If this method throws
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   222
         * an exception, resulting behavior is not guaranteed, but may
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   223
         * cause the Subscription not to be established or to be cancelled.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   224
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   225
         * <p>Typically, implementations of this method invoke {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   226
         * subscription.request} to enable receiving items.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   227
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   228
         * @param subscription a new subscription
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   229
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   230
        public void onSubscribe(Subscription subscription);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   231
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   232
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   233
         * Method invoked with a Subscription's next item.  If this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   234
         * method throws an exception, resulting behavior is not
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   235
         * guaranteed, but may cause the Subscription to be cancelled.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   236
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   237
         * @param item the item
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   238
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   239
        public void onNext(T item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   240
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   241
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   242
         * Method invoked upon an unrecoverable error encountered by a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   243
         * Publisher or Subscription, after which no other Subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   244
         * methods are invoked by the Subscription.  If this method
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   245
         * itself throws an exception, resulting behavior is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   246
         * undefined.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   247
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   248
         * @param throwable the exception
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   249
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   250
        public void onError(Throwable throwable);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   251
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   252
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   253
         * Method invoked when it is known that no additional
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   254
         * Subscriber method invocations will occur for a Subscription
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   255
         * that is not already terminated by error, after which no
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   256
         * other Subscriber methods are invoked by the Subscription.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   257
         * If this method throws an exception, resulting behavior is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   258
         * undefined.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   259
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   260
        public void onComplete();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   261
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   262
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   263
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   264
     * Message control linking a {@link Publisher} and {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   265
     * Subscriber}.  Subscribers receive items only when requested,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   266
     * and may cancel at any time. The methods in this interface are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   267
     * intended to be invoked only by their Subscribers; usages in
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   268
     * other contexts have undefined effects.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   269
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   270
    public static interface Subscription {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   271
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   272
         * Adds the given number {@code n} of items to the current
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   273
         * unfulfilled demand for this subscription.  If {@code n} is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   274
         * negative, the Subscriber will receive an {@code onError}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   275
         * signal with an {@link IllegalArgumentException} argument.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   276
         * Otherwise, the Subscriber will receive up to {@code n}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   277
         * additional {@code onNext} invocations (or fewer if
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   278
         * terminated).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   279
         *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   280
         * @param n the increment of demand; a value of {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   281
         * Long.MAX_VALUE} may be considered as effectively unbounded
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   282
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   283
        public void request(long n);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   284
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   285
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   286
         * Causes the Subscriber to (eventually) stop receiving
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   287
         * messages.  Implementation is best-effort -- additional
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   288
         * messages may be received after invoking this method.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   289
         * A cancelled subscription need not ever receive an
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   290
         * {@code onComplete} or {@code onError} signal.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   291
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   292
        public void cancel();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   293
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   294
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   295
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   296
     * A component that acts as both a Subscriber and Publisher.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   297
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   298
     * @param <T> the subscribed item type
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   299
     * @param <R> the published item type
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   300
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   301
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   302
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   303
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   304
    static final int DEFAULT_BUFFER_SIZE = 256;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   305
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   306
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   307
     * Returns a default value for Publisher or Subscriber buffering,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   308
     * that may be used in the absence of other constraints.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   309
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   310
     * @implNote
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   311
     * The current value returned is 256.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   312
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   313
     * @return the buffer size value
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   314
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   315
    public static int defaultBufferSize() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   316
        return DEFAULT_BUFFER_SIZE;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   317
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   318
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   319
}