103 Subscriber<? super List<ByteBuffer>> downReader, |
103 Subscriber<? super List<ByteBuffer>> downReader, |
104 Subscriber<? super List<ByteBuffer>> downWriter) |
104 Subscriber<? super List<ByteBuffer>> downWriter) |
105 { |
105 { |
106 this.tubeName = String.valueOf(downWriter); |
106 this.tubeName = String.valueOf(downWriter); |
107 this.reader = new Reader(); |
107 this.reader = new Reader(); |
108 this.reader.subscribe(downReader); |
|
109 this.writer = new Writer(); |
108 this.writer = new Writer(); |
110 this.writer.subscribe(downWriter); |
|
111 this.engine = engine; |
109 this.engine = engine; |
112 this.exec = exec; |
110 this.exec = exec; |
113 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
111 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
114 this.cf = CompletableFuture.allOf(reader.completion(), writer.completion()) |
112 this.cf = CompletableFuture.allOf(reader.completion(), writer.completion()) |
115 .thenRun(this::normalStop); |
113 .thenRun(this::normalStop); |
116 this.alpnCF = new MinimalFuture<>(); |
114 this.alpnCF = new MinimalFuture<>(); |
|
115 |
|
116 // connect the Reader to the downReader and the |
|
117 // Writer to the downWriter. |
|
118 connect(downReader, downWriter); |
|
119 |
117 //Monitor.add(this::monitor); |
120 //Monitor.add(this::monitor); |
|
121 } |
|
122 |
|
123 /** |
|
124 * Connects the read sink (downReader) to the SSLFlowDelegate Reader, |
|
125 * and the write sink (downWriter) to the SSLFlowDelegate Writer. |
|
126 * Called from within the constructor. Overwritten by SSLTube. |
|
127 * |
|
128 * @param downReader The left hand side read sink (typically, the |
|
129 * HttpConnection read subscriber). |
|
130 * @param downWriter The right hand side write sink (typically |
|
131 * the SocketTube write subscriber). |
|
132 */ |
|
133 void connect(Subscriber<? super List<ByteBuffer>> downReader, |
|
134 Subscriber<? super List<ByteBuffer>> downWriter) { |
|
135 this.reader.subscribe(downReader); |
|
136 this.writer.subscribe(downWriter); |
118 } |
137 } |
119 |
138 |
120 /** |
139 /** |
121 * Returns a CompletableFuture<String> which completes after |
140 * Returns a CompletableFuture<String> which completes after |
122 * the initial handshake completes, and which contains the negotiated |
141 * the initial handshake completes, and which contains the negotiated |