test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/example/unicast/SyncSubscriber.java
author prappo
Tue, 02 Jul 2019 13:25:51 +0100
changeset 55546 3ae57bbf9585
permissions -rw-r--r--
8226602: Test convenience reactive primitives from java.net.http with RS TCK Reviewed-by: chegar, dfuchs
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
55546
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     1
/*
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     2
 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     4
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     7
 * published by the Free Software Foundation.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     8
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
     9
 * This code is distributed in the hope that it will be useful, but WITHOUT
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    11
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    12
 * version 2 for more details (a copy is included in the LICENSE file that
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    13
 * accompanied this code).
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    14
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    15
 * You should have received a copy of the GNU General Public License version
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    16
 * 2 along with this work; if not, write to the Free Software Foundation,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    17
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    18
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    19
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    20
 * or visit www.oracle.com if you need additional information or have any
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    21
 * questions.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    22
 */
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    23
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    24
package org.reactivestreams.example.unicast;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    25
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    26
import org.reactivestreams.Subscriber;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    27
import org.reactivestreams.Subscription;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    28
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    29
/**
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    30
 * SyncSubscriber is an implementation of Reactive Streams `Subscriber`,
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    31
 * it runs synchronously (on the Publisher's thread) and requests one element
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    32
 * at a time and invokes a user-defined method to process each element.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    33
 *
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    34
 * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    35
 */
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    36
public abstract class SyncSubscriber<T> implements Subscriber<T> {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    37
  private Subscription subscription; // Obeying rule 3.1, we make this private!
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    38
  private boolean done = false;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    39
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    40
  @Override public void onSubscribe(final Subscription s) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    41
    // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    42
    if (s == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    43
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    44
    if (subscription != null) { // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    45
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    46
        s.cancel(); // Cancel the additional subscription
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    47
      } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    48
        //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    49
        (new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    50
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    51
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    52
      // We have to assign it locally before we use it, if we want to be a synchronous `Subscriber`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    53
      // Because according to rule 3.10, the Subscription is allowed to call `onNext` synchronously from within `request`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    54
      subscription = s;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    55
      try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    56
        // If we want elements, according to rule 2.1 we need to call `request`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    57
        // And, according to rule 3.2 we are allowed to call this synchronously from within the `onSubscribe` method
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    58
        s.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    59
      } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    60
        // Subscription.request is not allowed to throw according to rule 3.16
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    61
        (new IllegalStateException(s + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    62
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    63
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    64
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    65
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    66
  @Override public void onNext(final T element) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    67
    if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    68
      (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onNext prior to onSubscribe.")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    69
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    70
      // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    71
      if (element == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    72
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    73
      if (!done) { // If we aren't already done
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    74
        try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    75
          if (whenNext(element)) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    76
            try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    77
              subscription.request(1); // Our Subscriber is unbuffered and modest, it requests one element at a time
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    78
            } catch (final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    79
              // Subscription.request is not allowed to throw according to rule 3.16
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    80
              (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.16 by throwing an exception from request.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    81
            }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    82
          } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    83
            done();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    84
          }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    85
        } catch (final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    86
          done();
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    87
          try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    88
            onError(t);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    89
          } catch (final Throwable t2) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    90
            //Subscriber.onError is not allowed to throw an exception, according to rule 2.13
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    91
            (new IllegalStateException(this + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    92
          }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    93
        }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    94
      }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    95
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    96
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    97
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    98
  // Showcases a convenience method to idempotently marking the Subscriber as "done", so we don't want to process more elements
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
    99
  // herefor we also need to cancel our `Subscription`.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   100
  private void done() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   101
    //On this line we could add a guard against `!done`, but since rule 3.7 says that `Subscription.cancel()` is idempotent, we don't need to.
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   102
    done = true; // If we `whenNext` throws an exception, let's consider ourselves done (not accepting more elements)
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   103
    try {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   104
      subscription.cancel(); // Cancel the subscription
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   105
    } catch(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   106
      //Subscription.cancel is not allowed to throw an exception, according to rule 3.15
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   107
      (new IllegalStateException(subscription + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t)).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   108
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   109
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   110
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   111
  // This method is left as an exercise to the reader/extension point
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   112
  // Returns whether more elements are desired or not, and if no more elements are desired
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   113
  protected abstract boolean whenNext(final T element);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   114
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   115
  @Override public void onError(final Throwable t) {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   116
    if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   117
      (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onError prior to onSubscribe.")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   118
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   119
      // As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   120
      if (t == null) throw null;
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   121
      // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   122
      // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   123
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   124
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   125
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   126
  @Override public void onComplete() {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   127
    if (subscription == null) { // Technically this check is not needed, since we are expecting Publishers to conform to the spec
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   128
      (new IllegalStateException("Publisher violated the Reactive Streams rule 1.09 signalling onComplete prior to onSubscribe.")).printStackTrace(System.err);
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   129
    } else {
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   130
      // Here we are not allowed to call any methods on the `Subscription` or the `Publisher`, as per rule 2.3
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   131
      // And anyway, the `Subscription` is considered to be cancelled if this method gets called, as per rule 2.4
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   132
    }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   133
  }
3ae57bbf9585 8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff changeset
   134
}