74 * | SSLFlowDelegate | |
74 * | SSLFlowDelegate | |
75 * downReader | | upstreamReader |
75 * downReader | | upstreamReader |
76 * <--------------- | | <-------------- |
76 * <--------------- | | <-------------- |
77 * supplied to constructor | | obtained from this |
77 * supplied to constructor | | obtained from this |
78 * +------------------+ |
78 * +------------------+ |
|
79 * |
|
80 * Errors are reported to the downReader Flow.Subscriber |
|
81 * |
79 * } |
82 * } |
80 * </pre> |
83 * </pre> |
81 */ |
84 */ |
82 public class SSLFlowDelegate { |
85 public class SSLFlowDelegate { |
83 |
86 |
// State of SSLFlowDelegate, shown here as a two-column diff: each
// declaration is listed once per column, and an empty row means the
// declaration exists in only one column. (Presumably the lower-numbered
// column is the earlier revision -- confirm against the change history.)
// exec, reader, writer, engine and tubeName are all assigned in the
// constructor; tubeName is String.valueOf(downWriter).
88 final Executor exec; |
91 final Executor exec; |
89 final Reader reader; |
92 final Reader reader; |
90 final Writer writer; |
93 final Writer writer; |
91 final SSLEngine engine; |
94 final SSLEngine engine; |
92 final String tubeName; // hack |
95 final String tubeName; // hack |
// cf is declared only in the lower-numbered column; that column's
// constructor sets it from MinimalFuture.of(...) and handleError
// completes it exceptionally.
93 private final CompletableFuture<Void> cf; |

94 final CompletableFuture<String> alpnCF; // completes on initial handshake |
96 final CompletableFuture<String> alpnCF; // completes on initial handshake |
// Shared empty buffer (Utils.EMPTY_BYTEBUFFER); how it is used as a
// sentinel is not visible in this excerpt.
95 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
97 final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
96 volatile boolean close_notify_received; |
98 volatile boolean close_notify_received; |
// downReader, scount and id are declared only in the higher-numbered
// column. downReader is written by the constructor and by connect(), and
// handleError() calls onError on it. scount starts at 1 and the
// constructor assigns id = scount.getAndIncrement(); monitor() prints id.
// NOTE(review): scount is static but neither final nor private -- confirm
// nothing is expected to reassign it.

99 volatile Flow.Subscriber<?> downReader; |

100 static AtomicInteger scount = new AtomicInteger(1); |

101 final int id; |
97 |
102 |
98 /** |
103 /** |
99 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
104 * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each |
100 * Flow.Subscriber requires an associated {@link CompletableFuture} |
105 * Flow.Subscriber requires an associated {@link CompletableFuture} |
101 * for errors that need to be signaled from downstream to upstream. |
106 * for errors that need to be signaled from downstream to upstream. |
// Constructor, shown as a two-column diff (rows from both columns are
// interleaved; an empty row means the statement exists in one column only).
// Both columns: record tubeName, create the Reader and Writer, store the
// engine and executor, initialise handshakeState to NOT_HANDSHAKING, create
// alpnCF, then connect(downReader, downWriter).
103 public SSLFlowDelegate(SSLEngine engine, |
108 public SSLFlowDelegate(SSLEngine engine, |
104 Executor exec, |
109 Executor exec, |
105 Subscriber<? super List<ByteBuffer>> downReader, |
110 Subscriber<? super List<ByteBuffer>> downReader, |
106 Subscriber<? super List<ByteBuffer>> downWriter) |
111 Subscriber<? super List<ByteBuffer>> downWriter) |
107 { |
112 { |
// Higher-numbered column only: take the next value of the static counter
// as this instance's id.

113 this.id = scount.getAndIncrement(); |
108 this.tubeName = String.valueOf(downWriter); |
114 this.tubeName = String.valueOf(downWriter); |
109 this.reader = new Reader(); |
115 this.reader = new Reader(); |
110 this.writer = new Writer(); |
116 this.writer = new Writer(); |
111 this.engine = engine; |
117 this.engine = engine; |
112 this.exec = exec; |
118 this.exec = exec; |
113 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
119 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
// Completion wiring differs between the columns.
// Lower-numbered: allOf(reader, writer completions).thenRun(normalStop),
// wrapped with MinimalFuture.of and stored in cf.
// Higher-numbered: anyOf(reader, writer completions).exceptionally(stopOnError)
// plus a separate allOf(...).thenRun(normalStop); neither result is stored.
114 CompletableFuture<Void> cs = CompletableFuture.allOf( |
120 CompletableFuture.anyOf(reader.completion(), writer.completion()) |
115 reader.completion(), writer.completion()).thenRun(this::normalStop); |
121 .exceptionally(this::stopOnError); |
116 this.cf = MinimalFuture.of(cs); |
122 |

123 CompletableFuture.allOf(reader.completion(), writer.completion()) |

124 .thenRun(this::normalStop); |
117 this.alpnCF = new MinimalFuture<>(); |
125 this.alpnCF = new MinimalFuture<>(); |
// Higher-numbered column only. connect() in that column performs the same
// assignment, so this one looks redundant -- TODO confirm.

126 this.downReader = downReader; |
118 |
127 |
119 // connect the Reader to the downReader and the |
128 // connect the Reader to the downReader and the |
120 // Writer to the downWriter. |
129 // Writer to the downWriter. |
121 connect(downReader, downWriter); |
130 connect(downReader, downWriter); |
122 |
131 |
// NOTE(review): the lower-numbered column has this call commented out; the
// higher-numbered column enables it, handing a method reference bound to
// `this` to Monitor from inside the constructor. What Monitor does with it
// (and for how long it retains it) is not visible here -- confirm this is
// intended outside of debugging.
123 //Monitor.add(this::monitor); |
132 Monitor.add(this::monitor); |
124 } |
133 } |
125 |
134 |
126 /** |
135 /** |
127 * Returns true if the SSLFlowDelegate has detected a TLS |
136 * Returns true if the SSLFlowDelegate has detected a TLS |
128 * close_notify from the server. |
137 * close_notify from the server. |
142 * @param downWriter The right hand side write sink (typically |
151 * @param downWriter The right hand side write sink (typically |
143 * the SocketTube write subscriber). |
152 * the SocketTube write subscriber). |
144 */ |
153 */ |
// Subscribes downReader to this delegate's reader and downWriter to its
// writer, in that order. In the higher-numbered column the downReader
// field is also set first (an empty row marks the missing counterpart in
// the other column).
145 void connect(Subscriber<? super List<ByteBuffer>> downReader, |
154 void connect(Subscriber<? super List<ByteBuffer>> downReader, |
146 Subscriber<? super List<ByteBuffer>> downWriter) { |
155 Subscriber<? super List<ByteBuffer>> downWriter) { |

156 this.downReader = downReader; |
147 this.reader.subscribe(downReader); |
157 this.reader.subscribe(downReader); |
148 this.writer.subscribe(downWriter); |
158 this.writer.subscribe(downWriter); |
149 } |
159 } |
150 |
160 |
151 /** |
161 /** |
166 alpnCF.complete(alpn); |
176 alpnCF.complete(alpn); |
167 } |
177 } |
168 |
178 |
169 public String monitor() { |
179 public String monitor() { |
170 StringBuilder sb = new StringBuilder(); |
180 StringBuilder sb = new StringBuilder(); |
171 sb.append("SSL: HS state: " + states(handshakeState)); |
181 sb.append("SSL: id ").append(id); |
|
182 sb.append(" HS state: " + states(handshakeState)); |
172 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
183 sb.append(" Engine state: " + engine.getHandshakeStatus().toString()); |
173 sb.append(" LL : "); |
184 sb.append(" LL : "); |
174 for (String s: stateList) { |
185 for (String s: stateList) { |
175 sb.append(s).append(" "); |
186 sb.append(s).append(" "); |
176 } |
187 } |
604 } |
605 } |
605 } |
606 } |
606 |
607 |
// Error path. Logs t at DEBUG level, propagates it, fails alpnCF, then
// stops the reader and the writer.
// The two columns of this diff propagate the error differently: the
// lower-numbered column completes cf exceptionally, the higher-numbered
// column calls downReader.onError(t) instead.
607 private void handleError(Throwable t) { |
608 private void handleError(Throwable t) { |
608 debug.log(Level.DEBUG, "handleError", t); |
609 debug.log(Level.DEBUG, "handleError", t); |
// NOTE(review): whether onError can reach the subscriber more than once
// through this path is not visible in this excerpt -- verify.
609 cf.completeExceptionally(t); |
610 downReader.onError(t); |
610 // no-op if already completed |
611 // no-op if already completed |
611 alpnCF.completeExceptionally(t); |
612 alpnCF.completeExceptionally(t); |
612 reader.stop(); |
613 reader.stop(); |
613 writer.stop(); |
614 writer.stop(); |
614 } |
615 } |
615 |
616 |
// Shutdown helpers, shown as a two-column diff with interleaved rows.
// Lower-numbered column: normalStop() stops the reader and the writer
// directly.
// Higher-numbered column: normalStop() delegates to stopOnError(null), and
// stopOnError(Throwable) stops the reader and the writer and returns null.
// The Void-returning, Throwable-taking shape lets the constructor pass it
// to exceptionally(...). The Throwable argument itself is never read.
616 private void normalStop() { |
617 private void normalStop() { |

618 stopOnError(null); |

619 } |

620 |

621 private Void stopOnError(Throwable t) { |
617 reader.stop(); |
622 reader.stop(); |
618 writer.stop(); |
623 writer.stop(); |

624 return null; |
619 } |
625 } |
620 |
626 |
621 private void cleanList(List<ByteBuffer> l) { |
627 private void cleanList(List<ByteBuffer> l) { |
622 synchronized (l) { |
628 synchronized (l) { |
623 Iterator<ByteBuffer> iter = l.iterator(); |
629 Iterator<ByteBuffer> iter = l.iterator(); |