1 /* |
|
2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package jdk.incubator.http.internal.websocket; |
|
27 |
|
28 import jdk.incubator.http.WebSocket; |
|
29 import jdk.incubator.http.internal.common.Demand; |
|
30 import jdk.incubator.http.internal.common.Log; |
|
31 import jdk.incubator.http.internal.common.MinimalFuture; |
|
32 import jdk.incubator.http.internal.common.SequentialScheduler; |
|
33 import jdk.incubator.http.internal.common.Utils; |
|
34 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result; |
|
35 |
|
36 import java.io.IOException; |
|
37 import java.lang.ref.Reference; |
|
38 import java.net.ProtocolException; |
|
39 import java.net.URI; |
|
40 import java.nio.ByteBuffer; |
|
41 import java.util.Objects; |
|
42 import java.util.concurrent.CompletableFuture; |
|
43 import java.util.concurrent.CompletionStage; |
|
44 import java.util.concurrent.TimeoutException; |
|
45 import java.util.concurrent.atomic.AtomicBoolean; |
|
46 import java.util.concurrent.atomic.AtomicReference; |
|
47 import java.util.function.Function; |
|
48 |
|
49 import static java.util.Objects.requireNonNull; |
|
50 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; |
|
51 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; |
|
52 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; |
|
53 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; |
|
54 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.BINARY; |
|
55 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.CLOSE; |
|
56 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.ERROR; |
|
57 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.IDLE; |
|
58 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.OPEN; |
|
59 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PING; |
|
60 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PONG; |
|
61 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.TEXT; |
|
62 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.WAITING; |
|
63 |
|
64 /* |
|
65 * A WebSocket client. |
|
66 */ |
|
67 public final class WebSocketImpl implements WebSocket { |
|
68 |
|
69 enum State { |
|
70 OPEN, |
|
71 IDLE, |
|
72 WAITING, |
|
73 TEXT, |
|
74 BINARY, |
|
75 PING, |
|
76 PONG, |
|
77 CLOSE, |
|
78 ERROR; |
|
79 } |
|
80 |
|
81 private volatile boolean inputClosed; |
|
82 private volatile boolean outputClosed; |
|
83 |
|
84 private final AtomicReference<State> state = new AtomicReference<>(OPEN); |
|
85 |
|
86 /* Components of calls to Listener's methods */ |
|
87 private MessagePart part; |
|
88 private ByteBuffer binaryData; |
|
89 private CharSequence text; |
|
90 private int statusCode; |
|
91 private String reason; |
|
92 private final AtomicReference<Throwable> error = new AtomicReference<>(); |
|
93 |
|
94 private final URI uri; |
|
95 private final String subprotocol; |
|
96 private final Listener listener; |
|
97 |
|
98 private final AtomicBoolean outstandingSend = new AtomicBoolean(); |
|
99 private final Transport<WebSocket> transport; |
|
100 private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask()); |
|
101 private final Demand demand = new Demand(); |
|
102 |
|
103 public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) { |
|
104 Function<Result, WebSocket> newWebSocket = r -> { |
|
105 WebSocket ws = newInstance(b.getUri(), |
|
106 r.subprotocol, |
|
107 b.getListener(), |
|
108 r.transport); |
|
109 // Make sure we don't release the builder until this lambda |
|
110 // has been executed. The builder has a strong reference to |
|
111 // the HttpClientFacade, and we want to keep that live until |
|
112 // after the raw channel is created and passed to WebSocketImpl. |
|
113 Reference.reachabilityFence(b); |
|
114 return ws; |
|
115 }; |
|
116 OpeningHandshake h; |
|
117 try { |
|
118 h = new OpeningHandshake(b); |
|
119 } catch (Throwable e) { |
|
120 return failedFuture(e); |
|
121 } |
|
122 return h.send().thenApply(newWebSocket); |
|
123 } |
|
124 |
|
125 /* Exposed for testing purposes */ |
|
126 static WebSocketImpl newInstance(URI uri, |
|
127 String subprotocol, |
|
128 Listener listener, |
|
129 TransportFactory transport) { |
|
130 WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport); |
|
131 // This initialisation is outside of the constructor for the sake of |
|
132 // safe publication of WebSocketImpl.this |
|
133 ws.signalOpen(); |
|
134 return ws; |
|
135 } |
|
136 |
|
137 private WebSocketImpl(URI uri, |
|
138 String subprotocol, |
|
139 Listener listener, |
|
140 TransportFactory transportFactory) { |
|
141 this.uri = requireNonNull(uri); |
|
142 this.subprotocol = requireNonNull(subprotocol); |
|
143 this.listener = requireNonNull(listener); |
|
144 this.transport = transportFactory.createTransport( |
|
145 () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this? |
|
146 new SignallingMessageConsumer()); |
|
147 } |
|
148 |
|
149 @Override |
|
150 public CompletableFuture<WebSocket> sendText(CharSequence message, |
|
151 boolean isLast) { |
|
152 Objects.requireNonNull(message); |
|
153 if (!outstandingSend.compareAndSet(false, true)) { |
|
154 return failedFuture(new IllegalStateException("Send pending")); |
|
155 } |
|
156 CompletableFuture<WebSocket> cf = transport.sendText(message, isLast); |
|
157 return cf.whenComplete((r, e) -> outstandingSend.set(false)); |
|
158 } |
|
159 |
|
160 @Override |
|
161 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
|
162 boolean isLast) { |
|
163 Objects.requireNonNull(message); |
|
164 if (!outstandingSend.compareAndSet(false, true)) { |
|
165 return failedFuture(new IllegalStateException("Send pending")); |
|
166 } |
|
167 CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast); |
|
168 // Optimize? |
|
169 // if (cf.isDone()) { |
|
170 // outstandingSend.set(false); |
|
171 // } else { |
|
172 // cf.whenComplete((r, e) -> outstandingSend.set(false)); |
|
173 // } |
|
174 return cf.whenComplete((r, e) -> outstandingSend.set(false)); |
|
175 } |
|
176 |
|
177 @Override |
|
178 public CompletableFuture<WebSocket> sendPing(ByteBuffer message) { |
|
179 return transport.sendPing(message); |
|
180 } |
|
181 |
|
182 @Override |
|
183 public CompletableFuture<WebSocket> sendPong(ByteBuffer message) { |
|
184 return transport.sendPong(message); |
|
185 } |
|
186 |
|
187 @Override |
|
188 public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) { |
|
189 Objects.requireNonNull(reason); |
|
190 if (!isLegalToSendFromClient(statusCode)) { |
|
191 return failedFuture(new IllegalArgumentException("statusCode")); |
|
192 } |
|
193 return sendClose0(statusCode, reason); |
|
194 } |
|
195 |
|
196 /* |
|
197 * Sends a Close message, then shuts down the output since no more |
|
198 * messages are expected to be sent after this. |
|
199 */ |
|
200 private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) { |
|
201 outputClosed = true; |
|
202 return transport.sendClose(statusCode, reason) |
|
203 .whenComplete((result, error) -> { |
|
204 try { |
|
205 transport.closeOutput(); |
|
206 } catch (IOException e) { |
|
207 Log.logError(e); |
|
208 } |
|
209 Throwable cause = Utils.getCompletionCause(error); |
|
210 if (cause instanceof TimeoutException) { |
|
211 try { |
|
212 transport.closeInput(); |
|
213 } catch (IOException e) { |
|
214 Log.logError(e); |
|
215 } |
|
216 } |
|
217 }); |
|
218 } |
|
219 |
|
220 @Override |
|
221 public void request(long n) { |
|
222 if (demand.increase(n)) { |
|
223 receiveScheduler.runOrSchedule(); |
|
224 } |
|
225 } |
|
226 |
|
227 @Override |
|
228 public String getSubprotocol() { |
|
229 return subprotocol; |
|
230 } |
|
231 |
|
232 @Override |
|
233 public boolean isOutputClosed() { |
|
234 return outputClosed; |
|
235 } |
|
236 |
|
237 @Override |
|
238 public boolean isInputClosed() { |
|
239 return inputClosed; |
|
240 } |
|
241 |
|
242 @Override |
|
243 public void abort() { |
|
244 inputClosed = true; |
|
245 outputClosed = true; |
|
246 receiveScheduler.stop(); |
|
247 close(); |
|
248 } |
|
249 |
|
250 @Override |
|
251 public String toString() { |
|
252 return super.toString() |
|
253 + "[uri=" + uri |
|
254 + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "") |
|
255 + "]"; |
|
256 } |
|
257 |
|
258 /* |
|
259 * The assumptions about order is as follows: |
|
260 * |
|
261 * - state is never changed more than twice inside the `run` method: |
|
262 * x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or |
|
263 * overwriting parts of messages creating a mess since there's no |
|
264 * queueing) |
|
265 * - OPEN is always the first state |
|
266 * - no messages are requested/delivered before onOpen is called (this |
|
267 * is implemented by making WebSocket instance accessible first in |
|
268 * onOpen) |
|
269 * - after the state has been observed as CLOSE/ERROR, the scheduler |
|
270 * is stopped |
|
271 */ |
|
272 private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask { |
|
273 |
|
274 // Transport only asked here and nowhere else because we must make sure |
|
275 // onOpen is invoked first and no messages become pending before onOpen |
|
276 // finishes |
|
277 |
|
278 @Override |
|
279 public void run() { |
|
280 while (true) { |
|
281 State s = state.get(); |
|
282 try { |
|
283 switch (s) { |
|
284 case OPEN: |
|
285 processOpen(); |
|
286 tryChangeState(OPEN, IDLE); |
|
287 break; |
|
288 case TEXT: |
|
289 processText(); |
|
290 tryChangeState(TEXT, IDLE); |
|
291 break; |
|
292 case BINARY: |
|
293 processBinary(); |
|
294 tryChangeState(BINARY, IDLE); |
|
295 break; |
|
296 case PING: |
|
297 processPing(); |
|
298 tryChangeState(PING, IDLE); |
|
299 break; |
|
300 case PONG: |
|
301 processPong(); |
|
302 tryChangeState(PONG, IDLE); |
|
303 break; |
|
304 case CLOSE: |
|
305 processClose(); |
|
306 return; |
|
307 case ERROR: |
|
308 processError(); |
|
309 return; |
|
310 case IDLE: |
|
311 if (demand.tryDecrement() |
|
312 && tryChangeState(IDLE, WAITING)) { |
|
313 transport.request(1); |
|
314 } |
|
315 return; |
|
316 case WAITING: |
|
317 // For debugging spurious signalling: when there was a |
|
318 // signal, but apparently nothing has changed |
|
319 return; |
|
320 default: |
|
321 throw new InternalError(String.valueOf(s)); |
|
322 } |
|
323 } catch (Throwable t) { |
|
324 signalError(t); |
|
325 } |
|
326 } |
|
327 } |
|
328 |
|
329 private void processError() throws IOException { |
|
330 transport.closeInput(); |
|
331 receiveScheduler.stop(); |
|
332 Throwable err = error.get(); |
|
333 if (err instanceof FailWebSocketException) { |
|
334 int code1 = ((FailWebSocketException) err).getStatusCode(); |
|
335 err = new ProtocolException().initCause(err); |
|
336 sendClose0(code1, "") |
|
337 .whenComplete( |
|
338 (r, e) -> { |
|
339 if (e != null) { |
|
340 Log.logError(e); |
|
341 } |
|
342 }); |
|
343 } |
|
344 listener.onError(WebSocketImpl.this, err); |
|
345 } |
|
346 |
|
347 private void processClose() throws IOException { |
|
348 transport.closeInput(); |
|
349 receiveScheduler.stop(); |
|
350 CompletionStage<?> readyToClose; |
|
351 readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason); |
|
352 if (readyToClose == null) { |
|
353 readyToClose = MinimalFuture.completedFuture(null); |
|
354 } |
|
355 int code; |
|
356 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { |
|
357 code = NORMAL_CLOSURE; |
|
358 } else { |
|
359 code = statusCode; |
|
360 } |
|
361 readyToClose.whenComplete((r, e) -> { |
|
362 sendClose0(code, "") |
|
363 .whenComplete((r1, e1) -> { |
|
364 if (e1 != null) { |
|
365 Log.logError(e1); |
|
366 } |
|
367 }); |
|
368 }); |
|
369 } |
|
370 |
|
371 private void processPong() { |
|
372 listener.onPong(WebSocketImpl.this, binaryData); |
|
373 } |
|
374 |
|
375 private void processPing() { |
|
376 // Let's make a full copy of this tiny data. What we want here |
|
377 // is to rule out a possibility the shared data we send might be |
|
378 // corrupted by processing in the listener. |
|
379 ByteBuffer slice = binaryData.slice(); |
|
380 ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining()) |
|
381 .put(binaryData) |
|
382 .flip(); |
|
383 // Non-exclusive send; |
|
384 CompletableFuture<WebSocket> pongSent = transport.sendPong(copy); |
|
385 pongSent.whenComplete( |
|
386 (r, e) -> { |
|
387 if (e != null) { |
|
388 signalError(Utils.getCompletionCause(e)); |
|
389 } |
|
390 } |
|
391 ); |
|
392 listener.onPing(WebSocketImpl.this, slice); |
|
393 } |
|
394 |
|
395 private void processBinary() { |
|
396 listener.onBinary(WebSocketImpl.this, binaryData, part); |
|
397 } |
|
398 |
|
399 private void processText() { |
|
400 listener.onText(WebSocketImpl.this, text, part); |
|
401 } |
|
402 |
|
403 private void processOpen() { |
|
404 listener.onOpen(WebSocketImpl.this); |
|
405 } |
|
406 } |
|
407 |
|
408 private void signalOpen() { |
|
409 receiveScheduler.runOrSchedule(); |
|
410 } |
|
411 |
|
412 private void signalError(Throwable error) { |
|
413 inputClosed = true; |
|
414 outputClosed = true; |
|
415 if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) { |
|
416 Log.logError(error); |
|
417 } else { |
|
418 close(); |
|
419 } |
|
420 } |
|
421 |
|
422 private void close() { |
|
423 try { |
|
424 try { |
|
425 transport.closeInput(); |
|
426 } finally { |
|
427 transport.closeOutput(); |
|
428 } |
|
429 } catch (Throwable t) { |
|
430 Log.logError(t); |
|
431 } |
|
432 } |
|
433 |
|
434 /* |
|
435 * Signals a Close event (might not correspond to anything happened on the |
|
436 * channel, i.e. might be synthetic). |
|
437 */ |
|
438 private void signalClose(int statusCode, String reason) { |
|
439 inputClosed = true; |
|
440 this.statusCode = statusCode; |
|
441 this.reason = reason; |
|
442 if (!trySetState(CLOSE)) { |
|
443 Log.logTrace("Close: {0}, ''{1}''", statusCode, reason); |
|
444 } else { |
|
445 try { |
|
446 transport.closeInput(); |
|
447 } catch (Throwable t) { |
|
448 Log.logError(t); |
|
449 } |
|
450 } |
|
451 } |
|
452 |
|
453 private class SignallingMessageConsumer implements MessageStreamConsumer { |
|
454 |
|
455 @Override |
|
456 public void onText(CharSequence data, MessagePart part) { |
|
457 transport.acknowledgeReception(); |
|
458 text = data; |
|
459 WebSocketImpl.this.part = part; |
|
460 tryChangeState(WAITING, TEXT); |
|
461 } |
|
462 |
|
463 @Override |
|
464 public void onBinary(ByteBuffer data, MessagePart part) { |
|
465 transport.acknowledgeReception(); |
|
466 binaryData = data; |
|
467 WebSocketImpl.this.part = part; |
|
468 tryChangeState(WAITING, BINARY); |
|
469 } |
|
470 |
|
471 @Override |
|
472 public void onPing(ByteBuffer data) { |
|
473 transport.acknowledgeReception(); |
|
474 binaryData = data; |
|
475 tryChangeState(WAITING, PING); |
|
476 } |
|
477 |
|
478 @Override |
|
479 public void onPong(ByteBuffer data) { |
|
480 transport.acknowledgeReception(); |
|
481 binaryData = data; |
|
482 tryChangeState(WAITING, PONG); |
|
483 } |
|
484 |
|
485 @Override |
|
486 public void onClose(int statusCode, CharSequence reason) { |
|
487 transport.acknowledgeReception(); |
|
488 signalClose(statusCode, reason.toString()); |
|
489 } |
|
490 |
|
491 @Override |
|
492 public void onComplete() { |
|
493 transport.acknowledgeReception(); |
|
494 signalClose(CLOSED_ABNORMALLY, ""); |
|
495 } |
|
496 |
|
497 @Override |
|
498 public void onError(Throwable error) { |
|
499 signalError(error); |
|
500 } |
|
501 } |
|
502 |
|
503 private boolean trySetState(State newState) { |
|
504 while (true) { |
|
505 State currentState = state.get(); |
|
506 if (currentState == ERROR || currentState == CLOSE) { |
|
507 return false; |
|
508 } else if (state.compareAndSet(currentState, newState)) { |
|
509 receiveScheduler.runOrSchedule(); |
|
510 return true; |
|
511 } |
|
512 } |
|
513 } |
|
514 |
|
515 private boolean tryChangeState(State expectedState, State newState) { |
|
516 State witness = state.compareAndExchange(expectedState, newState); |
|
517 if (witness == expectedState) { |
|
518 receiveScheduler.runOrSchedule(); |
|
519 return true; |
|
520 } |
|
521 // This should be the only reason for inability to change the state from |
|
522 // IDLE to WAITING: the state has changed to terminal |
|
523 if (witness != ERROR && witness != CLOSE) { |
|
524 throw new InternalError(); |
|
525 } |
|
526 return false; |
|
527 } |
|
528 |
|
529 /* Exposed for testing purposes */ |
|
530 protected final Transport<WebSocket> transport() { |
|
531 return transport; |
|
532 } |
|
533 } |
|