test/jdk/java/net/httpclient/reactivestreams-tck/org/reactivestreams/tck/SubscriberBlackboxVerification.java
changeset 55546 3ae57bbf9585
equal deleted inserted replaced
55545:8a153a932d0f 55546:3ae57bbf9585
       
     1 /*
       
     2  * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package org.reactivestreams.tck;
       
    25 
       
    26 import org.reactivestreams.Publisher;
       
    27 import org.reactivestreams.Subscriber;
       
    28 import org.reactivestreams.Subscription;
       
    29 import org.reactivestreams.tck.TestEnvironment.ManualPublisher;
       
    30 import org.reactivestreams.tck.TestEnvironment.ManualSubscriber;
       
    31 import org.reactivestreams.tck.flow.support.Optional;
       
    32 import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules;
       
    33 import org.reactivestreams.tck.flow.support.TestException;
       
    34 import org.testng.SkipException;
       
    35 import org.testng.annotations.AfterClass;
       
    36 import org.testng.annotations.BeforeClass;
       
    37 import org.testng.annotations.BeforeMethod;
       
    38 import org.testng.annotations.Test;
       
    39 
       
    40 import java.util.concurrent.ExecutorService;
       
    41 import java.util.concurrent.Executors;
       
    42 
       
    43 import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy;
       
    44 import static org.testng.Assert.assertTrue;
       
    45 
       
    46 /**
       
    47  * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription}
       
    48  * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing).
       
    49  *
       
    50  * This verification is NOT able to check many of the rules of the spec, and if you want more
       
    51  * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification}
       
    52  * instead.
       
    53  *
       
    54  * @see org.reactivestreams.Subscriber
       
    55  * @see org.reactivestreams.Subscription
       
    56  */
       
    57 public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T>
       
    58   implements SubscriberBlackboxVerificationRules {
       
    59 
       
    60   protected final TestEnvironment env;
       
    61 
       
    62   protected SubscriberBlackboxVerification(TestEnvironment env) {
       
    63     this.env = env;
       
    64   }
       
    65 
       
    66   // USER API
       
    67 
       
    68   /**
       
    69    * This is the main method you must implement in your test incarnation.
       
    70    * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic.
       
    71    */
       
    72   public abstract Subscriber<T> createSubscriber();
       
    73 
       
    74   /**
       
    75    * Override this method if the Subscriber implementation you are verifying
       
    76    * needs an external signal before it signals demand to its Publisher.
       
    77    *
       
    78    * By default this method does nothing.
       
    79    */
       
    80   public void triggerRequest(final Subscriber<? super T> subscriber) {
       
    81     // this method is intentionally left blank
       
    82   }
       
    83 
       
    84   // ENV SETUP
       
    85 
       
    86   /**
       
    87    * Executor service used by the default provided asynchronous Publisher.
       
    88    * @see #createHelperPublisher(long)
       
    89    */
       
    90   private ExecutorService publisherExecutor;
       
    91   @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); }
       
    92   @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); }
       
    93   @Override public ExecutorService publisherExecutorService() { return publisherExecutor; }
       
    94 
       
    95   ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
       
    96 
       
    97   @BeforeMethod
       
    98   public void setUp() throws Exception {
       
    99     env.clearAsyncErrors();
       
   100   }
       
   101 
       
   102   ////////////////////// SPEC RULE VERIFICATION ///////////////////////////////
       
   103 
       
   104   @Override @Test
       
   105   public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable {
       
   106     blackboxSubscriberTest(new BlackboxTestStageTestRun() {
       
   107       @Override
       
   108       public void run(BlackboxTestStage stage) throws InterruptedException {
       
   109         triggerRequest(stage.subProxy().sub());
       
   110         final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
       
   111         final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand
       
   112 
       
   113         // should cope with up to requested number of elements
       
   114         for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
       
   115           stage.signalNext();
       
   116 
       
   117         // we complete after `signalsToEmit` (which can be less than `requested`),
       
   118         // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
       
   119         stage.sendCompletion();
       
   120       }
       
   121 
       
   122       /**
       
   123        * In order to allow some "skid" and not check state on each iteration,
       
   124        * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
       
   125        */
       
   126       private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
       
   127         if (i % checkInterval == 0) return stage.isCancelled();
       
   128         else return false;
       
   129       }
       
   130     });
       
   131   }
       
   132 
       
   133   @Override @Test
       
   134   public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception {
       
   135     notVerified(); // cannot be meaningfully tested, or can it?
       
   136   }
       
   137 
       
   138   @Override @Test
       
   139   public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable {
       
   140     blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
       
   141       @Override
       
   142       public void run(BlackboxTestStage stage) throws Throwable {
       
   143         final Subscription subs = new Subscription() {
       
   144           @Override
       
   145           public void request(long n) {
       
   146             final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete");
       
   147             if (onCompleteStackTraceElement.isDefined()) {
       
   148               final StackTraceElement stackElem = onCompleteStackTraceElement.get();
       
   149               env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
       
   150                                      stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
       
   151             }
       
   152           }
       
   153 
       
   154           @Override
       
   155           public void cancel() {
       
   156             final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete");
       
   157             if (onCompleteStackElement.isDefined()) {
       
   158               final StackTraceElement stackElem = onCompleteStackElement.get();
       
   159               env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)",
       
   160                                      stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
       
   161             }
       
   162           }
       
   163         };
       
   164 
       
   165         final Subscriber<T> sub = createSubscriber();
       
   166         sub.onSubscribe(subs);
       
   167         sub.onComplete();
       
   168 
       
   169         env.verifyNoAsyncErrorsNoDelay();
       
   170       }
       
   171     });
       
   172   }
       
   173 
       
   174   @Override @Test
       
   175   public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable {
       
   176     blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
       
   177       @Override
       
   178       public void run(BlackboxTestStage stage) throws Throwable {
       
   179         final Subscription subs = new Subscription() {
       
   180           @Override
       
   181           public void request(long n) {
       
   182             Throwable thr = new Throwable();
       
   183             for (StackTraceElement stackElem : thr.getStackTrace()) {
       
   184               if (stackElem.getMethodName().equals("onError")) {
       
   185                 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
       
   186                                        stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
       
   187               }
       
   188             }
       
   189           }
       
   190 
       
   191           @Override
       
   192           public void cancel() {
       
   193             Throwable thr = new Throwable();
       
   194             for (StackTraceElement stackElem : thr.getStackTrace()) {
       
   195               if (stackElem.getMethodName().equals("onError")) {
       
   196                 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)",
       
   197                                        stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber()));
       
   198               }
       
   199             }
       
   200           }
       
   201         };
       
   202 
       
   203         final Subscriber<T> sub = createSubscriber();
       
   204         sub.onSubscribe(subs);
       
   205         sub.onError(new TestException());
       
   206 
       
   207         env.verifyNoAsyncErrorsNoDelay();
       
   208       }
       
   209     });
       
   210   }
       
   211 
       
   212   @Override @Test
       
   213   public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception {
       
   214     notVerified(); // cannot be meaningfully tested, or can it?
       
   215   }
       
   216 
       
   217   @Override @Test
       
   218   public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception {
       
   219     new BlackboxTestStage(env) {{
       
   220       // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail
       
   221       final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env);
       
   222       sub().onSubscribe(
       
   223           new Subscription() {
       
   224             @Override
       
   225             public void request(long elements) {
       
   226               env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements));
       
   227             }
       
   228 
       
   229             @Override
       
   230             public void cancel() {
       
   231               secondSubscriptionCancelled.close();
       
   232             }
       
   233 
       
   234             @Override
       
   235             public String toString() {
       
   236               return "SecondSubscription(should get cancelled)";
       
   237             }
       
   238           });
       
   239 
       
   240       secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called.");
       
   241       env.verifyNoAsyncErrorsNoDelay();
       
   242       sendCompletion(); // we're done, complete the subscriber under test
       
   243     }};
       
   244   }
       
   245 
       
   246   @Override @Test
       
   247   public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception {
       
   248     notVerified(); // cannot be meaningfully tested, or can it?
       
   249   }
       
   250 
       
   251   @Override @Test
       
   252   public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception {
       
   253     notVerified(); // cannot be meaningfully tested, or can it?
       
   254     // the same thread part of the clause can be verified but that is not very useful, or is it?
       
   255   }
       
   256 
       
   257   @Override @Test
       
   258   public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable {
       
   259     notVerified(); // cannot be meaningfully tested as black box, or can it?
       
   260   }
       
   261 
       
   262   @Override @Test
       
   263   public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable {
       
   264     blackboxSubscriberTest(new BlackboxTestStageTestRun() {
       
   265       @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
       
   266       public void run(BlackboxTestStage stage) throws Throwable {
       
   267         triggerRequest(stage.subProxy().sub());
       
   268         final long notUsed = stage.expectRequest(); // received request signal
       
   269         stage.sub().onComplete();
       
   270         stage.subProxy().expectCompletion();
       
   271       }
       
   272     });
       
   273   }
       
   274 
       
   275   @Override @Test
       
   276   public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable {
       
   277     blackboxSubscriberTest(new BlackboxTestStageTestRun() {
       
   278       @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
       
   279       public void run(BlackboxTestStage stage) throws Throwable {
       
   280         final Subscriber<? super T> sub = stage.sub();
       
   281         sub.onComplete();
       
   282         stage.subProxy().expectCompletion();
       
   283       }
       
   284     });
       
   285   }
       
   286 
       
   287   @Override @Test
       
   288   public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable {
       
   289     blackboxSubscriberTest(new BlackboxTestStageTestRun() {
       
   290       @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
       
   291       public void run(BlackboxTestStage stage) throws Throwable {
       
   292         triggerRequest(stage.subProxy().sub());
       
   293         final long notUsed = stage.expectRequest(); // received request signal
       
   294         stage.sub().onError(new TestException()); // in response to that, we fail
       
   295         stage.subProxy().expectError(Throwable.class);
       
   296       }
       
   297     });
       
   298   }
       
   299 
       
   300   // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10
       
   301   @Override @Test
       
   302   public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable {
       
   303     blackboxSubscriberTest(new BlackboxTestStageTestRun() {
       
   304       @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
       
   305       public void run(BlackboxTestStage stage) throws Throwable {
       
   306 
       
   307         stage.sub().onError(new TestException());
       
   308         stage.subProxy().expectError(Throwable.class);
       
   309       }
       
   310     });
       
   311   }
       
   312 
       
   313   @Override @Test
       
   314   public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception {
       
   315     notVerified(); // cannot be meaningfully tested, or can it?
       
   316   }
       
   317 
       
   318   @Override @Test
       
   319   public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable {
       
   320     notVerified(); // cannot be meaningfully tested as black box, or can it?
       
   321   }
       
   322 
       
   323   @Override @Test
       
   324   public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception {
       
   325     notVerified(); // cannot be meaningfully tested, or can it?
       
   326   }
       
   327 
       
   328   @Override @Test
       
   329   public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
       
   330     blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
       
   331       @Override
       
   332       public void run(BlackboxTestStage stage) throws Throwable {
       
   333 
       
   334         {
       
   335           final Subscriber<T> sub = createSubscriber();
       
   336           boolean gotNPE = false;
       
   337           try {
       
   338             sub.onSubscribe(null);
       
   339           } catch(final NullPointerException expected) {
       
   340             gotNPE = true;
       
   341           }
       
   342           assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException");
       
   343         }
       
   344 
       
   345         env.verifyNoAsyncErrorsNoDelay();
       
   346       }
       
   347     });
       
   348   }
       
   349 
       
   350   @Override @Test
       
   351   public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
       
   352     blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
       
   353       @Override
       
   354       public void run(BlackboxTestStage stage) throws Throwable {
       
   355         final Subscription subscription = new Subscription() {
       
   356           @Override public void request(final long elements) {}
       
   357           @Override public void cancel() {}
       
   358         };
       
   359 
       
   360         {
       
   361           final Subscriber<T> sub = createSubscriber();
       
   362           boolean gotNPE = false;
       
   363           sub.onSubscribe(subscription);
       
   364           try {
       
   365             sub.onNext(null);
       
   366           } catch(final NullPointerException expected) {
       
   367             gotNPE = true;
       
   368           }
       
   369           assertTrue(gotNPE, "onNext(null) did not throw NullPointerException");
       
   370         }
       
   371 
       
   372         env.verifyNoAsyncErrorsNoDelay();
       
   373       }
       
   374     });
       
   375   }
       
   376 
       
   377   @Override @Test
       
   378   public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable {
       
   379     blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() {
       
   380       @Override
       
   381       public void run(BlackboxTestStage stage) throws Throwable {
       
   382         final Subscription subscription = new Subscription() {
       
   383           @Override public void request(final long elements) {}
       
   384           @Override public void cancel() {}
       
   385         };
       
   386 
       
   387         {
       
   388           final Subscriber<T> sub = createSubscriber();
       
   389           boolean gotNPE = false;
       
   390           sub.onSubscribe(subscription);
       
   391           try {
       
   392             sub.onError(null);
       
   393           } catch(final NullPointerException expected) {
       
   394             gotNPE = true;
       
   395           }
       
   396           assertTrue(gotNPE, "onError(null) did not throw NullPointerException");
       
   397         }
       
   398 
       
   399         env.verifyNoAsyncErrorsNoDelay();
       
   400       }
       
   401     });
       
   402   }
       
   403 
       
   404   ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION //////////////////
       
   405 
       
   406   @Override @Test
       
   407   public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception {
       
   408     notVerified(); // cannot be meaningfully tested, or can it?
       
   409   }
       
   410 
       
   411   @Override @Test
       
   412   public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable {
       
   413     notVerified(); // cannot be meaningfully tested as black box, or can it?
       
   414   }
       
   415 
       
   416   @Override @Test
       
   417   public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception {
       
   418     notVerified(); // cannot be meaningfully tested, or can it?
       
   419   }
       
   420 
       
   421   @Override @Test
       
   422   public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception {
       
   423     notVerified(); // cannot be meaningfully tested, or can it?
       
   424   }
       
   425 
       
   426   @Override @Test
       
   427   public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception {
       
   428     notVerified(); // cannot be meaningfully tested, or can it?
       
   429   }
       
   430 
       
   431   @Override @Test
       
   432   public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception {
       
   433     notVerified(); // cannot be meaningfully tested, or can it?
       
   434   }
       
   435 
       
   436   @Override @Test
       
   437   public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception {
       
   438     notVerified(); // cannot be meaningfully tested, or can it?
       
   439   }
       
   440 
       
   441   /////////////////////// ADDITIONAL "COROLLARY" TESTS ////////////////////////
       
   442 
       
   443   /////////////////////// TEST INFRASTRUCTURE /////////////////////////////////
       
   444 
       
   445   abstract class BlackboxTestStageTestRun {
       
   446     public abstract void run(BlackboxTestStage stage) throws Throwable;
       
   447   }
       
   448 
       
   449   public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable {
       
   450     BlackboxTestStage stage = new BlackboxTestStage(env, true);
       
   451     body.run(stage);
       
   452   }
       
   453 
       
   454   public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable {
       
   455     BlackboxTestStage stage = new BlackboxTestStage(env, false);
       
   456     body.run(stage);
       
   457   }
       
   458 
       
   459   public class BlackboxTestStage extends ManualPublisher<T> {
       
   460     public Publisher<T> pub;
       
   461     public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values
       
   462 
       
   463     public T lastT = null;
       
   464     private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty();
       
   465 
       
   466     public BlackboxTestStage(TestEnvironment env) throws InterruptedException {
       
   467       this(env, true);
       
   468     }
       
   469 
       
   470     public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException {
       
   471       super(env);
       
   472       if (runDefaultInit) {
       
   473         pub = this.createHelperPublisher(Long.MAX_VALUE);
       
   474         tees = env.newManualSubscriber(pub);
       
   475         Subscriber<T> sub = createSubscriber();
       
   476         subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub));
       
   477         subscribe(subProxy.get());
       
   478       }
       
   479     }
       
   480 
       
   481     public Subscriber<? super T> sub() {
       
   482       return subscriber.value();
       
   483     }
       
   484 
       
   485     /**
       
   486      * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber.
       
   487      */
       
   488     public BlackboxSubscriberProxy<T> subProxy() {
       
   489       return subProxy.get();
       
   490     }
       
   491 
       
   492     public Publisher<T> createHelperPublisher(long elements) {
       
   493       return SubscriberBlackboxVerification.this.createHelperPublisher(elements);
       
   494     }
       
   495 
       
   496     public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) {
       
   497       return new BlackboxSubscriberProxy<T>(env, sub);
       
   498     }
       
   499 
       
   500     public T signalNext() throws InterruptedException {
       
   501       T element = nextT();
       
   502       sendNext(element);
       
   503       return element;
       
   504     }
       
   505 
       
   506     public T nextT() throws InterruptedException {
       
   507       lastT = tees.requestNextElement();
       
   508       return lastT;
       
   509     }
       
   510 
       
   511   }
       
   512 
       
   513   public void notVerified() {
       
   514     throw new SkipException("Not verified using this TCK.");
       
   515   }
       
   516 }