author | prappo |
Tue, 02 Jul 2019 13:25:51 +0100 | |
changeset 55546 | 3ae57bbf9585 |
permissions | -rw-r--r-- |
55546
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
1 |
/* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
2 |
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
4 |
* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
5 |
* This code is free software; you can redistribute it and/or modify it |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
6 |
* under the terms of the GNU General Public License version 2 only, as |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
7 |
* published by the Free Software Foundation. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
8 |
* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
9 |
* This code is distributed in the hope that it will be useful, but WITHOUT |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
10 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
11 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
12 |
* version 2 for more details (a copy is included in the LICENSE file that |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
13 |
* accompanied this code). |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
14 |
* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
15 |
* You should have received a copy of the GNU General Public License version |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
16 |
* 2 along with this work; if not, write to the Free Software Foundation, |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
17 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
18 |
* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
19 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
20 |
* or visit www.oracle.com if you need additional information or have any |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
21 |
* questions. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
22 |
*/ |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
23 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
24 |
package org.reactivestreams.example.unicast; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
25 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
26 |
import org.reactivestreams.Publisher; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
27 |
import org.reactivestreams.Subscriber; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
28 |
import org.reactivestreams.Subscription; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
29 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
30 |
import java.util.Iterator; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
31 |
import java.util.Collections; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
32 |
import java.util.concurrent.Executor; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
33 |
import java.util.concurrent.atomic.AtomicBoolean; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
34 |
import java.util.concurrent.ConcurrentLinkedQueue; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
35 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
36 |
/** |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
37 |
* AsyncIterablePublisher is an implementation of Reactive Streams `Publisher` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
38 |
* which executes asynchronously, using a provided `Executor` and produces elements |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
39 |
* from a given `Iterable` in a "unicast" configuration to its `Subscribers`. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
40 |
* |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
41 |
* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
42 |
*/ |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
43 |
public class AsyncIterablePublisher<T> implements Publisher<T> { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
44 |
private final static int DEFAULT_BATCHSIZE = 1024; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
45 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
46 |
private final Iterable<T> elements; // This is our data source / generator |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
47 |
private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
48 |
private final int batchSize; // In general, if one uses an `Executor`, one should be nice and not hog a thread for too long; this is the cap for that, in elements |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
49 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
50 |
public AsyncIterablePublisher(final Iterable<T> elements, final Executor executor) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
51 |
this(elements, DEFAULT_BATCHSIZE, executor); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
52 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
53 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
54 |
public AsyncIterablePublisher(final Iterable<T> elements, final int batchSize, final Executor executor) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
55 |
if (elements == null) throw null; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
56 |
if (executor == null) throw null; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
57 |
if (batchSize < 1) throw new IllegalArgumentException("batchSize must be greater than zero!"); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
58 |
this.elements = elements; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
59 |
this.executor = executor; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
60 |
this.batchSize = batchSize; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
61 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
62 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
63 |
@Override |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
64 |
public void subscribe(final Subscriber<? super T> s) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
65 |
// As per rule 1.11, we have decided to support multiple subscribers in a unicast configuration |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
66 |
// for this `Publisher` implementation. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
67 |
// As per 2.13, this method must return normally (i.e. not throw) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
68 |
new SubscriptionImpl(s).init(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
69 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
70 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
71 |
// These represent the protocol of the `AsyncIterablePublisher`'s `SubscriptionImpl`s |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
72 |
static interface Signal {}; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
73 |
enum Cancel implements Signal { Instance; }; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
74 |
enum Subscribe implements Signal { Instance; }; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
75 |
enum Send implements Signal { Instance; }; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
76 |
static final class Request implements Signal { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
77 |
final long n; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
78 |
Request(final long n) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
79 |
this.n = n; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
80 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
81 |
}; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
82 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
83 |
// This is our implementation of the Reactive Streams `Subscription`, |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
84 |
// which represents the association between a `Publisher` and a `Subscriber`. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
85 |
final class SubscriptionImpl implements Subscription, Runnable { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
86 |
final Subscriber<? super T> subscriber; // We need a reference to the `Subscriber` so we can talk to it |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
87 |
private boolean cancelled = false; // This flag will track whether this `Subscription` is to be considered cancelled or not |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
88 |
private long demand = 0; // Here we track the current demand, i.e. what has been requested but not yet delivered |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
89 |
private Iterator<T> iterator; // This is our cursor into the data stream, which we will send to the `Subscriber` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
90 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
91 |
SubscriptionImpl(final Subscriber<? super T> subscriber) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
92 |
// As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
93 |
if (subscriber == null) throw null; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
94 |
this.subscriber = subscriber; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
95 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
96 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
97 |
// This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscription`, like `request` and `cancel` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
98 |
private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue<Signal>(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
99 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
100 |
// We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself, |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
101 |
// which would violate rule 1.3 among others (no concurrent notifications). |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
102 |
private final AtomicBoolean on = new AtomicBoolean(false); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
103 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
104 |
// This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
105 |
private void doRequest(final long n) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
106 |
if (n < 1) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
107 |
terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.")); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
108 |
else if (demand + n < 1) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
109 |
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded" |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
110 |
demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded" |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
111 |
doSend(); // Then we proceed with sending data downstream |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
112 |
} else { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
113 |
demand += n; // Here we record the downstream demand |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
114 |
doSend(); // Then we can proceed with sending data downstream |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
115 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
116 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
117 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
118 |
// This handles cancellation requests, and is idempotent, thread-safe and not synchronously performing heavy computations as specified in rule 3.5 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
119 |
private void doCancel() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
120 |
cancelled = true; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
121 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
122 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
123 |
// Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
124 |
// we execute it asynchronously; this is to avoid executing the user code (`Iterable.iterator`) on the calling thread. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
125 |
// It also makes it easier to follow rule 1.9 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
126 |
private void doSubscribe() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
127 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
128 |
iterator = elements.iterator(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
129 |
if (iterator == null) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
130 |
iterator = Collections.<T>emptyList().iterator(); // So we can assume that `iterator` is never null |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
131 |
} catch(final Throwable t) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
132 |
subscriber.onSubscribe(new Subscription() { // We need to make sure we signal onSubscribe before onError, obeying rule 1.9 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
133 |
@Override public void cancel() {} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
134 |
@Override public void request(long n) {} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
135 |
}); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
136 |
terminateDueTo(t); // Here we send onError, obeying rule 1.09 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
137 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
138 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
139 |
if (!cancelled) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
140 |
// Deal with setting up the subscription with the subscriber |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
141 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
142 |
subscriber.onSubscribe(this); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
143 |
} catch(final Throwable t) { // Due diligence to obey 2.13 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
144 |
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
145 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
146 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
147 |
// Deal with already complete iterators promptly |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
148 |
boolean hasElements = false; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
149 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
150 |
hasElements = iterator.hasNext(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
151 |
} catch(final Throwable t) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
152 |
terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per rules 1.2 and 1.4 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
153 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
154 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
155 |
// If we don't have anything to deliver, we're already done, so let's do the right thing and |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
156 |
// not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
157 |
if (!hasElements) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
158 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
159 |
doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
160 |
subscriber.onComplete(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
161 |
} catch(final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
162 |
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
163 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
164 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
165 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
166 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
167 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
168 |
// This is our behavior for producing elements downstream |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
169 |
private void doSend() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
170 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
171 |
// In order to play nice with the `Executor` we will only send at most `batchSize` elements before |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
172 |
// rescheduling ourselves and relinquishing the current thread. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
173 |
int leftInBatch = batchSize; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
174 |
do { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
175 |
T next; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
176 |
boolean hasNext; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
177 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
178 |
next = iterator.next(); // We have already checked `hasNext` when subscribing, so we can fall back to testing -after- `next` is called. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
179 |
hasNext = iterator.hasNext(); // Need to keep track of End-of-Stream |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
180 |
} catch (final Throwable t) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
181 |
terminateDueTo(t); // If `next` or `hasNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
182 |
return; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
183 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
184 |
subscriber.onNext(next); // Then we signal the next element downstream to the `Subscriber` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
185 |
if (!hasNext) { // If we are at End-of-Stream |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
186 |
doCancel(); // We need to consider this `Subscription` as cancelled as per rule 1.6 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
187 |
subscriber.onComplete(); // Then we signal `onComplete` as per rule 1.2 and 1.5 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
188 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
189 |
} while (!cancelled // This makes sure that rule 1.8 is upheld, i.e. we need to stop signalling "eventually" |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
190 |
&& --leftInBatch > 0 // This makes sure that we only send `batchSize` number of elements in one go (so we can yield to other Runnables) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
191 |
&& --demand > 0); // This makes sure that rule 1.1 is upheld (sending more than was demanded) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
192 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
193 |
if (!cancelled && demand > 0) // If the `Subscription` is still alive and well, and we have demand to satisfy, we signal ourselves to send more data |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
194 |
signal(Send.Instance); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
195 |
} catch(final Throwable t) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
196 |
// We can only get here if `onNext` or `onComplete` threw, and they are not allowed to according to 2.13, so we can only cancel and log here. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
197 |
doCancel(); // Make sure that we are cancelled, since we cannot do anything else since the `Subscriber` is faulty. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
198 |
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
199 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
200 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
201 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
202 |
// This is a helper method to ensure that we always `cancel` when we signal `onError` as per rule 1.6 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
203 |
private void terminateDueTo(final Throwable t) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
204 |
cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
205 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
206 |
subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
207 |
} catch(final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
208 |
(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
209 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
210 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
211 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
212 |
// What `signal` does is that it sends signals to the `Subscription` asynchronously |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
213 |
private void signal(final Signal signal) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
214 |
if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
215 |
tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
216 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
217 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
218 |
// This is the main "event loop" if you so will |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
219 |
@Override public final void run() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
220 |
if(on.get()) { // establishes a happens-before relationship with the end of the previous run |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
221 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
222 |
final Signal s = inboundSignals.poll(); // We take a signal off the queue |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
223 |
if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
224 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
225 |
// Below we simply unpack the `Signal`s and invoke the corresponding methods |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
226 |
if (s instanceof Request) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
227 |
doRequest(((Request)s).n); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
228 |
else if (s == Send.Instance) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
229 |
doSend(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
230 |
else if (s == Cancel.Instance) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
231 |
doCancel(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
232 |
else if (s == Subscribe.Instance) |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
233 |
doSubscribe(); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
234 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
235 |
} finally { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
236 |
on.set(false); // establishes a happens-before relationship with the beginning of the next run |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
237 |
if(!inboundSignals.isEmpty()) // If we still have signals to process |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
238 |
tryScheduleToExecute(); // Then we try to schedule ourselves to execute again |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
239 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
240 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
241 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
242 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
243 |
// This method makes sure that this `Subscription` is only running on one Thread at a time, |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
244 |
// this is important to make sure that we follow rule 1.3 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
245 |
private final void tryScheduleToExecute() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
246 |
if(on.compareAndSet(false, true)) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
247 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
248 |
executor.execute(this); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
249 |
} catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
250 |
if (!cancelled) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
251 |
doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
252 |
try { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
253 |
terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
254 |
} finally { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
255 |
inboundSignals.clear(); // We're not going to need these anymore |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
256 |
// This subscription is cancelled by now, but letting it become schedulable again means |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
257 |
// that we can drain the inboundSignals queue if anything arrives after clearing |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
258 |
on.set(false); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
259 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
260 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
261 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
262 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
263 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
264 |
|
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
265 |
// Our implementation of `Subscription.request` sends a signal to the Subscription that more elements are in demand |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
266 |
@Override public void request(final long n) { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
267 |
signal(new Request(n)); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
268 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
269 |
// Our implementation of `Subscription.cancel` sends a signal to the Subscription that the `Subscriber` is not interested in any more elements |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
270 |
@Override public void cancel() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
271 |
signal(Cancel.Instance); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
272 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
273 |
// The reason for the `init` method is that we want to ensure the `SubscriptionImpl` |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
274 |
// is completely constructed before it is exposed to the thread pool, therefor this |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
275 |
// method is only intended to be invoked once, and immediately after the constructor has |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
276 |
// finished. |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
277 |
void init() { |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
278 |
signal(Subscribe.Instance); |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
279 |
} |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
280 |
}; |
3ae57bbf9585
8226602: Test convenience reactive primitives from java.net.http with RS TCK
prappo
parents:
diff
changeset
|
281 |
} |