24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http.internal.websocket; |
26 package jdk.incubator.http.internal.websocket; |
27 |
27 |
28 import jdk.incubator.http.WebSocket; |
28 import jdk.incubator.http.WebSocket; |
|
29 import jdk.incubator.http.internal.common.Demand; |
29 import jdk.incubator.http.internal.common.Log; |
30 import jdk.incubator.http.internal.common.Log; |
30 import jdk.incubator.http.internal.common.MinimalFuture; |
31 import jdk.incubator.http.internal.common.MinimalFuture; |
31 import jdk.incubator.http.internal.common.Pair; |
32 import jdk.incubator.http.internal.common.Pair; |
32 import jdk.incubator.http.internal.common.SequentialScheduler; |
33 import jdk.incubator.http.internal.common.SequentialScheduler; |
33 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
34 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
50 import java.util.concurrent.CompletionStage; |
51 import java.util.concurrent.CompletionStage; |
51 import java.util.concurrent.ConcurrentLinkedQueue; |
52 import java.util.concurrent.ConcurrentLinkedQueue; |
52 import java.util.concurrent.TimeUnit; |
53 import java.util.concurrent.TimeUnit; |
53 import java.util.concurrent.TimeoutException; |
54 import java.util.concurrent.TimeoutException; |
54 import java.util.concurrent.atomic.AtomicBoolean; |
55 import java.util.concurrent.atomic.AtomicBoolean; |
55 import java.util.concurrent.atomic.AtomicInteger; |
|
56 import java.util.concurrent.atomic.AtomicReference; |
56 import java.util.concurrent.atomic.AtomicReference; |
57 import java.util.function.Consumer; |
57 import java.util.function.Consumer; |
58 import java.util.function.Function; |
58 import java.util.function.Function; |
59 |
59 |
60 import static java.util.Objects.requireNonNull; |
60 import static java.util.Objects.requireNonNull; |
61 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; |
61 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture; |
62 import static jdk.incubator.http.internal.common.Pair.pair; |
62 import static jdk.incubator.http.internal.common.Pair.pair; |
63 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; |
63 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; |
64 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; |
64 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; |
65 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; |
65 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; |
|
66 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.BINARY; |
|
67 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.CLOSE; |
|
68 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.ERROR; |
|
69 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.IDLE; |
|
70 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.OPEN; |
|
71 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PING; |
|
72 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PONG; |
|
73 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.TEXT; |
|
74 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.WAITING; |
66 |
75 |
67 /* |
76 /* |
68 * A WebSocket client. |
77 * A WebSocket client. |
69 */ |
78 */ |
70 public final class WebSocketImpl implements WebSocket { |
79 public final class WebSocketImpl implements WebSocket { |
71 |
80 |
72 private static final int IDLE = 0; |
81 enum State { |
73 private static final int OPEN = 1; |
82 OPEN, |
74 private static final int TEXT = 2; |
83 IDLE, |
75 private static final int BINARY = 4; |
84 WAITING, |
76 private static final int PING = 8; |
85 TEXT, |
77 private static final int PONG = 16; |
86 BINARY, |
78 private static final int CLOSE = 32; |
87 PING, |
79 private static final int ERROR = 64; |
88 PONG, |
|
89 CLOSE, |
|
90 ERROR; |
|
91 } |
80 |
92 |
81 private volatile boolean inputClosed; |
93 private volatile boolean inputClosed; |
82 private volatile boolean outputClosed; |
94 private volatile boolean outputClosed; |
83 |
95 |
84 /* Which of the listener's methods to call next? */ |
96 private final AtomicReference<State> state = new AtomicReference<>(OPEN); |
85 private final AtomicInteger state = new AtomicInteger(OPEN); |
|
86 |
97 |
87 /* Components of calls to Listener's methods */ |
98 /* Components of calls to Listener's methods */ |
88 private MessagePart part; |
99 private MessagePart part; |
89 private ByteBuffer binaryData; |
100 private ByteBuffer binaryData; |
90 private CharSequence text; |
101 private CharSequence text; |
102 queue = new ConcurrentLinkedQueue<>(); |
113 queue = new ConcurrentLinkedQueue<>(); |
103 private final Context context = new OutgoingMessage.Context(); |
114 private final Context context = new OutgoingMessage.Context(); |
104 private final Transmitter transmitter; |
115 private final Transmitter transmitter; |
105 private final Receiver receiver; |
116 private final Receiver receiver; |
106 private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask()); |
117 private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask()); |
|
118 private final Demand demand = new Demand(); |
107 |
119 |
108 public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) { |
120 public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) { |
109 Function<Result, WebSocket> newWebSocket = r -> { |
121 Function<Result, WebSocket> newWebSocket = r -> { |
110 WebSocket ws = newInstance(b.getUri(), |
122 WebSocket ws = newInstance(b.getUri(), |
111 r.subprotocol, |
123 r.subprotocol, |
140 } |
152 } |
141 |
153 |
142 private WebSocketImpl(URI uri, |
154 private WebSocketImpl(URI uri, |
143 String subprotocol, |
155 String subprotocol, |
144 Listener listener, |
156 Listener listener, |
145 TransportSupplier transport) |
157 TransportSupplier transport) { |
146 { |
|
147 this.uri = requireNonNull(uri); |
158 this.uri = requireNonNull(uri); |
148 this.subprotocol = requireNonNull(subprotocol); |
159 this.subprotocol = requireNonNull(subprotocol); |
149 this.listener = requireNonNull(listener); |
160 this.listener = requireNonNull(listener); |
150 this.transmitter = transport.transmitter(); |
161 this.transmitter = transport.transmitter(); |
151 this.receiver = transport.receiver(new SignallingMessageConsumer()); |
162 this.receiver = transport.receiver(new SignallingMessageConsumer()); |
217 * fashion in respect to other messages accepted through this method. No |
228 * fashion in respect to other messages accepted through this method. No |
218 * further messages will be accepted until the returned CompletableFuture |
229 * further messages will be accepted until the returned CompletableFuture |
219 * completes. This method is used to enforce "one outstanding send |
230 * completes. This method is used to enforce "one outstanding send |
220 * operation" policy. |
231 * operation" policy. |
221 */ |
232 */ |
222 private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m) |
233 private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m) { |
223 { |
|
224 if (!outstandingSend.compareAndSet(false, true)) { |
234 if (!outstandingSend.compareAndSet(false, true)) { |
225 return failedFuture(new IllegalStateException("Send pending")); |
235 return failedFuture(new IllegalStateException("Send pending")); |
226 } |
236 } |
227 return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false)); |
237 return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false)); |
228 } |
238 } |
336 * - after the state has been observed as CLOSE/ERROR, the scheduler |
346 * - after the state has been observed as CLOSE/ERROR, the scheduler |
337 * is stopped |
347 * is stopped |
338 */ |
348 */ |
339 private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask { |
349 private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask { |
340 |
350 |
|
351 // Receiver only asked here and nowhere else because we must make sure |
|
352 // onOpen is invoked first and no messages become pending before onOpen |
|
353 // finishes |
|
354 |
341 @Override |
355 @Override |
342 public void run() { |
356 public void run() { |
343 final int s = state.getAndSet(IDLE); |
357 while (true) { |
344 try { |
358 State s = state.get(); |
345 switch (s) { |
359 try { |
346 case OPEN: |
360 switch (s) { |
347 processOpen(); |
361 case OPEN: |
348 break; |
362 processOpen(); |
349 case TEXT: |
363 tryChangeState(OPEN, IDLE); |
350 processText(); |
364 break; |
351 break; |
365 case TEXT: |
352 case BINARY: |
366 processText(); |
353 processBinary(); |
367 tryChangeState(TEXT, IDLE); |
354 break; |
368 break; |
355 case PING: |
369 case BINARY: |
356 processPing(); |
370 processBinary(); |
357 break; |
371 tryChangeState(BINARY, IDLE); |
358 case PONG: |
372 break; |
359 processPong(); |
373 case PING: |
360 break; |
374 processPing(); |
361 case CLOSE: |
375 tryChangeState(PING, IDLE); |
362 processClose(); |
376 break; |
363 break; |
377 case PONG: |
364 case ERROR: |
378 processPong(); |
365 processError(); |
379 tryChangeState(PONG, IDLE); |
366 break; |
380 break; |
367 case IDLE: |
381 case CLOSE: |
368 // For debugging spurious signalling: when there was a |
382 processClose(); |
369 // signal, but apparently nothing has changed |
383 return; |
370 break; |
384 case ERROR: |
371 default: |
385 processError(); |
372 throw new InternalError(String.valueOf(s)); |
386 return; |
|
387 case IDLE: |
|
388 if (demand.tryDecrement() |
|
389 && tryChangeState(IDLE, WAITING)) { |
|
390 receiver.request(1); |
|
391 } |
|
392 return; |
|
393 case WAITING: |
|
394 // For debugging spurious signalling: when there was a |
|
395 // signal, but apparently nothing has changed |
|
396 return; |
|
397 default: |
|
398 throw new InternalError(String.valueOf(s)); |
|
399 } |
|
400 } catch (Throwable t) { |
|
401 signalError(t); |
373 } |
402 } |
374 } catch (Throwable t) { |
|
375 signalError(t); |
|
376 } |
403 } |
377 } |
404 } |
378 |
405 |
379 private void processError() throws IOException { |
406 private void processError() throws IOException { |
380 receiver.close(); |
407 receiver.close(); |
487 */ |
514 */ |
488 private void signalClose(int statusCode, String reason) { |
515 private void signalClose(int statusCode, String reason) { |
489 inputClosed = true; |
516 inputClosed = true; |
490 this.statusCode = statusCode; |
517 this.statusCode = statusCode; |
491 this.reason = reason; |
518 this.reason = reason; |
492 if (!tryChangeState(CLOSE)) { |
519 if (!trySetState(CLOSE)) { |
493 Log.logTrace("Close: {0}, ''{1}''", statusCode, reason); |
520 Log.logTrace("Close: {0}, ''{1}''", statusCode, reason); |
494 } else { |
521 } else { |
495 try { |
522 try { |
496 receiver.close(); |
523 receiver.close(); |
497 } catch (Throwable t) { |
524 } catch (Throwable t) { |
505 @Override |
532 @Override |
506 public void onText(CharSequence data, MessagePart part) { |
533 public void onText(CharSequence data, MessagePart part) { |
507 receiver.acknowledge(); |
534 receiver.acknowledge(); |
508 text = data; |
535 text = data; |
509 WebSocketImpl.this.part = part; |
536 WebSocketImpl.this.part = part; |
510 tryChangeState(TEXT); |
537 tryChangeState(WAITING, TEXT); |
511 } |
538 } |
512 |
539 |
513 @Override |
540 @Override |
514 public void onBinary(ByteBuffer data, MessagePart part) { |
541 public void onBinary(ByteBuffer data, MessagePart part) { |
515 receiver.acknowledge(); |
542 receiver.acknowledge(); |
516 binaryData = data; |
543 binaryData = data; |
517 WebSocketImpl.this.part = part; |
544 WebSocketImpl.this.part = part; |
518 tryChangeState(BINARY); |
545 tryChangeState(WAITING, BINARY); |
519 } |
546 } |
520 |
547 |
521 @Override |
548 @Override |
522 public void onPing(ByteBuffer data) { |
549 public void onPing(ByteBuffer data) { |
523 receiver.acknowledge(); |
550 receiver.acknowledge(); |
524 binaryData = data; |
551 binaryData = data; |
525 tryChangeState(PING); |
552 tryChangeState(WAITING, PING); |
526 } |
553 } |
527 |
554 |
528 @Override |
555 @Override |
529 public void onPong(ByteBuffer data) { |
556 public void onPong(ByteBuffer data) { |
530 receiver.acknowledge(); |
557 receiver.acknowledge(); |
531 binaryData = data; |
558 binaryData = data; |
532 tryChangeState(PONG); |
559 tryChangeState(WAITING, PONG); |
533 } |
560 } |
534 |
561 |
535 @Override |
562 @Override |
536 public void onClose(int statusCode, CharSequence reason) { |
563 public void onClose(int statusCode, CharSequence reason) { |
537 receiver.acknowledge(); |
564 receiver.acknowledge(); |
538 signalClose(statusCode, reason.toString()); |
565 signalClose(statusCode, reason.toString()); |
539 } |
566 } |
540 |
567 |
541 @Override |
568 @Override |
542 public void onComplete() { |
569 public void onComplete() { |
|
570 receiver.acknowledge(); |
543 signalClose(CLOSED_ABNORMALLY, ""); |
571 signalClose(CLOSED_ABNORMALLY, ""); |
544 } |
572 } |
545 |
573 |
546 @Override |
574 @Override |
547 public void onError(Throwable error) { |
575 public void onError(Throwable error) { |
548 signalError(error); |
576 signalError(error); |
549 } |
577 } |
550 } |
578 } |
551 |
579 |
552 private boolean tryChangeState(int newState) { |
580 private boolean trySetState(State newState) { |
553 while (true) { |
581 while (true) { |
554 int currentState = state.get(); |
582 State currentState = state.get(); |
555 if (currentState == ERROR || currentState == CLOSE) { |
583 if (currentState == ERROR || currentState == CLOSE) { |
556 return false; |
584 return false; |
557 } else if (state.compareAndSet(currentState, newState)) { |
585 } else if (state.compareAndSet(currentState, newState)) { |
558 receiveScheduler.runOrSchedule(); |
586 receiveScheduler.runOrSchedule(); |
559 return true; |
587 return true; |
560 } |
588 } |
561 } |
589 } |
562 } |
590 } |
|
591 |
|
592 private boolean tryChangeState(State expectedState, State newState) { |
|
593 State witness = state.compareAndExchange(expectedState, newState); |
|
594 if (witness == expectedState) { |
|
595 receiveScheduler.runOrSchedule(); |
|
596 return true; |
|
597 } |
|
598 // This should be the only reason for inability to change the state from |
|
599 // IDLE to WAITING: the state has changed to terminal |
|
600 if (witness != ERROR && witness != CLOSE) { |
|
601 throw new InternalError(); |
|
602 } |
|
603 return false; |
|
604 } |
563 } |
605 } |