test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/example/unicast/AsyncSubscriber.java
author prappo
Tue, 02 Jul 2019 13:25:51 +0100
changeset 55546 3ae57bbf9585
permissions -rw-r--r--
8226602: Test convenience reactive primitives from java.net.http with RS TCK Reviewed-by: chegar, dfuchs
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
55546
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     1
/*
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     2
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     4
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     7
 * published by the Free Software Foundation.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     8
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     9
 * This code is distributed in the hope that it will be useful, but WITHOUT
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    12
 * version 2 for more details (a copy is included in the LICENSE file that
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    13
 * accompanied this code).
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    14
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    15
 * You should have received a copy of the GNU General Public License version
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    16
 * 2 along with this work; if not, write to the Free Software Foundation,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    18
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    20
 * or visit www.oracle.com if you need additional information or have any
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    21
 * questions.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    22
 */
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    23
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    24
package org.reactivestreams.example.unicast;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    25
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    26
import org.reactivestreams.Subscriber;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    27
import org.reactivestreams.Subscription;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    28
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    29
import java.util.concurrent.Executor;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    30
import java.util.concurrent.atomic.AtomicBoolean;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    31
import java.util.concurrent.ConcurrentLinkedQueue;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    32
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    33
/**
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    34
 * AsyncSubscriber is an implementation of Reactive Streams `Subscriber`,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    35
 * it runs asynchronously (on an Executor), requests one element
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    36
 * at a time, and invokes a user-defined method to process each element.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    37
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    38
 * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    39
 */
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    40
public abstract class AsyncSubscriber<T> implements Subscriber<T>, Runnable {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    41
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    42
  // Signal represents the asynchronous protocol between the Publisher and Subscriber
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    43
  private static interface Signal {}
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    44
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    45
  private enum OnComplete implements Signal { Instance; }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    46
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    47
  private static class OnError implements Signal {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    48
    public final Throwable error;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    49
    public OnError(final Throwable error) { this.error = error; }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    50
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    51
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    52
  private static class OnNext<T> implements Signal {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    53
    public final T next;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    54
    public OnNext(final T next) { this.next = next; }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    55
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    56
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    57
  private static class OnSubscribe implements Signal {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    58
    public final Subscription subscription;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    59
    public OnSubscribe(final Subscription subscription) { this.subscription = subscription; }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    60
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    61
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    62
  private Subscription subscription; // Obeying rule 3.1, we make this private!
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    63
  private boolean done; // It's useful to keep track of whether this Subscriber is done or not
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    64
  private final Executor executor; // This is the Executor we'll use to be asynchronous, obeying rule 2.2
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    65
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    66
  // Only one constructor, and it's only accessible for the subclasses
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    67
  protected AsyncSubscriber(Executor executor) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    68
    if (executor == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    69
    this.executor = executor;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    70
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    71
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    72
  // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    73
  // herefor we also need to cancel our `Subscription`.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    74
  private final void done() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    75
    //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    76
    done = true; // If `whenNext` throws an exception, let's consider ourselves done (not accepting more elements)
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    77
    if (subscription != null) { // If we are bailing out before we got a `Subscription` there's little need for cancelling it.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    78
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    79
        subscription.cancel(); // Cancel the subscription
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    80
      } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    81
        //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    82
        (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    83
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    84
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    85
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    86
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    87
  // This method is invoked when the OnNext signals arrive
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    88
  // Returns whether more elements are desired or not, and if no more elements are desired,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    89
  // for convenience.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    90
  protected abstract boolean whenNext(final T element);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    91
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    92
  // This method is invoked when the OnComplete signal arrives
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    93
  // override this method to implement your own custom onComplete logic.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    94
  protected void whenComplete() { }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    95
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    96
  // This method is invoked if the OnError signal arrives
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    97
  // override this method to implement your own custom onError logic.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    98
  protected void whenError(Throwable error) { }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    99
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   100
  private final void handleOnSubscribe(final Subscription s) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   101
    if (s == null) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   102
      // Getting a null `Subscription` here is not valid so lets just ignore it.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   103
    } else if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   104
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   105
        s.cancel(); // Cancel the additional subscription to follow rule 2.5
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   106
      } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   107
        //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   108
        (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   109
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   110
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   111
      // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   112
      // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   113
      subscription = s;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   114
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   115
        // If we want elements, according to rule 2.1 we need to call `request`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   116
        // And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   117
        s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   118
      } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   119
        // Subscription.request is not allowed to throw according to rule 3.16
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   120
        (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   121
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   122
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   123
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   124
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   125
  private final void handleOnNext(final T element) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   126
    if (!done) { // If we aren't already done
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   127
      if(subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   128
        // Check for spec violation of 2.1 and 1.09
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   129
        (new IllegalStateException("Someone violated the Reactive Streams rule 1.09 and 2.1 by signalling OnNext before `Subscription.request`. (no Subscription)")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   130
      } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   131
        try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   132
          if (whenNext(element)) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   133
            try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   134
              subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   135
            } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   136
              // Subscription.request is not allowed to throw according to rule 3.16
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   137
              (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   138
            }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   139
          } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   140
            done(); // This is legal according to rule 2.6
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   141
          }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   142
        } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   143
          done();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   144
          try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   145
            onError(t);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   146
          } catch(final Throwable t2) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   147
            //Subscriber.onError is not allowed to throw an exception, according to rule 2.13
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   148
            (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   149
          }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   150
        }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   151
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   152
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   153
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   154
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   155
  // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   156
  private void handleOnComplete() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   157
    if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   158
      // Publisher is not allowed to signal onComplete before onSubscribe according to rule 1.09
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   159
      (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   160
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   161
      done = true; // Obey rule 2.4
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   162
      whenComplete();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   163
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   164
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   165
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   166
  // Here it is important that we do not violate 2.2 and 2.3 by calling methods on the `Subscription` or `Publisher`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   167
  private void handleOnError(final Throwable error) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   168
    if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   169
      // Publisher is not allowed to signal onError before onSubscribe according to rule 1.09
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   170
      (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   171
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   172
      done = true; // Obey rule 2.4
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   173
      whenError(error);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   174
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   175
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   176
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   177
  // We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   178
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   179
  @Override public final void onSubscribe(final Subscription s) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   180
    // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   181
    if (s == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   182
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   183
    signal(new OnSubscribe(s));
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   184
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   185
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   186
  @Override public final void onNext(final T element) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   187
    // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   188
    if (element == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   189
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   190
    signal(new OnNext<T>(element));
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   191
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   192
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   193
  @Override public final void onError(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   194
    // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   195
    if (t == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   196
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   197
    signal(new OnError(t));
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   198
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   199
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   200
  @Override public final void onComplete() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   201
     signal(OnComplete.Instance);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   202
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   203
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   204
  // This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscriber`, like `OnComplete` and `OnNext` ,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   205
  // and obeying rule 2.11
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   206
  private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   207
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   208
  // We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   209
  // obeying rule 2.7 and 2.11
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   210
  private final AtomicBoolean on = new AtomicBoolean(false);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   211
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   212
   @SuppressWarnings("unchecked")
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   213
   @Override public final void run() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   214
    if(on.get()) { // establishes a happens-before relationship with the end of the previous run
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   215
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   216
        final Signal s = inboundSignals.poll(); // We take a signal off the queue
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   217
        if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   218
          // Below we simply unpack the `Signal`s and invoke the corresponding methods
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   219
          if (s instanceof OnNext<?>)
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   220
            handleOnNext(((OnNext<T>)s).next);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   221
          else if (s instanceof OnSubscribe)
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   222
            handleOnSubscribe(((OnSubscribe)s).subscription);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   223
          else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   224
            handleOnError(((OnError)s).error);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   225
          else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   226
            handleOnComplete();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   227
        }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   228
      } finally {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   229
        on.set(false); // establishes a happens-before relationship with the beginning of the next run
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   230
        if(!inboundSignals.isEmpty()) // If we still have signals to process
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   231
          tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   232
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   233
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   234
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   235
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   236
  // What `signal` does is that it sends signals to the `Subscription` asynchronously
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   237
  private void signal(final Signal signal) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   238
    if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   239
      tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   240
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   241
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   242
  // This method makes sure that this `Subscriber` is only executing on one Thread at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   243
  private final void tryScheduleToExecute() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   244
    if(on.compareAndSet(false, true)) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   245
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   246
        executor.execute(this);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   247
      } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   248
        if (!done) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   249
          try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   250
            done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   251
          } finally {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   252
            inboundSignals.clear(); // We're not going to need these anymore
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   253
            // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   254
            // that we can drain the inboundSignals queue if anything arrives after clearing
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   255
            on.set(false);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   256
          }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   257
        }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   258
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   259
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   260
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   261
}