test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/example/unicast/AsyncSubscriber.java
changeset 55546 3ae57bbf9585
equal deleted inserted replaced
55545:8a153a932d0f 55546:3ae57bbf9585
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package org.reactivestreams.example.unicast;
       
    25 
       
    26 import org.reactivestreams.Subscriber;
       
    27 import org.reactivestreams.Subscription;
       
    28 
       
    29 import java.util.concurrent.Executor;
       
    30 import java.util.concurrent.atomic.AtomicBoolean;
       
    31 import java.util.concurrent.ConcurrentLinkedQueue;
       
    32 
       
    33 /**
       
    34  * AsyncSubscriber is an implementation of Reactive Streams `Subscriber`,
       
    35  * it runs asynchronously (on an Executor), requests one element
       
    36  * at a time, and invokes a user-defined method to process each element.
       
    37  *
       
    38  * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
       
    39  */
       
    40 public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
       
    41 
       
    42   // Signal represents the asynchronous protocol between the Publisher and Subscriber
       
    43   private static interface Signal {}
       
    44 
       
    45   private enum OnComplete implements Signal { Instance; }
       
    46 
       
    47   private static class OnError implements Signal {
       
    48     public final Throwable error;
       
    49     public OnError(final Throwable error) { this.error = error; }
       
    50   }
       
    51 
       
    52   private static class OnNext<T> implements Signal {
       
    53     public final T next;
       
    54     public OnNext(final T next) { this.next = next; }
       
    55   }
       
    56 
       
    57   private static class OnSubscribe implements Signal {
       
    58     public final Subscription subscription;
       
    59     public OnSubscribe(final Subscription subscription) { this.subscription = subscription; }
       
    60   }
       
    61 
       
    62   private Subscription subscription; // Obeying rule 3.1, we make this private!
       
    63   private boolean done; // It's useful to keep track of whether this Subscriber is done or not
       
    64   private final Executor executor; // This is the Executor we'll use to be asynchronous, obeying rule 2.2
       
    65 
       
    66   // Only one constructor, and it's only accessible for the subclasses
       
    67   protected AsyncSubscriber(Executor executor) {
       
    68     if (executor == null) throw null;
       
    69     this.executor = executor;
       
    70   }
       
    71 
       
    72   // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
       
    73   // herefor we also need to cancel our `Subscription`.
       
    74   private final void done() {
       
    75     //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
       
    76     done = true; // If `whenNext` throws an exception, let's consider ourselves done (not accepting more elements)
       
    77     if (subscription != null) { // If we are bailing out before we got a `Subscription` there's little need for cancelling it.
       
    78       try {
       
    79         subscription.cancel(); // Cancel the subscription
       
    80       } catch(final Throwable t) {
       
    81         //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
       
    82         (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
       
    83       }
       
    84     }
       
    85   }
       
    86 
       
    87   // This method is invoked when the OnNext signals arrive
       
    88   // Returns whether more elements are desired or not, and if no more elements are desired,
       
    89   // for convenience.
       
    90   protected abstract boolean whenNext(final T element);
       
    91 
       
    92   // This method is invoked when the OnComplete signal arrives
       
    93   // override this method to implement your own custom onComplete logic.
       
    94   protected void whenComplete() { }
       
    95 
       
    96   // This method is invoked if the OnError signal arrives
       
    97   // override this method to implement your own custom onError logic.
       
    98   protected void whenError(Throwable error) { }
       
    99 
       
   100   private final void handleOnSubscribe(final Subscription s) {
       
   101     if (s == null) {
       
   102       // Getting a null `Subscription` here is not valid so lets just ignore it.
       
   103     } else if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
       
   104       try {
       
   105         s.cancel(); // Cancel the additional subscription to follow rule 2.5
       
   106       } catch(final Throwable t) {
       
   107         //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
       
   108         (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
       
   109       }
       
   110     } else {
       
   111       // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
       
   112       // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
       
   113       subscription = s;
       
   114       try {
       
   115         // If we want elements, according to rule 2.1 we need to call `request`
       
   116         // And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method
       
   117         s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
       
   118       } catch(final Throwable t) {
       
   119         // Subscription.request is not allowed to throw according to rule 3.16
       
   120         (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
       
   121       }
       
   122     }
       
   123   }
       
   124 
       
   125   private final void handleOnNext(final T element) {
       
   126     if (!done) { // If we aren't already done
       
   127       if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
       
   128         // Check for spec violation of 2.1 and 1.09
       
   129         (new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
       
   130       } else {
       
   131         try {
       
   132           if (whenNext(element)) {
       
   133             try {
       
   134               subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
       
   135             } catch(final Throwable t) {
       
   136               // Subscription.request is not allowed to throw according to rule 3.16
       
   137               (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
       
   138             }
       
   139           } else {
       
   140             done(); // This is legal according to rule 2.6
       
   141           }
       
   142         } catch(final Throwable t) {
       
   143           done();
       
   144           try {
       
   145             onError(t);
       
   146           } catch(final Throwable t2) {
       
   147             //Subscriber.onError is not allowed to throw an exception, according to rule 2.13
       
   148             (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
       
   149           }
       
   150         }
       
   151       }
       
   152     }
       
   153   }
       
   154 
       
   155   // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
       
   156   private void handleOnComplete() {
       
   157     if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
       
   158       // Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
       
   159       (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
       
   160     } else {
       
   161       done = true; // Obey rule 2.4
       
   162       whenComplete();
       
   163     }
       
   164   }
       
   165 
       
   166   // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
       
   167   private void handleOnError(final Throwable error) {
       
   168     if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
       
   169       // Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
       
   170       (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
       
   171     } else {
       
   172       done = true; // Obey rule 2.4
       
   173       whenError(error);
       
   174     }
       
   175   }
       
   176 
       
   177   // We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
       
   178 
       
   179   @Override public final void onSubscribe(final Subscription s) {
       
   180     // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
       
   181     if (s == null) throw null;
       
   182 
       
   183     signal(new OnSubscribe(s));
       
   184   }
       
   185 
       
   186   @Override public final void onNext(final T element) {
       
   187     // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
       
   188     if (element == null) throw null;
       
   189 
       
   190     signal(new OnNext<T>(element));
       
   191   }
       
   192 
       
   193   @Override public final void onError(final Throwable t) {
       
   194     // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
       
   195     if (t == null) throw null;
       
   196 
       
   197     signal(new OnError(t));
       
   198   }
       
   199 
       
   200   @Override public final void onComplete() {
       
   201      signal(OnComplete.Instance);
       
   202   }
       
   203 
       
   204   // This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscriber`, like `OnComplete` and `OnNext` ,
       
   205   // and obeying rule 2.11
       
   206   private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
       
   207 
       
   208   // We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
       
   209   // obeying rule 2.7 and 2.11
       
   210   private final AtomicBoolean on = new AtomicBoolean(false);
       
   211 
       
   212    @SuppressWarnings("unchecked")
       
   213    @Override public final void run() {
       
   214     if(on.get()) { // establishes a happens-before relationship with the end of the previous run
       
   215       try {
       
   216         final Signal s = inboundSignals.poll(); // We take a signal off the queue
       
   217         if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
       
   218           // Below we simply unpack the `Signal`s and invoke the corresponding methods
       
   219           if (s instanceof OnNext<?>)
       
   220             handleOnNext(((OnNext<T>)s).next);
       
   221           else if (s instanceof OnSubscribe)
       
   222             handleOnSubscribe(((OnSubscribe)s).subscription);
       
   223           else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
       
   224             handleOnError(((OnError)s).error);
       
   225           else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
       
   226             handleOnComplete();
       
   227         }
       
   228       } finally {
       
   229         on.set(false); // establishes a happens-before relationship with the beginning of the next run
       
   230         if(!inboundSignals.isEmpty()) // If we still have signals to process
       
   231           tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
       
   232       }
       
   233     }
       
   234   }
       
   235 
       
   236   // What `signal` does is that it sends signals to the `Subscription` asynchronously
       
   237   private void signal(final Signal signal) {
       
   238     if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
       
   239       tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
       
   240   }
       
   241 
       
   242   // This method makes sure that this `Subscriber` is only executing on one Thread at a time
       
   243   private final void tryScheduleToExecute() {
       
   244     if(on.compareAndSet(false, true)) {
       
   245       try {
       
   246         executor.execute(this);
       
   247       } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
       
   248         if (!done) {
       
   249           try {
       
   250             done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
       
   251           } finally {
       
   252             inboundSignals.clear(); // We're not going to need these anymore
       
   253             // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
       
   254             // that we can drain the inboundSignals queue if anything arrives after clearing
       
   255             on.set(false);
       
   256           }
       
   257         }
       
   258       }
       
   259     }
       
   260   }
       
   261 }