test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/tck/SubscriberBlackboxVerification.java
changeset 55546 3ae57bbf9585
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/tck/SubscriberBlackboxVerification.java	Tue Jul 02 13:25:51 2019 +0100
@@ -0,0 +1,516 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package org.reactivestreams.tck;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
+import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
+import org.reactivestreams.tck.flow.support.Optional;
+import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules;
+import org.reactivestreams.tck.flow.support.TestException;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
+ * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
+ *
+ * This verification is NOT able to check many of the rules of the spec, and if you want more
+ * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
+ * instead.
+ *
+ * @see org.reactivestreams.Subscriber
+ * @see org.reactivestreams.Subscription
+ */
+public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T>
+  implements SubscriberBlackboxVerificationRules {
+
+  protected final TestEnvironment env;
+
+  protected SubscriberBlackboxVerification(TestEnvironment env) {
+    this.env = env;
+  }
+
+  // USER API
+
+  /**
+   * This is the main method you must implement in your test incarnation.
+   * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
+   */
+  public abstract Subscriber<T> createSubscriber();
+
+  /**
+   * Override this method if the Subscriber implementation you are verifying
+   * needs an external signal before it signals demand to its Publisher.
+   *
+   * By default this method does nothing.
+   */
+  public void triggerRequest(final Subscriber<? super T> subscriber) {
+    // this method is intentionally left blank
+  }
+
+  // ENV SETUP
+
+  /**
+   * Executor service used by the default provided asynchronous Publisher.
+   * @see #createHelperPublisher(long)
+   */
+  private ExecutorService publisherExecutor;
+  @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
+  @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
+  @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
+
+  ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    env.clearAsyncErrors();
+  }
+
+  ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
+
+  @Override @Test
+  public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
+    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws InterruptedException {
+        triggerRequest(stage.subProxy().sub());
+        final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
+        final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
+
+        // should cope with up to requested number of elements
+        for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
+          stage.signalNext();
+
+        // we complete after `signalsToEmit` (which can be less than `requested`),
+        // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
+        stage.sendCompletion();
+      }
+
+      /**
+       * In order to allow some "skid" and not check state on each iteration,
+       * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
+       */
+      private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
+        if (i % checkInterval == 0) return stage.isCancelled();
+        else return false;
+      }
+    });
+  }
+
+  @Override @Test
+  public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
+    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws Throwable {
+        final Subscription subs = new Subscription() {
+          @Override
+          public void request(long n) {
+            final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
+            if (onCompleteStackTraceElement.isDefined()) {
+              final StackTraceElement stackElem = onCompleteStackTraceElement.get();
+              env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
+                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
+            }
+          }
+
+          @Override
+          public void cancel() {
+            final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
+            if (onCompleteStackElement.isDefined()) {
+              final StackTraceElement stackElem = onCompleteStackElement.get();
+              env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
+                                     stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
+            }
+          }
+        };
+
+        final Subscriber<T> sub = createSubscriber();
+        sub.onSubscribe(subs);
+        sub.onComplete();
+
+        env.verifyNoAsyncErrorsNoDelay();
+      }
+    });
+  }
+
+  @Override @Test
+  public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
+    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws Throwable {
+        final Subscription subs = new Subscription() {
+          @Override
+          public void request(long n) {
+            Throwable thr = new Throwable();
+            for (StackTraceElement stackElem : thr.getStackTrace()) {
+              if (stackElem.getMethodName().equals("onError")) {
+                env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
+                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
+              }
+            }
+          }
+
+          @Override
+          public void cancel() {
+            Throwable thr = new Throwable();
+            for (StackTraceElement stackElem : thr.getStackTrace()) {
+              if (stackElem.getMethodName().equals("onError")) {
+                env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
+                                       stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
+              }
+            }
+          }
+        };
+
+        final Subscriber<T> sub = createSubscriber();
+        sub.onSubscribe(subs);
+        sub.onError(new TestException());
+
+        env.verifyNoAsyncErrorsNoDelay();
+      }
+    });
+  }
+
+  @Override @Test
+  public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
+    new BlackboxTestStage(env) {{
+      // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
+      final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
+      sub().onSubscribe(
+          new Subscription() {
+            @Override
+            public void request(long elements) {
+              env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
+            }
+
+            @Override
+            public void cancel() {
+              secondSubscriptionCancelled.close();
+            }
+
+            @Override
+            public String toString() {
+              return "SecondSubscription(should get cancelled)";
+            }
+          });
+
+      secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
+      env.verifyNoAsyncErrorsNoDelay();
+      sendCompletion(); // we're done, complete the subscriber under test
+    }};
+  }
+
+  @Override @Test
+  public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+    // the same thread part of the clause can be verified but that is not very useful, or is it?
+  }
+
+  @Override @Test
+  public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
+    notVerified(); // cannot be meaningfully tested as black box, or can it?
+  }
+
+  @Override @Test
+  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
+    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
+      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+      public void run(BlackboxTestStage stage) throws Throwable {
+        triggerRequest(stage.subProxy().sub());
+        final long notUsed = stage.expectRequest(); // received request signal
+        stage.sub().onComplete();
+        stage.subProxy().expectCompletion();
+      }
+    });
+  }
+
+  @Override @Test
+  public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
+    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
+      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+      public void run(BlackboxTestStage stage) throws Throwable {
+        final Subscriber<? super T> sub = stage.sub();
+        sub.onComplete();
+        stage.subProxy().expectCompletion();
+      }
+    });
+  }
+
+  @Override @Test
+  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
+    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
+      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+      public void run(BlackboxTestStage stage) throws Throwable {
+        triggerRequest(stage.subProxy().sub());
+        final long notUsed = stage.expectRequest(); // received request signal
+        stage.sub().onError(new TestException()); // in response to that, we fail
+        stage.subProxy().expectError(Throwable.class);
+      }
+    });
+  }
+
+  // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
+  @Override @Test
+  public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
+    blackboxSubscriberTest(new BlackboxTestStageTestRun() {
+      @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+      public void run(BlackboxTestStage stage) throws Throwable {
+
+        stage.sub().onError(new TestException());
+        stage.subProxy().expectError(Throwable.class);
+      }
+    });
+  }
+
+  @Override @Test
+  public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
+    notVerified(); // cannot be meaningfully tested as black box, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
+    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws Throwable {
+
+        {
+          final Subscriber<T> sub = createSubscriber();
+          boolean gotNPE = false;
+          try {
+            sub.onSubscribe(null);
+          } catch(final NullPointerException expected) {
+            gotNPE = true;
+          }
+          assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
+        }
+
+        env.verifyNoAsyncErrorsNoDelay();
+      }
+    });
+  }
+
+  @Override @Test
+  public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
+    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws Throwable {
+        final Subscription subscription = new Subscription() {
+          @Override public void request(final long elements) {}
+          @Override public void cancel() {}
+        };
+
+        {
+          final Subscriber<T> sub = createSubscriber();
+          boolean gotNPE = false;
+          sub.onSubscribe(subscription);
+          try {
+            sub.onNext(null);
+          } catch(final NullPointerException expected) {
+            gotNPE = true;
+          }
+          assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
+        }
+
+        env.verifyNoAsyncErrorsNoDelay();
+      }
+    });
+  }
+
+  @Override @Test
+  public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
+    blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
+      @Override
+      public void run(BlackboxTestStage stage) throws Throwable {
+        final Subscription subscription = new Subscription() {
+          @Override public void request(final long elements) {}
+          @Override public void cancel() {}
+        };
+
+        {
+          final Subscriber<T> sub = createSubscriber();
+          boolean gotNPE = false;
+          sub.onSubscribe(subscription);
+          try {
+            sub.onError(null);
+          } catch(final NullPointerException expected) {
+            gotNPE = true;
+          }
+          assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
+        }
+
+        env.verifyNoAsyncErrorsNoDelay();
+      }
+    });
+  }
+
+  ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
+
+  @Override @Test
+  public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
+    notVerified(); // cannot be meaningfully tested as black box, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  @Override @Test
+  public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
+    notVerified(); // cannot be meaningfully tested, or can it?
+  }
+
+  /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
+
+  /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
+
+  abstract class BlackboxTestStageTestRun {
+    public abstract void run(BlackboxTestStage stage) throws Throwable;
+  }
+
+  public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
+    BlackboxTestStage stage = new BlackboxTestStage(env, true);
+    body.run(stage);
+  }
+
+  public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
+    BlackboxTestStage stage = new BlackboxTestStage(env, false);
+    body.run(stage);
+  }
+
+  public class BlackboxTestStage extends ManualPublisher<T> {
+    public Publisher<T> pub;
+    public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
+
+    public T lastT = null;
+    private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
+
+    public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
+      this(env, true);
+    }
+
+    public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
+      super(env);
+      if (runDefaultInit) {
+        pub = this.createHelperPublisher(Long.MAX_VALUE);
+        tees = env.newManualSubscriber(pub);
+        Subscriber<T> sub = createSubscriber();
+        subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
+        subscribe(subProxy.get());
+      }
+    }
+
+    public Subscriber<? super T> sub() {
+      return subscriber.value();
+    }
+
+    /**
+     * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
+     */
+    public BlackboxSubscriberProxy<T> subProxy() {
+      return subProxy.get();
+    }
+
+    public Publisher<T> createHelperPublisher(long elements) {
+      return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
+    }
+
+    public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
+      return new BlackboxSubscriberProxy<T>(env, sub);
+    }
+
+    public T signalNext() throws InterruptedException {
+      T element = nextT();
+      sendNext(element);
+      return element;
+    }
+
+    public T nextT() throws InterruptedException {
+      lastT = tees.requestNextElement();
+      return lastT;
+    }
+
+  }
+
+  public void notVerified() {
+    throw new SkipException("Not verified using this TCK.");
+  }
+}