test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/example/unicast/AsyncIterablePublisher.java
changeset 55546 3ae57bbf9585
equal deleted inserted replaced
55545:8a153a932d0f 55546:3ae57bbf9585
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package org.reactivestreams.example.unicast;
       
    25 
       
    26 import org.reactivestreams.Publisher;
       
    27 import org.reactivestreams.Subscriber;
       
    28 import org.reactivestreams.Subscription;
       
    29 
       
    30 import java.util.Iterator;
       
    31 import java.util.Collections;
       
    32 import java.util.concurrent.Executor;
       
    33 import java.util.concurrent.atomic.AtomicBoolean;
       
    34 import java.util.concurrent.ConcurrentLinkedQueue;
       
    35 
       
    36 /**
       
    37  * AsyncIterablePublisher is an implementation of Reactive Streams `Publisher`
       
    38  * which executes asynchronously, using a provided `Executor` and produces elements
       
    39  * from a given `Iterable` in a "unicast" configuration to its `Subscribers`.
       
    40  *
       
    41  * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
       
    42  */
       
    43 public class AsyncIterablePublisher<T> implements Publisher<T> {
       
    44   private final static int DEFAULT_BATCHSIZE = 1024;
       
    45 
       
    46   private final Iterable<T> elements; // This is our data source / generator
       
    47   private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers
       
    48   private final int batchSize; // In general, if one uses an `Executor`, one should be nice nad not hog a thread for too long, this is the cap for that, in elements
       
    49 
       
    50   public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) {
       
    51     this(elements, DEFAULT_BATCHSIZE, executor);
       
    52   }
       
    53 
       
    54   public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) {
       
    55     if (elements == null) throw null;
       
    56     if (executor == null) throw null;
       
    57     if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero!");
       
    58     this.elements = elements;
       
    59     this.executor = executor;
       
    60     this.batchSize = batchSize;
       
    61   }
       
    62 
       
    63   @Override
       
    64   public void subscribe(final Subscriber<? super T> s) {
       
    65     // As per rule 1.11, we have decided to support multiple subscribers in a unicast configuration
       
    66     // for this `Publisher` implementation.
       
    67     // As per 2.13, this method must return normally (i.e. not throw)
       
    68     new SubscriptionImpl(s).init();
       
    69   }
       
    70 
       
    71   // These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls
       
    72   static interface Signal {};
       
    73   enum Cancel implements Signal { Instance; };
       
    74   enum Subscribe implements Signal { Instance; };
       
    75   enum Send implements Signal { Instance; };
       
    76   static final class Request implements Signal {
       
    77     final long n;
       
    78     Request(final long n) {
       
    79       this.n = n;
       
    80     }
       
    81   };
       
    82 
       
    83   // This is our implementation of the Reactive Streams `Subscription`,
       
    84   // which represents the association between a `Publisher` and a `Subscriber`.
       
    85   final class SubscriptionImpl implements Subscription, Runnable {
       
    86     final Subscriber<? super T> subscriber; // We need a reference to the `Subscriber` so we can talk to it
       
    87     private boolean cancelled = false; // This flag will track whether this `Subscription` is to be considered cancelled or not
       
    88     private long demand = 0; // Here we track the current demand, i.e. what has been requested but not yet delivered
       
    89     private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber`
       
    90 
       
    91     SubscriptionImpl(final Subscriber<? super T> subscriber) {
       
    92       // As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null`
       
    93       if (subscriber == null) throw null;
       
    94       this.subscriber = subscriber;
       
    95     }
       
    96 
       
    97     // This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscription`, like `request` and `cancel`
       
    98     private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
       
    99 
       
   100     // We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself,
       
   101     // which would violate rule 1.3 among others (no concurrent notifications).
       
   102     private final AtomicBoolean on = new AtomicBoolean(false);
       
   103 
       
   104     // This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17
       
   105     private void doRequest(final long n) {
       
   106       if (n < 1)
       
   107         terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
       
   108       else if (demand + n < 1) {
       
   109         // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
       
   110         demand = Long.MAX_VALUE;  // Here we protect from the overflow and treat it as "effectively unbounded"
       
   111         doSend(); // Then we proceed with sending data downstream
       
   112       } else {
       
   113         demand += n; // Here we record the downstream demand
       
   114         doSend(); // Then we can proceed with sending data downstream
       
   115       }
       
   116     }
       
   117 
       
   118     // This handles cancellation requests, and is idempotent, thread-safe and not synchronously performing heavy computations as specified in rule 3.5
       
   119     private void doCancel() {
       
   120       cancelled = true;
       
   121     }
       
   122 
       
   123     // Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe`
       
   124     // we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread.
       
   125     // It also makes it easier to follow rule 1.9
       
   126     private void doSubscribe() {
       
   127       try {
       
   128         iterator = elements.iterator();
       
   129         if (iterator == null)
       
   130           iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null
       
   131       } catch(final Throwable t) {
       
   132         subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.9
       
   133           @Override public void cancel() {}
       
   134           @Override public void request(long n) {}
       
   135         });
       
   136         terminateDueTo(t); // Here we send onError, obeying rule 1.09
       
   137       }
       
   138 
       
   139       if (!cancelled) {
       
   140         // Deal with setting up the subscription with the subscriber
       
   141         try {
       
   142           subscriber.onSubscribe(this);
       
   143         } catch(final Throwable t) { // Due diligence to obey 2.13
       
   144           terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
       
   145         }
       
   146 
       
   147         // Deal with already complete iterators promptly
       
   148         boolean hasElements = false;
       
   149         try {
       
   150           hasElements = iterator.hasNext();
       
   151         } catch(final Throwable t) {
       
   152           terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4,
       
   153         }
       
   154 
       
   155         // If we don't have anything to deliver, we're already done, so lets do the right thing and
       
   156         // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3
       
   157         if (!hasElements) {
       
   158           try {
       
   159             doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled
       
   160             subscriber.onComplete();
       
   161           } catch(final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this.
       
   162             (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err);
       
   163           }
       
   164         }
       
   165       }
       
   166     }
       
   167 
       
   168     // This is our behavior for producing elements downstream
       
   169     private void doSend() {
       
   170       try {
       
   171         // In order to play nice with the `Executor` we will only send at-most `batchSize` before
       
   172         // rescheduing ourselves and relinquishing the current thread.
       
   173         int leftInBatch = batchSize;
       
   174         do {
       
   175           T next;
       
   176           boolean hasNext;
       
   177           try {
       
   178             next = iterator.next(); // We have already checked `hasNext` when subscribing, so we can fall back to testing -after- `next` is called.
       
   179             hasNext = iterator.hasNext(); // Need to keep track of End-of-Stream
       
   180           } catch (final Throwable t) {
       
   181             terminateDueTo(t); // If `next` or `hasNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4
       
   182             return;
       
   183           }
       
   184           subscriber.onNext(next); // Then we signal the next element downstream to the `Subscriber`
       
   185           if (!hasNext) { // If we are at End-of-Stream
       
   186             doCancel(); // We need to consider this `Subscription` as cancelled as per rule 1.6
       
   187             subscriber.onComplete(); // Then we signal `onComplete` as per rule 1.2 and 1.5
       
   188           }
       
   189         } while (!cancelled           // This makes sure that rule 1.8 is upheld, i.e. we need to stop signalling "eventually"
       
   190                  && --leftInBatch > 0 // This makes sure that we only send `batchSize` number of elements in one go (so we can yield to other Runnables)
       
   191                  && --demand > 0);    // This makes sure that rule 1.1 is upheld (sending more than was demanded)
       
   192 
       
   193         if (!cancelled && demand > 0) // If the `Subscription` is still alive and well, and we have demand to satisfy, we signal ourselves to send more data
       
   194           signal(Send.Instance);
       
   195       } catch(final Throwable t) {
       
   196         // We can only get here if `onNext` or `onComplete` threw, and they are not allowed to according to 2.13, so we can only cancel and log here.
       
   197         doCancel(); // Make sure that we are cancelled, since we cannot do anything else since the `Subscriber` is faulty.
       
   198         (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err);
       
   199       }
       
   200     }
       
   201 
       
   202     // This is a helper method to ensure that we always `cancel` when we signal `onError` as per rule 1.6
       
   203     private void terminateDueTo(final Throwable t) {
       
   204       cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
       
   205       try {
       
   206         subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber`
       
   207       } catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it.
       
   208         (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
       
   209       }
       
   210     }
       
   211 
       
   212     // What `signal` does is that it sends signals to the `Subscription` asynchronously
       
   213     private void signal(final Signal signal) {
       
   214       if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
       
   215         tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
       
   216     }
       
   217 
       
   218     // This is the main "event loop" if you so will
       
   219     @Override public final void run() {
       
   220       if(on.get()) { // establishes a happens-before relationship with the end of the previous run
       
   221         try {
       
   222           final Signal s = inboundSignals.poll(); // We take a signal off the queue
       
   223           if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7
       
   224 
       
   225             // Below we simply unpack the `Signal`s and invoke the corresponding methods
       
   226             if (s instanceof Request)
       
   227               doRequest(((Request)s).n);
       
   228             else if (s == Send.Instance)
       
   229               doSend();
       
   230             else if (s == Cancel.Instance)
       
   231               doCancel();
       
   232             else if (s == Subscribe.Instance)
       
   233               doSubscribe();
       
   234           }
       
   235         } finally {
       
   236           on.set(false); // establishes a happens-before relationship with the beginning of the next run
       
   237           if(!inboundSignals.isEmpty()) // If we still have signals to process
       
   238             tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
       
   239         }
       
   240       }
       
   241     }
       
   242 
       
   243     // This method makes sure that this `Subscription` is only running on one Thread at a time,
       
   244     // this is important to make sure that we follow rule 1.3
       
   245     private final void tryScheduleToExecute() {
       
   246       if(on.compareAndSet(false, true)) {
       
   247         try {
       
   248           executor.execute(this);
       
   249         } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully
       
   250           if (!cancelled) {
       
   251             doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6
       
   252             try {
       
   253               terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t));
       
   254             } finally {
       
   255               inboundSignals.clear(); // We're not going to need these anymore
       
   256               // This subscription is cancelled by now, but letting it become schedulable again means
       
   257               // that we can drain the inboundSignals queue if anything arrives after clearing
       
   258               on.set(false);
       
   259             }
       
   260           }
       
   261         }
       
   262       }
       
   263     }
       
   264 
       
   265     // Our implementation of `Subscription.request` sends a signal to the Subscription that more elements are in demand
       
   266     @Override public void request(final long n) {
       
   267       signal(new Request(n));
       
   268     }
       
   269     // Our implementation of `Subscription.cancel` sends a signal to the Subscription that the `Subscriber` is not interested in any more elements
       
   270     @Override public void cancel() {
       
   271       signal(Cancel.Instance);
       
   272     }
       
   273     // The reason for the `init` method is that we want to ensure the `SubscriptionImpl`
       
   274     // is completely constructed before it is exposed to the thread pool, therefor this
       
   275     // method is only intended to be invoked once, and immediately after the constructor has
       
   276     // finished.
       
   277     void init() {
       
   278       signal(Subscribe.Instance);
       
   279     }
       
   280   };
       
   281 }