|
1 /* |
|
2 * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. |
|
8 * |
|
9 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
12 * version 2 for more details (a copy is included in the LICENSE file that |
|
13 * accompanied this code). |
|
14 * |
|
15 * You should have received a copy of the GNU General Public License version |
|
16 * 2 along with this work; if not, write to the Free Software Foundation, |
|
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
18 * |
|
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
20 * or visit www.oracle.com if you need additional information or have any |
|
21 * questions. |
|
22 */ |
|
23 |
|
24 package org.reactivestreams.tck; |
|
25 |
|
26 import org.reactivestreams.Publisher; |
|
27 import org.reactivestreams.Subscriber; |
|
28 import org.reactivestreams.Subscription; |
|
29 import org.reactivestreams.tck.TestEnvironment.ManualPublisher; |
|
30 import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; |
|
31 import org.reactivestreams.tck.flow.support.Optional; |
|
32 import org.reactivestreams.tck.flow.support.SubscriberBlackboxVerificationRules; |
|
33 import org.reactivestreams.tck.flow.support.TestException; |
|
34 import org.testng.SkipException; |
|
35 import org.testng.annotations.AfterClass; |
|
36 import org.testng.annotations.BeforeClass; |
|
37 import org.testng.annotations.BeforeMethod; |
|
38 import org.testng.annotations.Test; |
|
39 |
|
40 import java.util.concurrent.ExecutorService; |
|
41 import java.util.concurrent.Executors; |
|
42 |
|
43 import static org.reactivestreams.tck.SubscriberWhiteboxVerification.BlackboxSubscriberProxy; |
|
44 import static org.testng.Assert.assertTrue; |
|
45 |
|
46 /** |
|
47 * Provides tests for verifying {@link org.reactivestreams.Subscriber} and {@link org.reactivestreams.Subscription} |
|
48 * specification rules, without any modifications to the tested implementation (also known as "Black Box" testing). |
|
49 * |
|
50 * This verification is NOT able to check many of the rules of the spec, and if you want more |
|
51 * verification of your implementation you'll have to implement {@code org.reactivestreams.tck.SubscriberWhiteboxVerification} |
|
52 * instead. |
|
53 * |
|
54 * @see org.reactivestreams.Subscriber |
|
55 * @see org.reactivestreams.Subscription |
|
56 */ |
|
57 public abstract class SubscriberBlackboxVerification<T> extends WithHelperPublisher<T> |
|
58 implements SubscriberBlackboxVerificationRules { |
|
59 |
|
60 protected final TestEnvironment env; |
|
61 |
|
62 protected SubscriberBlackboxVerification(TestEnvironment env) { |
|
63 this.env = env; |
|
64 } |
|
65 |
|
66 // USER API |
|
67 |
|
68 /** |
|
69 * This is the main method you must implement in your test incarnation. |
|
70 * It must create a new {@link org.reactivestreams.Subscriber} instance to be subjected to the testing logic. |
|
71 */ |
|
72 public abstract Subscriber<T> createSubscriber(); |
|
73 |
|
74 /** |
|
75 * Override this method if the Subscriber implementation you are verifying |
|
76 * needs an external signal before it signals demand to its Publisher. |
|
77 * |
|
78 * By default this method does nothing. |
|
79 */ |
|
80 public void triggerRequest(final Subscriber<? super T> subscriber) { |
|
81 // this method is intentionally left blank |
|
82 } |
|
83 |
|
84 // ENV SETUP |
|
85 |
|
86 /** |
|
87 * Executor service used by the default provided asynchronous Publisher. |
|
88 * @see #createHelperPublisher(long) |
|
89 */ |
|
90 private ExecutorService publisherExecutor; |
|
91 @BeforeClass public void startPublisherExecutorService() { publisherExecutor = Executors.newFixedThreadPool(4); } |
|
92 @AfterClass public void shutdownPublisherExecutorService() { if (publisherExecutor != null) publisherExecutor.shutdown(); } |
|
93 @Override public ExecutorService publisherExecutorService() { return publisherExecutor; } |
|
94 |
|
95 ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// |
|
96 |
|
97 @BeforeMethod |
|
98 public void setUp() throws Exception { |
|
99 env.clearAsyncErrors(); |
|
100 } |
|
101 |
|
102 ////////////////////// SPEC RULE VERIFICATION /////////////////////////////// |
|
103 |
|
104 @Override @Test |
|
105 public void required_spec201_blackbox_mustSignalDemandViaSubscriptionRequest() throws Throwable { |
|
106 blackboxSubscriberTest(new BlackboxTestStageTestRun() { |
|
107 @Override |
|
108 public void run(BlackboxTestStage stage) throws InterruptedException { |
|
109 triggerRequest(stage.subProxy().sub()); |
|
110 final long requested = stage.expectRequest();// assuming subscriber wants to consume elements... |
|
111 final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculous large demand |
|
112 |
|
113 // should cope with up to requested number of elements |
|
114 for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++) |
|
115 stage.signalNext(); |
|
116 |
|
117 // we complete after `signalsToEmit` (which can be less than `requested`), |
|
118 // which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2 |
|
119 stage.sendCompletion(); |
|
120 } |
|
121 |
|
122 /** |
|
123 * In order to allow some "skid" and not check state on each iteration, |
|
124 * only check {@code stage.isCancelled} every {@code checkInterval}'th iteration. |
|
125 */ |
|
126 private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException { |
|
127 if (i % checkInterval == 0) return stage.isCancelled(); |
|
128 else return false; |
|
129 } |
|
130 }); |
|
131 } |
|
132 |
|
133 @Override @Test |
|
134 public void untested_spec202_blackbox_shouldAsynchronouslyDispatch() throws Exception { |
|
135 notVerified(); // cannot be meaningfully tested, or can it? |
|
136 } |
|
137 |
|
138 @Override @Test |
|
139 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnComplete() throws Throwable { |
|
140 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { |
|
141 @Override |
|
142 public void run(BlackboxTestStage stage) throws Throwable { |
|
143 final Subscription subs = new Subscription() { |
|
144 @Override |
|
145 public void request(long n) { |
|
146 final Optional<StackTraceElement> onCompleteStackTraceElement = env.findCallerMethodInStackTrace("onComplete"); |
|
147 if (onCompleteStackTraceElement.isDefined()) { |
|
148 final StackTraceElement stackElem = onCompleteStackTraceElement.get(); |
|
149 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", |
|
150 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); |
|
151 } |
|
152 } |
|
153 |
|
154 @Override |
|
155 public void cancel() { |
|
156 final Optional<StackTraceElement> onCompleteStackElement = env.findCallerMethodInStackTrace("onComplete"); |
|
157 if (onCompleteStackElement.isDefined()) { |
|
158 final StackTraceElement stackElem = onCompleteStackElement.get(); |
|
159 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onComplete (Rule 2.3)! (Caller: %s::%s line %d)", |
|
160 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); |
|
161 } |
|
162 } |
|
163 }; |
|
164 |
|
165 final Subscriber<T> sub = createSubscriber(); |
|
166 sub.onSubscribe(subs); |
|
167 sub.onComplete(); |
|
168 |
|
169 env.verifyNoAsyncErrorsNoDelay(); |
|
170 } |
|
171 }); |
|
172 } |
|
173 |
|
174 @Override @Test |
|
175 public void required_spec203_blackbox_mustNotCallMethodsOnSubscriptionOrPublisherInOnError() throws Throwable { |
|
176 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { |
|
177 @Override |
|
178 public void run(BlackboxTestStage stage) throws Throwable { |
|
179 final Subscription subs = new Subscription() { |
|
180 @Override |
|
181 public void request(long n) { |
|
182 Throwable thr = new Throwable(); |
|
183 for (StackTraceElement stackElem : thr.getStackTrace()) { |
|
184 if (stackElem.getMethodName().equals("onError")) { |
|
185 env.flop(String.format("Subscription::request MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", |
|
186 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); |
|
187 } |
|
188 } |
|
189 } |
|
190 |
|
191 @Override |
|
192 public void cancel() { |
|
193 Throwable thr = new Throwable(); |
|
194 for (StackTraceElement stackElem : thr.getStackTrace()) { |
|
195 if (stackElem.getMethodName().equals("onError")) { |
|
196 env.flop(String.format("Subscription::cancel MUST NOT be called from Subscriber::onError (Rule 2.3)! (Caller: %s::%s line %d)", |
|
197 stackElem.getClassName(), stackElem.getMethodName(), stackElem.getLineNumber())); |
|
198 } |
|
199 } |
|
200 } |
|
201 }; |
|
202 |
|
203 final Subscriber<T> sub = createSubscriber(); |
|
204 sub.onSubscribe(subs); |
|
205 sub.onError(new TestException()); |
|
206 |
|
207 env.verifyNoAsyncErrorsNoDelay(); |
|
208 } |
|
209 }); |
|
210 } |
|
211 |
|
212 @Override @Test |
|
213 public void untested_spec204_blackbox_mustConsiderTheSubscriptionAsCancelledInAfterRecievingOnCompleteOrOnError() throws Exception { |
|
214 notVerified(); // cannot be meaningfully tested, or can it? |
|
215 } |
|
216 |
|
217 @Override @Test |
|
218 public void required_spec205_blackbox_mustCallSubscriptionCancelIfItAlreadyHasAnSubscriptionAndReceivesAnotherOnSubscribeSignal() throws Exception { |
|
219 new BlackboxTestStage(env) {{ |
|
220 // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail |
|
221 final TestEnvironment.Latch secondSubscriptionCancelled = new TestEnvironment.Latch(env); |
|
222 sub().onSubscribe( |
|
223 new Subscription() { |
|
224 @Override |
|
225 public void request(long elements) { |
|
226 env.flop(String.format("Subscriber %s illegally called `subscription.request(%s)`!", sub(), elements)); |
|
227 } |
|
228 |
|
229 @Override |
|
230 public void cancel() { |
|
231 secondSubscriptionCancelled.close(); |
|
232 } |
|
233 |
|
234 @Override |
|
235 public String toString() { |
|
236 return "SecondSubscription(should get cancelled)"; |
|
237 } |
|
238 }); |
|
239 |
|
240 secondSubscriptionCancelled.expectClose("Expected SecondSubscription given to subscriber to be cancelled, but `Subscription.cancel()` was not called."); |
|
241 env.verifyNoAsyncErrorsNoDelay(); |
|
242 sendCompletion(); // we're done, complete the subscriber under test |
|
243 }}; |
|
244 } |
|
245 |
|
246 @Override @Test |
|
247 public void untested_spec206_blackbox_mustCallSubscriptionCancelIfItIsNoLongerValid() throws Exception { |
|
248 notVerified(); // cannot be meaningfully tested, or can it? |
|
249 } |
|
250 |
|
251 @Override @Test |
|
252 public void untested_spec207_blackbox_mustEnsureAllCallsOnItsSubscriptionTakePlaceFromTheSameThreadOrTakeCareOfSynchronization() throws Exception { |
|
253 notVerified(); // cannot be meaningfully tested, or can it? |
|
254 // the same thread part of the clause can be verified but that is not very useful, or is it? |
|
255 } |
|
256 |
|
257 @Override @Test |
|
258 public void untested_spec208_blackbox_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel() throws Throwable { |
|
259 notVerified(); // cannot be meaningfully tested as black box, or can it? |
|
260 } |
|
261 |
|
262 @Override @Test |
|
263 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithPrecedingRequestCall() throws Throwable { |
|
264 blackboxSubscriberTest(new BlackboxTestStageTestRun() { |
|
265 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
|
266 public void run(BlackboxTestStage stage) throws Throwable { |
|
267 triggerRequest(stage.subProxy().sub()); |
|
268 final long notUsed = stage.expectRequest(); // received request signal |
|
269 stage.sub().onComplete(); |
|
270 stage.subProxy().expectCompletion(); |
|
271 } |
|
272 }); |
|
273 } |
|
274 |
|
275 @Override @Test |
|
276 public void required_spec209_blackbox_mustBePreparedToReceiveAnOnCompleteSignalWithoutPrecedingRequestCall() throws Throwable { |
|
277 blackboxSubscriberTest(new BlackboxTestStageTestRun() { |
|
278 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
|
279 public void run(BlackboxTestStage stage) throws Throwable { |
|
280 final Subscriber<? super T> sub = stage.sub(); |
|
281 sub.onComplete(); |
|
282 stage.subProxy().expectCompletion(); |
|
283 } |
|
284 }); |
|
285 } |
|
286 |
|
287 @Override @Test |
|
288 public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithPrecedingRequestCall() throws Throwable { |
|
289 blackboxSubscriberTest(new BlackboxTestStageTestRun() { |
|
290 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
|
291 public void run(BlackboxTestStage stage) throws Throwable { |
|
292 triggerRequest(stage.subProxy().sub()); |
|
293 final long notUsed = stage.expectRequest(); // received request signal |
|
294 stage.sub().onError(new TestException()); // in response to that, we fail |
|
295 stage.subProxy().expectError(Throwable.class); |
|
296 } |
|
297 }); |
|
298 } |
|
299 |
|
300 // Verifies rule: https://github.com/reactive-streams/reactive-streams-jvm#2.10 |
|
301 @Override @Test |
|
302 public void required_spec210_blackbox_mustBePreparedToReceiveAnOnErrorSignalWithoutPrecedingRequestCall() throws Throwable { |
|
303 blackboxSubscriberTest(new BlackboxTestStageTestRun() { |
|
304 @Override @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
|
305 public void run(BlackboxTestStage stage) throws Throwable { |
|
306 |
|
307 stage.sub().onError(new TestException()); |
|
308 stage.subProxy().expectError(Throwable.class); |
|
309 } |
|
310 }); |
|
311 } |
|
312 |
|
313 @Override @Test |
|
314 public void untested_spec211_blackbox_mustMakeSureThatAllCallsOnItsMethodsHappenBeforeTheProcessingOfTheRespectiveEvents() throws Exception { |
|
315 notVerified(); // cannot be meaningfully tested, or can it? |
|
316 } |
|
317 |
|
318 @Override @Test |
|
319 public void untested_spec212_blackbox_mustNotCallOnSubscribeMoreThanOnceBasedOnObjectEquality() throws Throwable { |
|
320 notVerified(); // cannot be meaningfully tested as black box, or can it? |
|
321 } |
|
322 |
|
323 @Override @Test |
|
324 public void untested_spec213_blackbox_failingOnSignalInvocation() throws Exception { |
|
325 notVerified(); // cannot be meaningfully tested, or can it? |
|
326 } |
|
327 |
|
328 @Override @Test |
|
329 public void required_spec213_blackbox_onSubscribe_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { |
|
330 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { |
|
331 @Override |
|
332 public void run(BlackboxTestStage stage) throws Throwable { |
|
333 |
|
334 { |
|
335 final Subscriber<T> sub = createSubscriber(); |
|
336 boolean gotNPE = false; |
|
337 try { |
|
338 sub.onSubscribe(null); |
|
339 } catch(final NullPointerException expected) { |
|
340 gotNPE = true; |
|
341 } |
|
342 assertTrue(gotNPE, "onSubscribe(null) did not throw NullPointerException"); |
|
343 } |
|
344 |
|
345 env.verifyNoAsyncErrorsNoDelay(); |
|
346 } |
|
347 }); |
|
348 } |
|
349 |
|
350 @Override @Test |
|
351 public void required_spec213_blackbox_onNext_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { |
|
352 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { |
|
353 @Override |
|
354 public void run(BlackboxTestStage stage) throws Throwable { |
|
355 final Subscription subscription = new Subscription() { |
|
356 @Override public void request(final long elements) {} |
|
357 @Override public void cancel() {} |
|
358 }; |
|
359 |
|
360 { |
|
361 final Subscriber<T> sub = createSubscriber(); |
|
362 boolean gotNPE = false; |
|
363 sub.onSubscribe(subscription); |
|
364 try { |
|
365 sub.onNext(null); |
|
366 } catch(final NullPointerException expected) { |
|
367 gotNPE = true; |
|
368 } |
|
369 assertTrue(gotNPE, "onNext(null) did not throw NullPointerException"); |
|
370 } |
|
371 |
|
372 env.verifyNoAsyncErrorsNoDelay(); |
|
373 } |
|
374 }); |
|
375 } |
|
376 |
|
377 @Override @Test |
|
378 public void required_spec213_blackbox_onError_mustThrowNullPointerExceptionWhenParametersAreNull() throws Throwable { |
|
379 blackboxSubscriberWithoutSetupTest(new BlackboxTestStageTestRun() { |
|
380 @Override |
|
381 public void run(BlackboxTestStage stage) throws Throwable { |
|
382 final Subscription subscription = new Subscription() { |
|
383 @Override public void request(final long elements) {} |
|
384 @Override public void cancel() {} |
|
385 }; |
|
386 |
|
387 { |
|
388 final Subscriber<T> sub = createSubscriber(); |
|
389 boolean gotNPE = false; |
|
390 sub.onSubscribe(subscription); |
|
391 try { |
|
392 sub.onError(null); |
|
393 } catch(final NullPointerException expected) { |
|
394 gotNPE = true; |
|
395 } |
|
396 assertTrue(gotNPE, "onError(null) did not throw NullPointerException"); |
|
397 } |
|
398 |
|
399 env.verifyNoAsyncErrorsNoDelay(); |
|
400 } |
|
401 }); |
|
402 } |
|
403 |
|
404 ////////////////////// SUBSCRIPTION SPEC RULE VERIFICATION ////////////////// |
|
405 |
|
406 @Override @Test |
|
407 public void untested_spec301_blackbox_mustNotBeCalledOutsideSubscriberContext() throws Exception { |
|
408 notVerified(); // cannot be meaningfully tested, or can it? |
|
409 } |
|
410 |
|
411 @Override @Test |
|
412 public void untested_spec308_blackbox_requestMustRegisterGivenNumberElementsToBeProduced() throws Throwable { |
|
413 notVerified(); // cannot be meaningfully tested as black box, or can it? |
|
414 } |
|
415 |
|
416 @Override @Test |
|
417 public void untested_spec310_blackbox_requestMaySynchronouslyCallOnNextOnSubscriber() throws Exception { |
|
418 notVerified(); // cannot be meaningfully tested, or can it? |
|
419 } |
|
420 |
|
421 @Override @Test |
|
422 public void untested_spec311_blackbox_requestMaySynchronouslyCallOnCompleteOrOnError() throws Exception { |
|
423 notVerified(); // cannot be meaningfully tested, or can it? |
|
424 } |
|
425 |
|
426 @Override @Test |
|
427 public void untested_spec314_blackbox_cancelMayCauseThePublisherToShutdownIfNoOtherSubscriptionExists() throws Exception { |
|
428 notVerified(); // cannot be meaningfully tested, or can it? |
|
429 } |
|
430 |
|
431 @Override @Test |
|
432 public void untested_spec315_blackbox_cancelMustNotThrowExceptionAndMustSignalOnError() throws Exception { |
|
433 notVerified(); // cannot be meaningfully tested, or can it? |
|
434 } |
|
435 |
|
436 @Override @Test |
|
437 public void untested_spec316_blackbox_requestMustNotThrowExceptionAndMustOnErrorTheSubscriber() throws Exception { |
|
438 notVerified(); // cannot be meaningfully tested, or can it? |
|
439 } |
|
440 |
|
441 /////////////////////// ADDITIONAL "COROLLARY" TESTS //////////////////////// |
|
442 |
|
443 /////////////////////// TEST INFRASTRUCTURE ///////////////////////////////// |
|
444 |
|
445 abstract class BlackboxTestStageTestRun { |
|
446 public abstract void run(BlackboxTestStage stage) throws Throwable; |
|
447 } |
|
448 |
|
449 public void blackboxSubscriberTest(BlackboxTestStageTestRun body) throws Throwable { |
|
450 BlackboxTestStage stage = new BlackboxTestStage(env, true); |
|
451 body.run(stage); |
|
452 } |
|
453 |
|
454 public void blackboxSubscriberWithoutSetupTest(BlackboxTestStageTestRun body) throws Throwable { |
|
455 BlackboxTestStage stage = new BlackboxTestStage(env, false); |
|
456 body.run(stage); |
|
457 } |
|
458 |
|
459 public class BlackboxTestStage extends ManualPublisher<T> { |
|
460 public Publisher<T> pub; |
|
461 public ManualSubscriber<T> tees; // gives us access to an infinite stream of T values |
|
462 |
|
463 public T lastT = null; |
|
464 private Optional<BlackboxSubscriberProxy<T>> subProxy = Optional.empty(); |
|
465 |
|
466 public BlackboxTestStage(TestEnvironment env) throws InterruptedException { |
|
467 this(env, true); |
|
468 } |
|
469 |
|
470 public BlackboxTestStage(TestEnvironment env, boolean runDefaultInit) throws InterruptedException { |
|
471 super(env); |
|
472 if (runDefaultInit) { |
|
473 pub = this.createHelperPublisher(Long.MAX_VALUE); |
|
474 tees = env.newManualSubscriber(pub); |
|
475 Subscriber<T> sub = createSubscriber(); |
|
476 subProxy = Optional.of(createBlackboxSubscriberProxy(env, sub)); |
|
477 subscribe(subProxy.get()); |
|
478 } |
|
479 } |
|
480 |
|
481 public Subscriber<? super T> sub() { |
|
482 return subscriber.value(); |
|
483 } |
|
484 |
|
485 /** |
|
486 * Proxy for the {@link #sub()} {@code Subscriber}, providing certain assertions on methods being called on the Subscriber. |
|
487 */ |
|
488 public BlackboxSubscriberProxy<T> subProxy() { |
|
489 return subProxy.get(); |
|
490 } |
|
491 |
|
492 public Publisher<T> createHelperPublisher(long elements) { |
|
493 return SubscriberBlackboxVerification.this.createHelperPublisher(elements); |
|
494 } |
|
495 |
|
496 public BlackboxSubscriberProxy<T> createBlackboxSubscriberProxy(TestEnvironment env, Subscriber<T> sub) { |
|
497 return new BlackboxSubscriberProxy<T>(env, sub); |
|
498 } |
|
499 |
|
500 public T signalNext() throws InterruptedException { |
|
501 T element = nextT(); |
|
502 sendNext(element); |
|
503 return element; |
|
504 } |
|
505 |
|
506 public T nextT() throws InterruptedException { |
|
507 lastT = tees.requestNextElement(); |
|
508 return lastT; |
|
509 } |
|
510 |
|
511 } |
|
512 |
|
513 public void notVerified() { |
|
514 throw new SkipException("Not verified using this TCK."); |
|
515 } |
|
516 } |