jdk/src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java
author dl
Fri, 15 Jul 2016 13:55:51 -0700
changeset 39723 9aa34e4a0469
parent 36936 bfcdf736a998
child 40734 48879ea67e2a
permissions -rw-r--r--
8157523: Various improvements to ForkJoin/SubmissionPublisher code Reviewed-by: martin, psandoz, rriggs, plevart, dfuchs
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     1
/*
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     3
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     4
 * This code is free software; you can redistribute it and/or modify it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     5
 * under the terms of the GNU General Public License version 2 only, as
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     6
 * published by the Free Software Foundation.  Oracle designates this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     7
 * particular file as subject to the "Classpath" exception as provided
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     8
 * by Oracle in the LICENSE file that accompanied this code.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
     9
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    13
 * version 2 for more details (a copy is included in the LICENSE file that
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    14
 * accompanied this code).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    15
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    16
 * You should have received a copy of the GNU General Public License version
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    19
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    21
 * or visit www.oracle.com if you need additional information or have any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    22
 * questions.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    23
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    24
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    25
/*
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    26
 * This file is available under and governed by the GNU General Public
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    27
 * License version 2 only, as published by the Free Software Foundation.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    28
 * However, the following notice accompanied the original version of this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    29
 * file:
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    30
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    32
 * Expert Group and released to the public domain, as explained at
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    34
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    35
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    36
package java.util.concurrent;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    37
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
    38
import java.lang.invoke.MethodHandles;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
    39
import java.lang.invoke.VarHandle;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    40
import java.util.ArrayList;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    41
import java.util.List;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    42
import java.util.concurrent.locks.LockSupport;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    43
import java.util.function.BiConsumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    44
import java.util.function.BiPredicate;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    45
import java.util.function.Consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    46
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    47
/**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    48
 * A {@link Flow.Publisher} that asynchronously issues submitted
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    49
 * (non-null) items to current subscribers until it is closed.  Each
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    50
 * current subscriber receives newly submitted items in the same order
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    51
 * unless drops or exceptions are encountered.  Using a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    52
 * SubmissionPublisher allows item generators to act as compliant <a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    53
 * href="http://www.reactive-streams.org/"> reactive-streams</a>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    54
 * Publishers relying on drop handling and/or blocking for flow
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    55
 * control.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    56
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    57
 * <p>A SubmissionPublisher uses the {@link Executor} supplied in its
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    58
 * constructor for delivery to subscribers. The best choice of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    59
 * Executor depends on expected usage. If the generator(s) of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    60
 * submitted items run in separate threads, and the number of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    61
 * subscribers can be estimated, consider using a {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    62
 * Executors#newFixedThreadPool}. Otherwise consider using the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    63
 * default, normally the {@link ForkJoinPool#commonPool}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    64
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    65
 * <p>Buffering allows producers and consumers to transiently operate
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    66
 * at different rates.  Each subscriber uses an independent buffer.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    67
 * Buffers are created upon first use and expanded as needed up to the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    68
 * given maximum. (The enforced capacity may be rounded up to the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    69
 * nearest power of two and/or bounded by the largest value supported
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    70
 * by this implementation.)  Invocations of {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    71
 * Flow.Subscription#request(long) request} do not directly result in
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    72
 * buffer expansion, but risk saturation if unfilled requests exceed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    73
 * the maximum capacity.  The default value of {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    74
 * Flow#defaultBufferSize()} may provide a useful starting point for
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    75
 * choosing a capacity based on expected rates, resources, and usages.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    76
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    77
 * <p>Publication methods support different policies about what to do
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    78
 * when buffers are saturated. Method {@link #submit(Object) submit}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    79
 * blocks until resources are available. This is simplest, but least
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    80
 * responsive.  The {@code offer} methods may drop items (either
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    81
 * immediately or with bounded timeout), but provide an opportunity to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    82
 * interpose a handler and then retry.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    83
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    84
 * <p>If any Subscriber method throws an exception, its subscription
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    85
 * is cancelled.  If a handler is supplied as a constructor argument,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    86
 * it is invoked before cancellation upon an exception in method
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    87
 * {@link Flow.Subscriber#onNext onNext}, but exceptions in methods
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    88
 * {@link Flow.Subscriber#onSubscribe onSubscribe},
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    89
 * {@link Flow.Subscriber#onError(Throwable) onError} and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    90
 * {@link Flow.Subscriber#onComplete() onComplete} are not recorded or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    91
 * handled before cancellation.  If the supplied Executor throws
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    92
 * {@link RejectedExecutionException} (or any other RuntimeException
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    93
 * or Error) when attempting to execute a task, or a drop handler
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    94
 * throws an exception when processing a dropped item, then the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    95
 * exception is rethrown. In these cases, not all subscribers will
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    96
 * have been issued the published item. It is usually good practice to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    97
 * {@link #closeExceptionally closeExceptionally} in these cases.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    98
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
    99
 * <p>Method {@link #consume(Consumer)} simplifies support for a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   100
 * common case in which the only action of a subscriber is to request
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   101
 * and process all items using a supplied function.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   102
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   103
 * <p>This class may also serve as a convenient base for subclasses
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   104
 * that generate items, and use the methods in this class to publish
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   105
 * them.  For example here is a class that periodically publishes the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   106
 * items generated from a supplier. (In practice you might add methods
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   107
 * to independently start and stop generation, to share Executors
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   108
 * among publishers, and so on, or use a SubmissionPublisher as a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   109
 * component rather than a superclass.)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   110
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   111
 * <pre> {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   112
 * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   113
 *   final ScheduledFuture<?> periodicTask;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   114
 *   final ScheduledExecutorService scheduler;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   115
 *   PeriodicPublisher(Executor executor, int maxBufferCapacity,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   116
 *                     Supplier<? extends T> supplier,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   117
 *                     long period, TimeUnit unit) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   118
 *     super(executor, maxBufferCapacity);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   119
 *     scheduler = new ScheduledThreadPoolExecutor(1);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   120
 *     periodicTask = scheduler.scheduleAtFixedRate(
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   121
 *       () -> submit(supplier.get()), 0, period, unit);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   122
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   123
 *   public void close() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   124
 *     periodicTask.cancel(false);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   125
 *     scheduler.shutdown();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   126
 *     super.close();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   127
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   128
 * }}</pre>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   129
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   130
 * <p>Here is an example of a {@link Flow.Processor} implementation.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   131
 * It uses single-step requests to its publisher for simplicity of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   132
 * illustration. A more adaptive version could monitor flow using the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   133
 * lag estimate returned from {@code submit}, along with other utility
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   134
 * methods.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   135
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   136
 * <pre> {@code
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   137
 * class TransformProcessor<S,T> extends SubmissionPublisher<T>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   138
 *   implements Flow.Processor<S,T> {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   139
 *   final Function<? super S, ? extends T> function;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   140
 *   Flow.Subscription subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   141
 *   TransformProcessor(Executor executor, int maxBufferCapacity,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   142
 *                      Function<? super S, ? extends T> function) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   143
 *     super(executor, maxBufferCapacity);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   144
 *     this.function = function;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   145
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   146
 *   public void onSubscribe(Flow.Subscription subscription) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   147
 *     (this.subscription = subscription).request(1);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   148
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   149
 *   public void onNext(S item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   150
 *     subscription.request(1);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   151
 *     submit(function.apply(item));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   152
 *   }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   153
 *   public void onError(Throwable ex) { closeExceptionally(ex); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   154
 *   public void onComplete() { close(); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   155
 * }}</pre>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   156
 *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   157
 * @param <T> the published item type
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   158
 * @author Doug Lea
35302
e4d2275861c3 8136494: Update "@since 1.9" to "@since 9" to match java.version.specification
iris
parents: 34369
diff changeset
   159
 * @since 9
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   160
 */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   161
