43 import java.util.concurrent.ConcurrentLinkedQueue; |
43 import java.util.concurrent.ConcurrentLinkedQueue; |
44 import java.util.concurrent.Executor; |
44 import java.util.concurrent.Executor; |
45 import java.util.concurrent.Flow; |
45 import java.util.concurrent.Flow; |
46 import java.util.concurrent.Flow.Subscriber; |
46 import java.util.concurrent.Flow.Subscriber; |
47 import java.util.concurrent.atomic.AtomicInteger; |
47 import java.util.concurrent.atomic.AtomicInteger; |
|
48 import java.util.function.Consumer; |
48 |
49 |
49 /** |
50 /** |
50 * Implements SSL using two SubscriberWrappers. |
51 * Implements SSL using two SubscriberWrappers. |
51 * |
52 * |
52 * <p> Constructor takes two Flow.Subscribers: one that receives the network |
53 * <p> Constructor takes two Flow.Subscribers: one that receives the network |
94 final CompletableFuture<String> alpnCF; // completes on initial handshake |
95 final CompletableFuture<String> alpnCF; // completes on initial handshake |
95 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
96 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
96 volatile boolean close_notify_received; |
97 volatile boolean close_notify_received; |
97 final CompletableFuture<Void> readerCF; |
98 final CompletableFuture<Void> readerCF; |
98 final CompletableFuture<Void> writerCF; |
99 final CompletableFuture<Void> writerCF; |
|
100 final Consumer<ByteBuffer> recycler; |
99 static AtomicInteger scount = new AtomicInteger(1); |
101 static AtomicInteger scount = new AtomicInteger(1); |
100 final int id; |
102 final int id; |
101 |
103 |
102 /** |
104 /** |
103 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
105 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
107 public SSLFlowDelegate(SSLEngine engine, |
109 public SSLFlowDelegate(SSLEngine engine, |
108 Executor exec, |
110 Executor exec, |
109 Subscriber<? super List<ByteBuffer>> downReader, |
111 Subscriber<? super List<ByteBuffer>> downReader, |
110 Subscriber<? super List<ByteBuffer>> downWriter) |
112 Subscriber<? super List<ByteBuffer>> downWriter) |
111 { |
113 { |
|
114 this(engine, exec, null, downReader, downWriter); |
|
115 } |
|
116 |
|
117 /** |
|
118 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
|
119 * Flow.Subscriber requires an associated {@link CompletableFuture} |
|
120 * for errors that need to be signaled from downstream to upstream. |
|
121 */ |
|
122 public SSLFlowDelegate(SSLEngine engine, |
|
123 Executor exec, |
|
124 Consumer<ByteBuffer> recycler, |
|
125 Subscriber<? super List<ByteBuffer>> downReader, |
|
126 Subscriber<? super List<ByteBuffer>> downWriter) |
|
127 { |
112 this.id = scount.getAndIncrement(); |
128 this.id = scount.getAndIncrement(); |
113 this.tubeName = String.valueOf(downWriter); |
129 this.tubeName = String.valueOf(downWriter); |
|
130 this.recycler = recycler; |
114 this.reader = new Reader(); |
131 this.reader = new Reader(); |
115 this.writer = new Writer(); |
132 this.writer = new Writer(); |
116 this.engine = engine; |
133 this.engine = engine; |
117 this.exec = exec; |
134 this.exec = exec; |
118 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
135 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
210 * OK: return generated buffers. |
227 * OK: return generated buffers. |
211 * |
228 * |
212 * Upstream subscription strategy is to try and keep no more than |
229 * Upstream subscription strategy is to try and keep no more than |
213 * TARGET_BUFSIZE bytes in readBuf |
230 * TARGET_BUFSIZE bytes in readBuf |
214 */ |
231 */ |
215 class Reader extends SubscriberWrapper { |
232 class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber { |
216 // Maximum record size is 16k. |
233 // Maximum record size is 16k. |
217 // Because SocketTube can feed us up to 3 16K buffers, |
234 // Because SocketTube can feed us up to 3 16K buffers, |
218 // then setting this size to 16K means that the readBuf |
235 // then setting this size to 16K means that the readBuf |
219 // can store up to 64K-1 (16K-1 + 3*16K) |
236 // can store up to 64K-1 (16K-1 + 3*16K) |
220 static final int TARGET_BUFSIZE = 16 * 1024; |
237 static final int TARGET_BUFSIZE = 16 * 1024; |
233 super(); |
250 super(); |
234 scheduler = SequentialScheduler.synchronizedScheduler( |
251 scheduler = SequentialScheduler.synchronizedScheduler( |
235 new ReaderDownstreamPusher()); |
252 new ReaderDownstreamPusher()); |
236 this.readBuf = ByteBuffer.allocate(1024); |
253 this.readBuf = ByteBuffer.allocate(1024); |
237 readBuf.limit(0); // keep in read mode |
254 readBuf.limit(0); // keep in read mode |
|
255 } |
|
256 |
|
257 @Override |
|
258 public boolean supportsRecycling() { |
|
259 return recycler != null; |
238 } |
260 } |
239 |
261 |
240 protected SchedulingAction enterScheduling() { |
262 protected SchedulingAction enterScheduling() { |
241 return enterReadScheduling(); |
263 return enterReadScheduling(); |
242 } |
264 } |