public class SubmissionPublisher<T> implements Flow.Publisher<T>,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   162
                                               AutoCloseable {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   163
    /*
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   164
     * Most mechanics are handled by BufferedSubscription. This class
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   165
     * mainly tracks subscribers and ensures sequentiality, by using
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   166
     * built-in synchronization locks across public methods. (Using
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   167
     * built-in locks works well in the most typical case in which
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   168
     * only one thread submits items).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   169
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   170
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   171
    /** The largest possible power of two array size. */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   172
    static final int BUFFER_CAPACITY_LIMIT = 1 << 30;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   173
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   174
    /** Round capacity to power of 2, at most limit. */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   175
    static final int roundCapacity(int cap) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   176
        int n = cap - 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   177
        n |= n >>> 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   178
        n |= n >>> 2;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   179
        n |= n >>> 4;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   180
        n |= n >>> 8;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   181
        n |= n >>> 16;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   182
        return (n <= 0) ? 1 : // at least 1
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   183
            (n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   184
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   185
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   186
    // default Executor setup; nearly the same as CompletableFuture
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   187
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   188
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   189
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   190
     * support parallelism.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   191
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   192
    private static final Executor ASYNC_POOL =
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   193
        (ForkJoinPool.getCommonPoolParallelism() > 1) ?
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   194
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   195
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   196
    /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   197
    private static final class ThreadPerTaskExecutor implements Executor {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   198
        public void execute(Runnable r) { new Thread(r).start(); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   199
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   200
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   201
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   202
     * Clients (BufferedSubscriptions) are maintained in a linked list
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   203
     * (via their "next" fields). This works well for publish loops.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   204
     * It requires O(n) traversal to check for duplicate subscribers,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   205
     * but we expect that subscribing is much less common than
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   206
     * publishing. Unsubscribing occurs only during traversal loops,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   207
     * when BufferedSubscription methods return negative values
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   208
     * signifying that they have been disabled.  To reduce
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   209
     * head-of-line blocking, submit and offer methods first call
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   210
     * BufferedSubscription.offer on each subscriber, and place
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   211
     * saturated ones in retries list (using nextRetry field), and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   212
     * retry, possibly blocking or dropping.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   213
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   214
    BufferedSubscription<T> clients;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   215
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   216
    /** Run status, updated only within locks */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   217
    volatile boolean closed;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   218
    /** If non-null, the exception in closeExceptionally */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   219
    volatile Throwable closedException;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   220
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   221
    // Parameters for constructing BufferedSubscriptions
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   222
    final Executor executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   223
    final BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   224
    final int maxBufferCapacity;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   225
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   226
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   227
     * Creates a new SubmissionPublisher using the given Executor for
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   228
     * async delivery to subscribers, with the given maximum buffer size
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   229
     * for each subscriber, and, if non-null, the given handler invoked
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   230
     * when any Subscriber throws an exception in method {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   231
     * Flow.Subscriber#onNext(Object) onNext}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   232
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   233
     * @param executor the executor to use for async delivery,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   234
     * supporting creation of at least one independent thread
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   235
     * @param maxBufferCapacity the maximum capacity for each
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   236
     * subscriber's buffer (the enforced capacity may be rounded up to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   237
     * the nearest power of two and/or bounded by the largest value
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   238
     * supported by this implementation; method {@link #getMaxBufferCapacity}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   239
     * returns the actual value)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   240
     * @param handler if non-null, procedure to invoke upon exception
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   241
     * thrown in method {@code onNext}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   242
     * @throws NullPointerException if executor is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   243
     * @throws IllegalArgumentException if maxBufferCapacity not
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   244
     * positive
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   245
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   246
    public SubmissionPublisher(Executor executor, int maxBufferCapacity,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   247
                               BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> handler) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   248
        if (executor == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   249
            throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   250
        if (maxBufferCapacity <= 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   251
            throw new IllegalArgumentException("capacity must be positive");
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   252
        this.executor = executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   253
        this.onNextHandler = handler;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   254
        this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   255
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   256
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   257
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   258
     * Creates a new SubmissionPublisher using the given Executor for
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   259
     * async delivery to subscribers, with the given maximum buffer size
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   260
     * for each subscriber, and no handler for Subscriber exceptions in
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   261
     * method {@link Flow.Subscriber#onNext(Object) onNext}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   262
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   263
     * @param executor the executor to use for async delivery,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   264
     * supporting creation of at least one independent thread
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   265
     * @param maxBufferCapacity the maximum capacity for each
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   266
     * subscriber's buffer (the enforced capacity may be rounded up to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   267
     * the nearest power of two and/or bounded by the largest value
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   268
     * supported by this implementation; method {@link #getMaxBufferCapacity}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   269
     * returns the actual value)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   270
     * @throws NullPointerException if executor is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   271
     * @throws IllegalArgumentException if maxBufferCapacity not
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   272
     * positive
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   273
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   274
    public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   275
        this(executor, maxBufferCapacity, null);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   276
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   277
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   278
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   279
     * Creates a new SubmissionPublisher using the {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   280
     * ForkJoinPool#commonPool()} for async delivery to subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   281
     * (unless it does not support a parallelism level of at least two,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   282
     * in which case, a new Thread is created to run each task), with
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   283
     * maximum buffer capacity of {@link Flow#defaultBufferSize}, and no
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   284
     * handler for Subscriber exceptions in method {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   285
     * Flow.Subscriber#onNext(Object) onNext}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   286
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   287
    public SubmissionPublisher() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   288
        this(ASYNC_POOL, Flow.defaultBufferSize(), null);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   289
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   290
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   291
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   292
     * Adds the given Subscriber unless already subscribed.  If already
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   293
     * subscribed, the Subscriber's {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   294
     * Flow.Subscriber#onError(Throwable) onError} method is invoked on
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   295
     * the existing subscription with an {@link IllegalStateException}.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   296
     * Otherwise, upon success, the Subscriber's {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   297
     * Flow.Subscriber#onSubscribe onSubscribe} method is invoked
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   298
     * asynchronously with a new {@link Flow.Subscription}.  If {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   299
     * Flow.Subscriber#onSubscribe onSubscribe} throws an exception, the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   300
     * subscription is cancelled. Otherwise, if this SubmissionPublisher
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   301
     * was closed exceptionally, then the subscriber's {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   302
     * Flow.Subscriber#onError onError} method is invoked with the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   303
     * corresponding exception, or if closed without exception, the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   304
     * subscriber's {@link Flow.Subscriber#onComplete() onComplete}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   305
     * method is invoked.  Subscribers may enable receiving items by
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   306
     * invoking the {@link Flow.Subscription#request(long) request}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   307
     * method of the new Subscription, and may unsubscribe by invoking
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   308
     * its {@link Flow.Subscription#cancel() cancel} method.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   309
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   310
     * @param subscriber the subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   311
     * @throws NullPointerException if subscriber is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   312
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   313
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   314
        if (subscriber == null) throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   315
        BufferedSubscription<T> subscription =
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   316
            new BufferedSubscription<T>(subscriber, executor,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   317
                                        onNextHandler, maxBufferCapacity);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   318
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   319
            for (BufferedSubscription<T> b = clients, pred = null;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   320
                if (b == null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   321
                    Throwable ex;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   322
                    subscription.onSubscribe();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   323
                    if ((ex = closedException) != null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   324
                        subscription.onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   325
                    else if (closed)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   326
                        subscription.onComplete();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   327
                    else if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   328
                        clients = subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   329
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   330
                        pred.next = subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   331
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   332
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   333
                BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   334
                if (b.isDisabled()) { // remove
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   335
                    b.next = null;    // detach
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   336
                    if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   337
                        clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   338
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   339
                        pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   340
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   341
                else if (subscriber.equals(b.subscriber)) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   342
                    b.onError(new IllegalStateException("Duplicate subscribe"));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   343
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   344
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   345
                else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   346
                    pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   347
                b = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   348
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   349
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   350
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   351
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   352
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   353
     * Publishes the given item to each current subscriber by
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   354
     * asynchronously invoking its {@link Flow.Subscriber#onNext(Object)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   355
     * onNext} method, blocking uninterruptibly while resources for any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   356
     * subscriber are unavailable. This method returns an estimate of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   357
     * the maximum lag (number of items submitted but not yet consumed)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   358
     * among all current subscribers. This value is at least one
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   359
     * (accounting for this submitted item) if there are any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   360
     * subscribers, else zero.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   361
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   362
     * <p>If the Executor for this publisher throws a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   363
     * RejectedExecutionException (or any other RuntimeException or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   364
     * Error) when attempting to asynchronously notify subscribers,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   365
     * then this exception is rethrown, in which case not all
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   366
     * subscribers will have been issued this item.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   367
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   368
     * @param item the (non-null) item to publish
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   369
     * @return the estimated maximum lag among subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   370
     * @throws IllegalStateException if closed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   371
     * @throws NullPointerException if item is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   372
     * @throws RejectedExecutionException if thrown by Executor
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   373
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   374
    public int submit(T item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   375
        if (item == null) throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   376
        int lag = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   377
        boolean complete;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   378
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   379
            complete = closed;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   380
            BufferedSubscription<T> b = clients;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   381
            if (!complete) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   382
                BufferedSubscription<T> pred = null, r = null, rtail = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   383
                while (b != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   384
                    BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   385
                    int stat = b.offer(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   386
                    if (stat < 0) {           // disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   387
                        b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   388
                        if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   389
                            clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   390
                        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   391
                            pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   392
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   393
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   394
                        if (stat > lag)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   395
                            lag = stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   396
                        else if (stat == 0) { // place on retry list
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   397
                            b.nextRetry = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   398
                            if (rtail == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   399
                                r = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   400
                            else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   401
                                rtail.nextRetry = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   402
                            rtail = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   403
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   404
                        pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   405
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   406
                    b = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   407
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   408
                while (r != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   409
                    BufferedSubscription<T> nextRetry = r.nextRetry;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   410
                    r.nextRetry = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   411
                    int stat = r.submit(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   412
                    if (stat > lag)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   413
                        lag = stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   414
                    else if (stat < 0 && clients == r)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   415
                        clients = r.next; // postpone internal unsubscribes
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   416
                    r = nextRetry;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   417
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   418
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   419
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   420
        if (complete)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   421
            throw new IllegalStateException("Closed");
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   422
        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   423
            return lag;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   424
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   425
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   426
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   427
     * Publishes the given item, if possible, to each current subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   428
     * by asynchronously invoking its {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   429
     * Flow.Subscriber#onNext(Object) onNext} method. The item may be
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   430
     * dropped by one or more subscribers if resource limits are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   431
     * exceeded, in which case the given handler (if non-null) is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   432
     * invoked, and if it returns true, retried once.  Other calls to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   433
     * methods in this class by other threads are blocked while the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   434
     * handler is invoked.  Unless recovery is assured, options are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   435
     * usually limited to logging the error and/or issuing an {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   436
     * Flow.Subscriber#onError(Throwable) onError} signal to the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   437
     * subscriber.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   438
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   439
     * <p>This method returns a status indicator: If negative, it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   440
     * represents the (negative) number of drops (failed attempts to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   441
     * issue the item to a subscriber). Otherwise it is an estimate of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   442
     * the maximum lag (number of items submitted but not yet
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   443
     * consumed) among all current subscribers. This value is at least
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   444
     * one (accounting for this submitted item) if there are any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   445
     * subscribers, else zero.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   446
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   447
     * <p>If the Executor for this publisher throws a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   448
     * RejectedExecutionException (or any other RuntimeException or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   449
     * Error) when attempting to asynchronously notify subscribers, or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   450
     * the drop handler throws an exception when processing a dropped
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   451
     * item, then this exception is rethrown.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   452
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   453
     * @param item the (non-null) item to publish
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   454
     * @param onDrop if non-null, the handler invoked upon a drop to a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   455
     * subscriber, with arguments of the subscriber and item; if it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   456
     * returns true, an offer is re-attempted (once)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   457
     * @return if negative, the (negative) number of drops; otherwise
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   458
     * an estimate of maximum lag
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   459
     * @throws IllegalStateException if closed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   460
     * @throws NullPointerException if item is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   461
     * @throws RejectedExecutionException if thrown by Executor
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   462
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   463
    public int offer(T item,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   464
                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   465
        return doOffer(0L, item, onDrop);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   466
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   467
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   468
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   469
     * Publishes the given item, if possible, to each current subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   470
     * by asynchronously invoking its {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   471
     * Flow.Subscriber#onNext(Object) onNext} method, blocking while
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   472
     * resources for any subscription are unavailable, up to the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   473
     * specified timeout or until the caller thread is interrupted, at
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   474
     * which point the given handler (if non-null) is invoked, and if it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   475
     * returns true, retried once. (The drop handler may distinguish
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   476
     * timeouts from interrupts by checking whether the current thread
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   477
     * is interrupted.)  Other calls to methods in this class by other
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   478
     * threads are blocked while the handler is invoked.  Unless
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   479
     * recovery is assured, options are usually limited to logging the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   480
     * error and/or issuing an {@link Flow.Subscriber#onError(Throwable)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   481
     * onError} signal to the subscriber.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   482
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   483
     * <p>This method returns a status indicator: If negative, it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   484
     * represents the (negative) number of drops (failed attempts to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   485
     * issue the item to a subscriber). Otherwise it is an estimate of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   486
     * the maximum lag (number of items submitted but not yet
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   487
     * consumed) among all current subscribers. This value is at least
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   488
     * one (accounting for this submitted item) if there are any
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   489
     * subscribers, else zero.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   490
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   491
     * <p>If the Executor for this publisher throws a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   492
     * RejectedExecutionException (or any other RuntimeException or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   493
     * Error) when attempting to asynchronously notify subscribers, or
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   494
     * the drop handler throws an exception when processing a dropped
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   495
     * item, then this exception is rethrown.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   496
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   497
     * @param item the (non-null) item to publish
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   498
     * @param timeout how long to wait for resources for any subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   499
     * before giving up, in units of {@code unit}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   500
     * @param unit a {@code TimeUnit} determining how to interpret the
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   501
     * {@code timeout} parameter
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   502
     * @param onDrop if non-null, the handler invoked upon a drop to a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   503
     * subscriber, with arguments of the subscriber and item; if it
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   504
     * returns true, an offer is re-attempted (once)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   505
     * @return if negative, the (negative) number of drops; otherwise
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   506
     * an estimate of maximum lag
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   507
     * @throws IllegalStateException if closed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   508
     * @throws NullPointerException if item is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   509
     * @throws RejectedExecutionException if thrown by Executor
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   510
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   511
    public int offer(T item, long timeout, TimeUnit unit,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   512
                     BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   513
        return doOffer(unit.toNanos(timeout), item, onDrop);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   514
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   515
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   516
    /** Common implementation for both forms of offer */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   517
    final int doOffer(long nanos, T item,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   518
                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   519
        if (item == null) throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   520
        int lag = 0, drops = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   521
        boolean complete;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   522
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   523
            complete = closed;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   524
            BufferedSubscription<T> b = clients;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   525
            if (!complete) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   526
                BufferedSubscription<T> pred = null, r = null, rtail = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   527
                while (b != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   528
                    BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   529
                    int stat = b.offer(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   530
                    if (stat < 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   531
                        b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   532
                        if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   533
                            clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   534
                        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   535
                            pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   536
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   537
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   538
                        if (stat > lag)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   539
                            lag = stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   540
                        else if (stat == 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   541
                            b.nextRetry = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   542
                            if (rtail == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   543
                                r = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   544
                            else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   545
                                rtail.nextRetry = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   546
                            rtail = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   547
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   548
                        else if (stat > lag)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   549
                            lag = stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   550
                        pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   551
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   552
                    b = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   553
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   554
                while (r != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   555
                    BufferedSubscription<T> nextRetry = r.nextRetry;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   556
                    r.nextRetry = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   557
                    int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   558
                        r.offer(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   559
                    if (stat == 0 && onDrop != null &&
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   560
                        onDrop.test(r.subscriber, item))
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   561
                        stat = r.offer(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   562
                    if (stat == 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   563
                        ++drops;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   564
                    else if (stat > lag)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   565
                        lag = stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   566
                    else if (stat < 0 && clients == r)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   567
                        clients = r.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   568
                    r = nextRetry;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   569
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   570
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   571
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   572
        if (complete)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   573
            throw new IllegalStateException("Closed");
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   574
        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   575
            return (drops > 0) ? -drops : lag;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   576
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   577
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   578
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   579
     * Unless already closed, issues {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   580
     * Flow.Subscriber#onComplete() onComplete} signals to current
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   581
     * subscribers, and disallows subsequent attempts to publish.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   582
     * Upon return, this method does <em>NOT</em> guarantee that all
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   583
     * subscribers have yet completed.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   584
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   585
    public void close() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   586
        if (!closed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   587
            BufferedSubscription<T> b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   588
            synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   589
                b = clients;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   590
                clients = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   591
                closed = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   592
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   593
            while (b != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   594
                BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   595
                b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   596
                b.onComplete();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   597
                b = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   598
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   599
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   600
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   601
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   602
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   603
     * Unless already closed, issues {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   604
     * Flow.Subscriber#onError(Throwable) onError} signals to current
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   605
     * subscribers with the given error, and disallows subsequent
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   606
     * attempts to publish.  Future subscribers also receive the given
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   607
     * error. Upon return, this method does <em>NOT</em> guarantee
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   608
     * that all subscribers have yet completed.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   609
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   610
     * @param error the {@code onError} argument sent to subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   611
     * @throws NullPointerException if error is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   612
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   613
    public void closeExceptionally(Throwable error) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   614
        if (error == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   615
            throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   616
        if (!closed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   617
            BufferedSubscription<T> b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   618
            synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   619
                b = clients;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   620
                clients = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   621
                closed = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   622
                closedException = error;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   623
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   624
            while (b != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   625
                BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   626
                b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   627
                b.onError(error);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   628
                b = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   629
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   630
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   631
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   632
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   633
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   634
     * Returns true if this publisher is not accepting submissions.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   635
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   636
     * @return true if closed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   637
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   638
    public boolean isClosed() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   639
        return closed;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   640
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   641
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   642
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   643
     * Returns the exception associated with {@link
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   644
     * #closeExceptionally(Throwable) closeExceptionally}, or null if
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   645
     * not closed or if closed normally.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   646
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   647
     * @return the exception, or null if none
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   648
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   649
    public Throwable getClosedException() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   650
        return closedException;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   651
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   652
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   653
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   654
     * Returns true if this publisher has any subscribers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   655
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   656
     * @return true if this publisher has any subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   657
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   658
    public boolean hasSubscribers() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   659
        boolean nonEmpty = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   660
        if (!closed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   661
            synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   662
                for (BufferedSubscription<T> b = clients; b != null;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   663
                    BufferedSubscription<T> next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   664
                    if (b.isDisabled()) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   665
                        b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   666
                        b = clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   667
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   668
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   669
                        nonEmpty = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   670
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   671
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   672
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   673
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   674
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   675
        return nonEmpty;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   676
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   677
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   678
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   679
     * Returns the number of current subscribers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   680
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   681
     * @return the number of current subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   682
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   683
    public int getNumberOfSubscribers() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   684
        int count = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   685
        if (!closed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   686
            synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   687
                BufferedSubscription<T> pred = null, next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   688
                for (BufferedSubscription<T> b = clients; b != null; b = next) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   689
                    next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   690
                    if (b.isDisabled()) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   691
                        b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   692
                        if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   693
                            clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   694
                        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   695
                            pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   696
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   697
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   698
                        pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   699
                        ++count;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   700
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   701
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   702
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   703
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   704
        return count;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   705
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   706
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   707
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   708
     * Returns the Executor used for asynchronous delivery.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   709
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   710
     * @return the Executor used for asynchronous delivery
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   711
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   712
    public Executor getExecutor() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   713
        return executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   714
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   715
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   716
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   717
     * Returns the maximum per-subscriber buffer capacity.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   718
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   719
     * @return the maximum per-subscriber buffer capacity
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   720
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   721
    public int getMaxBufferCapacity() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   722
        return maxBufferCapacity;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   723
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   724
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   725
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   726
     * Returns a list of current subscribers for monitoring and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   727
     * tracking purposes, not for invoking {@link Flow.Subscriber}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   728
     * methods on the subscribers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   729
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   730
     * @return list of current subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   731
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   732
    public List<Flow.Subscriber<? super T>> getSubscribers() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   733
        ArrayList<Flow.Subscriber<? super T>> subs = new ArrayList<>();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   734
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   735
            BufferedSubscription<T> pred = null, next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   736
            for (BufferedSubscription<T> b = clients; b != null; b = next) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   737
                next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   738
                if (b.isDisabled()) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   739
                    b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   740
                    if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   741
                        clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   742
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   743
                        pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   744
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   745
                else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   746
                    subs.add(b.subscriber);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   747
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   748
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   749
        return subs;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   750
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   751
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   752
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   753
     * Returns true if the given Subscriber is currently subscribed.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   754
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   755
     * @param subscriber the subscriber
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   756
     * @return true if currently subscribed
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   757
     * @throws NullPointerException if subscriber is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   758
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   759
    public boolean isSubscribed(Flow.Subscriber<? super T> subscriber) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   760
        if (subscriber == null) throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   761
        if (!closed) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   762
            synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   763
                BufferedSubscription<T> pred = null, next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   764
                for (BufferedSubscription<T> b = clients; b != null; b = next) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   765
                    next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   766
                    if (b.isDisabled()) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   767
                        b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   768
                        if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   769
                            clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   770
                        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   771
                            pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   772
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   773
                    else if (subscriber.equals(b.subscriber))
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   774
                        return true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   775
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   776
                        pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   777
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   778
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   779
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   780
        return false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   781
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   782
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   783
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   784
     * Returns an estimate of the minimum number of items requested
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   785
     * (via {@link Flow.Subscription#request(long) request}) but not
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   786
     * yet produced, among all current subscribers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   787
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   788
     * @return the estimate, or zero if no subscribers
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   789
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   790
    public long estimateMinimumDemand() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   791
        long min = Long.MAX_VALUE;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   792
        boolean nonEmpty = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   793
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   794
            BufferedSubscription<T> pred = null, next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   795
            for (BufferedSubscription<T> b = clients; b != null; b = next) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   796
                int n; long d;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   797
                next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   798
                if ((n = b.estimateLag()) < 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   799
                    b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   800
                    if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   801
                        clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   802
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   803
                        pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   804
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   805
                else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   806
                    if ((d = b.demand - n) < min)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   807
                        min = d;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   808
                    nonEmpty = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   809
                    pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   810
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   811
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   812
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   813
        return nonEmpty ? min : 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   814
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   815
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   816
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   817
     * Returns an estimate of the maximum number of items produced but
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   818
     * not yet consumed among all current subscribers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   819
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   820
     * @return the estimate
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   821
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   822
    public int estimateMaximumLag() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   823
        int max = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   824
        synchronized (this) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   825
            BufferedSubscription<T> pred = null, next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   826
            for (BufferedSubscription<T> b = clients; b != null; b = next) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   827
                int n;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   828
                next = b.next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   829
                if ((n = b.estimateLag()) < 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   830
                    b.next = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   831
                    if (pred == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   832
                        clients = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   833
                    else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   834
                        pred.next = next;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   835
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   836
                else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   837
                    if (n > max)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   838
                        max = n;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   839
                    pred = b;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   840
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   841
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   842
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   843
        return max;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   844
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   845
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   846
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   847
     * Processes all published items using the given Consumer function.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   848
     * Returns a CompletableFuture that is completed normally when this
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   849
     * publisher signals {@link Flow.Subscriber#onComplete()
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   850
     * onComplete}, or completed exceptionally upon any error, or an
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   851
     * exception is thrown by the Consumer, or the returned
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   852
     * CompletableFuture is cancelled, in which case no further items
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   853
     * are processed.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   854
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   855
     * @param consumer the function applied to each onNext item
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   856
     * @return a CompletableFuture that is completed normally
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   857
     * when the publisher signals onComplete, and exceptionally
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   858
     * upon any error or cancellation
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   859
     * @throws NullPointerException if consumer is null
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   860
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   861
    public CompletableFuture<Void> consume(Consumer<? super T> consumer) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   862
        if (consumer == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   863
            throw new NullPointerException();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   864
        CompletableFuture<Void> status = new CompletableFuture<>();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   865
        subscribe(new ConsumerSubscriber<T>(status, consumer));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   866
        return status;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   867
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   868
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   869
    /** Subscriber for method consume */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   870
    private static final class ConsumerSubscriber<T>
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
   871
        implements Flow.Subscriber<T> {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   872
        final CompletableFuture<Void> status;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   873
        final Consumer<? super T> consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   874
        Flow.Subscription subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   875
        ConsumerSubscriber(CompletableFuture<Void> status,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   876
                           Consumer<? super T> consumer) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   877
            this.status = status; this.consumer = consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   878
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   879
        public final void onSubscribe(Flow.Subscription subscription) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   880
            this.subscription = subscription;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   881
            status.whenComplete((v, e) -> subscription.cancel());
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   882
            if (!status.isDone())
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   883
                subscription.request(Long.MAX_VALUE);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   884
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   885
        public final void onError(Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   886
            status.completeExceptionally(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   887
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   888
        public final void onComplete() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   889
            status.complete(null);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   890
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   891
        public final void onNext(T item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   892
            try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   893
                consumer.accept(item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   894
            } catch (Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   895
                subscription.cancel();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   896
                status.completeExceptionally(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   897
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   898
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   899
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   900
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   901
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   902
     * A task for consuming buffer items and signals, created and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   903
     * executed whenever they become available. A task consumes as
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   904
     * many items/signals as possible before terminating, at which
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   905
     * point another task is created when needed. The dual Runnable
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   906
     * and ForkJoinTask declaration saves overhead when executed by
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   907
     * ForkJoinPools, without impacting other kinds of Executors.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   908
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   909
    @SuppressWarnings("serial")
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   910
    static final class ConsumerTask<T> extends ForkJoinTask<Void>
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
   911
        implements Runnable, CompletableFuture.AsynchronousCompletionTask {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   912
        final BufferedSubscription<T> consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   913
        ConsumerTask(BufferedSubscription<T> consumer) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   914
            this.consumer = consumer;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   915
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   916
        public final Void getRawResult() { return null; }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   917
        public final void setRawResult(Void v) {}
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   918
        public final boolean exec() { consumer.consume(); return false; }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   919
        public final void run() { consumer.consume(); }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   920
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   921
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   922
    /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   923
     * A bounded (ring) buffer with integrated control to start a
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   924
     * consumer task whenever items are available.  The buffer
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   925
     * algorithm is similar to one used inside ForkJoinPool (see its
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   926
     * internal documentation for details) specialized for the case of
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   927
     * at most one concurrent producer and consumer, and power of two
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   928
     * buffer sizes. This allows methods to operate without locks even
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   929
     * while supporting resizing, blocking, task-triggering, and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   930
     * garbage-free buffers (nulling out elements when consumed),
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   931
     * although supporting these does impose a bit of overhead
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   932
     * compared to plain fixed-size ring buffers.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   933
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   934
     * The publisher guarantees a single producer via its lock.  We
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   935
     * ensure in this class that there is at most one consumer.  The
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   936
     * request and cancel methods must be fully thread-safe but are
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   937
     * coded to exploit the most common case in which they are only
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   938
     * called by consumers (usually within onNext).
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   939
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   940
     * Execution control is managed using the ACTIVE ctl bit. We
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   941
     * ensure that a task is active when consumable items (and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   942
     * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   943
     * there is demand (unfilled requests).  This is complicated on
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   944
     * the creation side by the possibility of exceptions when trying
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   945
     * to execute tasks. These eventually force DISABLED state, but
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   946
     * sometimes not directly. On the task side, termination (clearing
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   947
     * ACTIVE) that would otherwise race with producers or request()
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   948
     * calls uses the CONSUME keep-alive bit to force a recheck.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   949
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   950
     * The ctl field also manages run state. When DISABLED, no further
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   951
     * updates are possible. Disabling may be preceded by setting
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   952
     * ERROR or COMPLETE (or both -- ERROR has precedence), in which
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   953
     * case the associated Subscriber methods are invoked, possibly
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   954
     * synchronously if there is no active consumer task (including
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   955
     * cases where execute() failed). The cancel() method is supported
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   956
     * by treating as ERROR but suppressing onError signal.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   957
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   958
     * Support for blocking also exploits the fact that there is only
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   959
     * one possible waiter. ManagedBlocker-compatible control fields
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   960
     * are placed in this class itself rather than in wait-nodes.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   961
     * Blocking control relies on the "waiter" field. Producers set
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   962
     * the field before trying to block, but must then recheck (via
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   963
     * offer) before parking. Signalling then just unparks and clears
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
   964
     * waiter field. If the producer and/or consumer are using a
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
   965
     * ForkJoinPool, the producer attempts to help run consumer tasks
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
   966
     * via ForkJoinPool.helpAsyncBlocker before blocking.
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   967
     *
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   968
     * This class uses @Contended and heuristic field declaration
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   969
     * ordering to reduce false-sharing-based memory contention among
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   970
     * instances of BufferedSubscription, but it does not currently
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   971
     * attempt to avoid memory contention among buffers. This field
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   972
     * and element packing can hurt performance especially when each
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   973
     * publisher has only one client operating at a high rate.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   974
     * Addressing this may require allocating substantially more space
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   975
     * than users expect.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   976
     */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   977
    @SuppressWarnings("serial")
34369
b6df4cc80001 8140687: Move @Contended to the jdk.internal.vm.annotation package
chegar
parents: 33674
diff changeset
   978
    @jdk.internal.vm.annotation.Contended
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   979
    private static final class BufferedSubscription<T>
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   980
        implements Flow.Subscription, ForkJoinPool.ManagedBlocker {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   981
        // Order-sensitive field declarations
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   982
        long timeout;                      // > 0 if timed wait
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   983
        volatile long demand;              // # unfilled requests
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   984
        int maxCapacity;                   // reduced on OOME
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   985
        int putStat;                       // offer result for ManagedBlocker
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   986
        volatile int ctl;                  // atomic run state flags
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   987
        volatile int head;                 // next position to take
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   988
        int tail;                          // next position to put
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   989
        Object[] array;                    // buffer: null if disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   990
        Flow.Subscriber<? super T> subscriber; // null if disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   991
        Executor executor;                 // null if disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   992
        BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> onNextHandler;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   993
        volatile Throwable pendingError;   // holds until onError issued
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   994
        volatile Thread waiter;            // blocked producer thread
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   995
        T putItem;                         // for offer within ManagedBlocker
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   996
        BufferedSubscription<T> next;      // used only by publisher
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   997
        BufferedSubscription<T> nextRetry; // used only by publisher
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   998
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
   999
        // ctl values
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1000
        static final int ACTIVE    = 0x01; // consumer task active
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1001
        static final int CONSUME   = 0x02; // keep-alive for consumer task
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1002
        static final int DISABLED  = 0x04; // final state
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1003
        static final int ERROR     = 0x08; // signal onError then disable
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1004
        static final int SUBSCRIBE = 0x10; // signal onSubscribe
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1005
        static final int COMPLETE  = 0x20; // signal onComplete when done
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1006
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1007
        static final long INTERRUPTED = -1L; // timeout vs interrupt sentinel
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1008
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1009
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1010
         * Initial buffer capacity used when maxBufferCapacity is
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1011
         * greater. Must be a power of two.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1012
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1013
        static final int DEFAULT_INITIAL_CAP = 32;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1014
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1015
        BufferedSubscription(Flow.Subscriber<? super T> subscriber,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1016
                             Executor executor,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1017
                             BiConsumer<? super Flow.Subscriber<? super T>,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1018
                             ? super Throwable> onNextHandler,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1019
                             int maxBufferCapacity) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1020
            this.subscriber = subscriber;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1021
            this.executor = executor;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1022
            this.onNextHandler = onNextHandler;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1023
            this.maxCapacity = maxBufferCapacity;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1024
            this.array = new Object[maxBufferCapacity < DEFAULT_INITIAL_CAP ?
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1025
                                    (maxBufferCapacity < 2 ? // at least 2 slots
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1026
                                     2 : maxBufferCapacity) :
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1027
                                    DEFAULT_INITIAL_CAP];
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1028
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1029
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1030
        final boolean isDisabled() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1031
            return ctl == DISABLED;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1032
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1033
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1034
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1035
         * Returns estimated number of buffered items, or -1 if
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1036
         * disabled.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1037
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1038
        final int estimateLag() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1039
            int n;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1040
            return (ctl == DISABLED) ? -1 : ((n = tail - head) > 0) ? n : 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1041
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1042
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1043
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1044
         * Tries to add item and start consumer task if necessary.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1045
         * @return -1 if disabled, 0 if dropped, else estimated lag
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1046
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1047
        final int offer(T item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1048
            int h = head, t = tail, cap, size, stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1049
            Object[] a = array;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1050
            if (a != null && (cap = a.length) > 0 && cap >= (size = t + 1 - h)) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1051
                a[(cap - 1) & t] = item;    // relaxed writes OK
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1052
                tail = t + 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1053
                stat = size;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1054
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1055
            else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1056
                stat = growAndAdd(a, item);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1057
            return (stat > 0 &&
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1058
                    (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1059
                startOnOffer(stat) : stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1060
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1061
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1062
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1063
         * Tries to create or expand buffer, then adds item if possible.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1064
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1065
        private int growAndAdd(Object[] a, T item) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1066
            boolean alloc;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1067
            int cap, stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1068
            if ((ctl & (ERROR | DISABLED)) != 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1069
                cap = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1070
                stat = -1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1071
                alloc = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1072
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1073
            else if (a == null || (cap = a.length) <= 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1074
                cap = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1075
                stat = 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1076
                alloc = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1077
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1078
            else {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1079
                VarHandle.fullFence();           // recheck
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1080
                int h = head, t = tail, size = t + 1 - h;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1081
                if (cap >= size) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1082
                    a[(cap - 1) & t] = item;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1083
                    tail = t + 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1084
                    stat = size;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1085
                    alloc = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1086
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1087
                else if (cap >= maxCapacity) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1088
                    stat = 0;                    // cannot grow
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1089
                    alloc = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1090
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1091
                else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1092
                    stat = cap + 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1093
                    alloc = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1094
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1095
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1096
            if (alloc) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1097
                int newCap = (cap > 0) ? cap << 1 : 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1098
                if (newCap <= cap)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1099
                    stat = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1100
                else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1101
                    Object[] newArray = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1102
                    try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1103
                        newArray = new Object[newCap];
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1104
                    } catch (Throwable ex) {     // try to cope with OOME
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1105
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1106
                    if (newArray == null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1107
                        if (cap > 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1108
                            maxCapacity = cap;   // avoid continuous failure
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1109
                        stat = 0;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1110
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1111
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1112
                        array = newArray;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1113
                        int t = tail;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1114
                        int newMask = newCap - 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1115
                        if (a != null && cap > 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1116
                            int mask = cap - 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1117
                            for (int j = head; j != t; ++j) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1118
                                int k = j & mask;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1119
                                Object x = QA.getAcquire(a, k);
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1120
                                if (x != null && // races with consumer
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1121
                                    QA.compareAndSet(a, k, x, null))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1122
                                    newArray[j & newMask] = x;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1123
                            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1124
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1125
                        newArray[t & newMask] = item;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1126
                        tail = t + 1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1127
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1128
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1129
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1130
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1131
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1132
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1133
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1134
         * Spins/helps/blocks while offer returns 0.  Called only if
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1135
         * initial offer return 0.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1136
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1137
        final int submit(T item) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1138
            int stat;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1139
            if ((stat = offer(item)) == 0) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1140
                putItem = item;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1141
                timeout = 0L;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1142
                putStat = 0;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1143
                ForkJoinPool.helpAsyncBlocker(executor, this);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1144
                if ((stat = putStat) == 0) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1145
                    try {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1146
                        ForkJoinPool.managedBlock(this);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1147
                    } catch (InterruptedException ie) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1148
                        timeout = INTERRUPTED;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1149
                    }
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1150
                    stat = putStat;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1151
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1152
                if (timeout < 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1153
                    Thread.currentThread().interrupt();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1154
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1155
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1156
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1157
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1158
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1159
         * Timeout version; similar to submit.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1160
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1161
        final int timedOffer(T item, long nanos) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1162
            int stat;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1163
            if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1164
                putItem = item;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1165
                putStat = 0;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1166
                ForkJoinPool.helpAsyncBlocker(executor, this);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1167
                if ((stat = putStat) == 0) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1168
                    try {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1169
                        ForkJoinPool.managedBlock(this);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1170
                    } catch (InterruptedException ie) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1171
                        timeout = INTERRUPTED;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1172
                    }
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1173
                    stat = putStat;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1174
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1175
                if (timeout < 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1176
                    Thread.currentThread().interrupt();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1177
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1178
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1179
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1180
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1181
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1182
         * Tries to start consumer task after offer.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1183
         * @return -1 if now disabled, else argument
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1184
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1185
        private int startOnOffer(int stat) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1186
            for (;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1187
                Executor e; int c;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1188
                if ((c = ctl) == DISABLED || (e = executor) == null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1189
                    stat = -1;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1190
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1191
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1192
                else if ((c & ACTIVE) != 0) { // ensure keep-alive
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1193
                    if ((c & CONSUME) != 0 ||
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1194
                        CTL.compareAndSet(this, c, c | CONSUME))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1195
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1196
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1197
                else if (demand == 0L || tail == head)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1198
                    break;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1199
                else if (CTL.compareAndSet(this, c, c | (ACTIVE | CONSUME))) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1200
                    try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1201
                        e.execute(new ConsumerTask<T>(this));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1202
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1203
                    } catch (RuntimeException | Error ex) { // back out
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1204
                        do {} while (((c = ctl) & DISABLED) == 0 &&
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1205
                                     (c & ACTIVE) != 0 &&
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1206
                                     !CTL.weakCompareAndSetVolatile
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1207
                                     (this, c, c & ~ACTIVE));
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1208
                        throw ex;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1209
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1210
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1211
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1212
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1213
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1214
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1215
        private void signalWaiter(Thread w) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1216
            waiter = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1217
            LockSupport.unpark(w);    // release producer
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1218
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1219
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1220
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1221
         * Nulls out most fields, mainly to avoid garbage retention
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1222
         * until publisher unsubscribes, but also to help cleanly stop
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1223
         * upon error by nulling required components.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1224
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1225
        private void detach() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1226
            Thread w = waiter;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1227
            executor = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1228
            subscriber = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1229
            pendingError = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1230
            signalWaiter(w);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1231
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1232
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1233
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1234
         * Issues error signal, asynchronously if a task is running,
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1235
         * else synchronously.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1236
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1237
        final void onError(Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1238
            for (int c;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1239
                if (((c = ctl) & (ERROR | DISABLED)) != 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1240
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1241
                else if ((c & ACTIVE) != 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1242
                    pendingError = ex;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1243
                    if (CTL.compareAndSet(this, c, c | ERROR))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1244
                        break; // cause consumer task to exit
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1245
                }
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1246
                else if (CTL.compareAndSet(this, c, DISABLED)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1247
                    Flow.Subscriber<? super T> s = subscriber;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1248
                    if (s != null && ex != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1249
                        try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1250
                            s.onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1251
                        } catch (Throwable ignore) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1252
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1253
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1254
                    detach();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1255
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1256
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1257
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1258
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1259
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1260
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1261
         * Tries to start consumer task upon a signal or request;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1262
         * disables on failure.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1263
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1264
        private void startOrDisable() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1265
            Executor e;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1266
            if ((e = executor) != null) { // skip if already disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1267
                try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1268
                    e.execute(new ConsumerTask<T>(this));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1269
                } catch (Throwable ex) {  // back out and force signal
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1270
                    for (int c;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1271
                        if ((c = ctl) == DISABLED || (c & ACTIVE) == 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1272
                            break;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1273
                        if (CTL.compareAndSet(this, c, c & ~ACTIVE)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1274
                            onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1275
                            break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1276
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1277
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1278
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1279
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1280
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1281
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1282
        final void onComplete() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1283
            for (int c;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1284
                if ((c = ctl) == DISABLED)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1285
                    break;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1286
                if (CTL.compareAndSet(this, c,
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1287
                                      c | (ACTIVE | CONSUME | COMPLETE))) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1288
                    if ((c & ACTIVE) == 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1289
                        startOrDisable();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1290
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1291
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1292
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1293
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1294
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1295
        final void onSubscribe() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1296
            for (int c;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1297
                if ((c = ctl) == DISABLED)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1298
                    break;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1299
                if (CTL.compareAndSet(this, c,
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1300
                                      c | (ACTIVE | CONSUME | SUBSCRIBE))) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1301
                    if ((c & ACTIVE) == 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1302
                        startOrDisable();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1303
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1304
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1305
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1306
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1307
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1308
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1309
         * Causes consumer task to exit if active (without reporting
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1310
         * onError unless there is already a pending error), and
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1311
         * disables.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1312
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1313
        public void cancel() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1314
            for (int c;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1315
                if ((c = ctl) == DISABLED)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1316
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1317
                else if ((c & ACTIVE) != 0) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1318
                    if (CTL.compareAndSet(this, c,
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1319
                                          c | (CONSUME | ERROR)))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1320
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1321
                }
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1322
                else if (CTL.compareAndSet(this, c, DISABLED)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1323
                    detach();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1324
                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1325
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1326
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1327
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1328
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1329
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1330
         * Adds to demand and possibly starts task.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1331
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1332
        public void request(long n) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1333
            if (n > 0L) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1334
                for (;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1335
                    long prev = demand, d;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1336
                    if ((d = prev + n) < prev) // saturate
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1337
                        d = Long.MAX_VALUE;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1338
                    if (DEMAND.compareAndSet(this, prev, d)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1339
                        for (int c, h;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1340
                            if ((c = ctl) == DISABLED)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1341
                                break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1342
                            else if ((c & ACTIVE) != 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1343
                                if ((c & CONSUME) != 0 ||
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1344
                                    CTL.compareAndSet(this, c, c | CONSUME))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1345
                                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1346
                            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1347
                            else if ((h = head) != tail) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1348
                                if (CTL.compareAndSet(this, c,
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1349
                                                      c | (ACTIVE|CONSUME))) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1350
                                    startOrDisable();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1351
                                    break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1352
                                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1353
                            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1354
                            else if (head == h && tail == h)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1355
                                break;          // else stale
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1356
                            if (demand == 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1357
                                break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1358
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1359
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1360
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1361
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1362
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1363
            else if (n < 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1364
                onError(new IllegalArgumentException(
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1365
                            "negative subscription request"));
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1366
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1367
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1368
        public final boolean isReleasable() { // for ManagedBlocker
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1369
            T item = putItem;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1370
            if (item != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1371
                if ((putStat = offer(item)) == 0)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1372
                    return false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1373
                putItem = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1374
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1375
            return true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1376
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1377
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1378
        public final boolean block() { // for ManagedBlocker
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1379
            T item = putItem;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1380
            if (item != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1381
                putItem = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1382
                long nanos = timeout;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1383
                long deadline = (nanos > 0L) ? System.nanoTime() + nanos : 0L;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1384
                while ((putStat = offer(item)) == 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1385
                    if (Thread.interrupted()) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1386
                        timeout = INTERRUPTED;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1387
                        if (nanos > 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1388
                            break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1389
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1390
                    else if (nanos > 0L &&
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1391
                             (nanos = deadline - System.nanoTime()) <= 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1392
                        break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1393
                    else if (waiter == null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1394
                        waiter = Thread.currentThread();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1395
                    else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1396
                        if (nanos > 0L)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1397
                            LockSupport.parkNanos(this, nanos);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1398
                        else
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1399
                            LockSupport.park(this);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1400
                        waiter = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1401
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1402
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1403
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1404
            waiter = null;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1405
            return true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1406
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1407
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1408
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1409
         * Consumer loop, called from ConsumerTask, or indirectly
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1410
         * when helping during submit.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1411
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1412
        final void consume() {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1413
            Flow.Subscriber<? super T> s;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1414
            int h = head;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1415
            if ((s = subscriber) != null) {           // else disabled
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1416
                for (;;) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1417
                    long d = demand;
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1418
                    int c; Object[] a; int n, i; Object x; Thread w;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1419
                    if (((c = ctl) & (ERROR | SUBSCRIBE | DISABLED)) != 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1420
                        if (!checkControl(s, c))
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1421
                            break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1422
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1423
                    else if ((a = array) == null || h == tail ||
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1424
                             (n = a.length) == 0 ||
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1425
                             (x = QA.getAcquire(a, i = (n - 1) & h)) == null) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1426
                        if (!checkEmpty(s, c))
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1427
                            break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1428
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1429
                    else if (d == 0L) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1430
                        if (!checkDemand(c))
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1431
                            break;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1432
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1433
                    else if (((c & CONSUME) != 0 ||
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1434
                              CTL.compareAndSet(this, c, c | CONSUME)) &&
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1435
                             QA.compareAndSet(a, i, x, null)) {
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1436
                        HEAD.setRelease(this, ++h);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1437
                        DEMAND.getAndAdd(this, -1L);
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1438
                        if ((w = waiter) != null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1439
                            signalWaiter(w);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1440
                        try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1441
                            @SuppressWarnings("unchecked") T y = (T) x;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1442
                            s.onNext(y);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1443
                        } catch (Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1444
                            handleOnNext(s, ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1445
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1446
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1447
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1448
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1449
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1450
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1451
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1452
         * Responds to control events in consume().
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1453
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1454
        private boolean checkControl(Flow.Subscriber<? super T> s, int c) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1455
            boolean stat = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1456
            if ((c & ERROR) != 0) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1457
                Throwable ex = pendingError;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1458
                ctl = DISABLED;           // no need for CAS
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1459
                if (ex != null) {         // null if errorless cancel
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1460
                    try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1461
                        if (s != null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1462
                            s.onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1463
                    } catch (Throwable ignore) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1464
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1465
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1466
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1467
            else if ((c & SUBSCRIBE) != 0) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1468
                if (CTL.compareAndSet(this, c, c & ~SUBSCRIBE)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1469
                    try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1470
                        if (s != null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1471
                            s.onSubscribe(this);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1472
                    } catch (Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1473
                        onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1474
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1475
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1476
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1477
            else {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1478
                detach();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1479
                stat = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1480
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1481
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1482
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1483
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1484
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1485
         * Responds to apparent emptiness in consume().
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1486
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1487
        private boolean checkEmpty(Flow.Subscriber<? super T> s, int c) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1488
            boolean stat = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1489
            if (head == tail) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1490
                if ((c & CONSUME) != 0)
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1491
                    CTL.compareAndSet(this, c, c & ~CONSUME);
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1492
                else if ((c & COMPLETE) != 0) {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1493
                    if (CTL.compareAndSet(this, c, DISABLED)) {
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1494
                        try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1495
                            if (s != null)
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1496
                                s.onComplete();
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1497
                        } catch (Throwable ignore) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1498
                        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1499
                    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1500
                }
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1501
                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1502
                    stat = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1503
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1504
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1505
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1506
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1507
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1508
         * Responds to apparent zero demand in consume().
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1509
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1510
        private boolean checkDemand(int c) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1511
            boolean stat = true;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1512
            if (demand == 0L) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1513
                if ((c & CONSUME) != 0)
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1514
                    CTL.compareAndSet(this, c, c & ~CONSUME);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1515
                else if (CTL.compareAndSet(this, c, c & ~ACTIVE))
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1516
                    stat = false;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1517
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1518
            return stat;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1519
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1520
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1521
        /**
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1522
         * Processes exception in Subscriber.onNext.
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1523
         */
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1524
        private void handleOnNext(Flow.Subscriber<? super T> s, Throwable ex) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1525
            BiConsumer<? super Flow.Subscriber<? super T>, ? super Throwable> h;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1526
            if ((h = onNextHandler) != null) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1527
                try {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1528
                    h.accept(s, ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1529
                } catch (Throwable ignore) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1530
                }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1531
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1532
            onError(ex);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1533
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1534
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1535
        // VarHandle mechanics
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1536
        private static final VarHandle CTL;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1537
        private static final VarHandle TAIL;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1538
        private static final VarHandle HEAD;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1539
        private static final VarHandle DEMAND;
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1540
        private static final VarHandle QA;
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1541
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1542
        static {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1543
            try {
39723
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1544
                MethodHandles.Lookup l = MethodHandles.lookup();
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1545
                CTL = l.findVarHandle(BufferedSubscription.class, "ctl",
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1546
                                      int.class);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1547
                TAIL = l.findVarHandle(BufferedSubscription.class, "tail",
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1548
                                       int.class);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1549
                HEAD = l.findVarHandle(BufferedSubscription.class, "head",
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1550
                                       int.class);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1551
                DEMAND = l.findVarHandle(BufferedSubscription.class, "demand",
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1552
                                         long.class);
9aa34e4a0469 8157523: Various improvements to ForkJoin/SubmissionPublisher code
dl
parents: 36936
diff changeset
  1553
                QA = MethodHandles.arrayElementVarHandle(Object[].class);
32989
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1554
            } catch (ReflectiveOperationException e) {
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1555
                throw new Error(e);
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1556
            }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1557
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1558
            // Reduce the risk of rare disastrous classloading in first call to
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1559
            // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1560
            Class<?> ensureLoaded = LockSupport.class;
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1561
        }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1562
    }
c0ff74aaf943 8134850: Integrate the Flow API
dl
parents:
diff changeset
  1563
}