23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http.internal.websocket; |
26 package jdk.incubator.http.internal.websocket; |
27 |
27 |
28 import jdk.incubator.http.WebSocket; |
|
29 import jdk.incubator.http.internal.common.Log; |
|
30 import jdk.incubator.http.internal.common.Pair; |
|
31 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result; |
|
32 import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary; |
|
33 import jdk.incubator.http.internal.websocket.OutgoingMessage.Close; |
|
34 import jdk.incubator.http.internal.websocket.OutgoingMessage.Context; |
|
35 import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping; |
|
36 import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong; |
|
37 import jdk.incubator.http.internal.websocket.OutgoingMessage.Text; |
|
38 |
|
39 import java.io.IOException; |
28 import java.io.IOException; |
40 import java.net.ProtocolException; |
29 import java.net.ProtocolException; |
41 import java.net.URI; |
30 import java.net.URI; |
42 import java.nio.ByteBuffer; |
31 import java.nio.ByteBuffer; |
43 import java.util.Queue; |
32 import java.util.Queue; |
45 import java.util.concurrent.CompletionStage; |
34 import java.util.concurrent.CompletionStage; |
46 import java.util.concurrent.ConcurrentLinkedQueue; |
35 import java.util.concurrent.ConcurrentLinkedQueue; |
47 import java.util.concurrent.atomic.AtomicBoolean; |
36 import java.util.concurrent.atomic.AtomicBoolean; |
48 import java.util.function.Consumer; |
37 import java.util.function.Consumer; |
49 import java.util.function.Function; |
38 import java.util.function.Function; |
|
39 import jdk.incubator.http.WebSocket; |
|
40 import jdk.incubator.http.internal.common.Log; |
|
41 import jdk.incubator.http.internal.common.Pair; |
|
42 import jdk.incubator.http.internal.common.SequentialScheduler; |
|
43 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
|
44 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result; |
|
45 import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary; |
|
46 import jdk.incubator.http.internal.websocket.OutgoingMessage.Close; |
|
47 import jdk.incubator.http.internal.websocket.OutgoingMessage.Context; |
|
48 import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping; |
|
49 import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong; |
|
50 import jdk.incubator.http.internal.websocket.OutgoingMessage.Text; |
50 |
51 |
51 import static java.util.Objects.requireNonNull; |
52 import static java.util.Objects.requireNonNull; |
52 import static java.util.concurrent.CompletableFuture.failedFuture; |
53 import static java.util.concurrent.CompletableFuture.failedFuture; |
53 import static jdk.incubator.http.internal.common.Pair.pair; |
54 import static jdk.incubator.http.internal.common.Pair.pair; |
|
55 import jdk.incubator.http.internal.common.Utils; |
54 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; |
56 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY; |
55 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; |
57 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE; |
56 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; |
58 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient; |
57 |
59 |
58 /* |
60 /* |
70 * invoked. We keep track of this since only one of these methods is invoked |
72 * invoked. We keep track of this since only one of these methods is invoked |
71 * and it is invoked at most once. |
73 * and it is invoked at most once. |
72 */ |
74 */ |
73 private boolean lastMethodInvoked; |
75 private boolean lastMethodInvoked; |
74 private final AtomicBoolean outstandingSend = new AtomicBoolean(); |
76 private final AtomicBoolean outstandingSend = new AtomicBoolean(); |
75 private final CooperativeHandler sendHandler = |
77 private final SequentialScheduler sendScheduler; |
76 new CooperativeHandler(this::sendFirst); |
|
77 private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>> |
78 private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>> |
78 queue = new ConcurrentLinkedQueue<>(); |
79 queue = new ConcurrentLinkedQueue<>(); |
79 private final Context context = new OutgoingMessage.Context(); |
80 private final Context context = new OutgoingMessage.Context(); |
80 private final Transmitter transmitter; |
81 private final Transmitter transmitter; |
81 private final Receiver receiver; |
82 private final Receiver receiver; |
134 this.subprotocol = requireNonNull(subprotocol); |
135 this.subprotocol = requireNonNull(subprotocol); |
135 this.channel = requireNonNull(channel); |
136 this.channel = requireNonNull(channel); |
136 this.listener = requireNonNull(listener); |
137 this.listener = requireNonNull(listener); |
137 this.transmitter = new Transmitter(channel); |
138 this.transmitter = new Transmitter(channel); |
138 this.receiver = new Receiver(messageConsumerOf(listener), channel); |
139 this.receiver = new Receiver(messageConsumerOf(listener), channel); |
|
140 this.sendScheduler = new SequentialScheduler(new SendFirstTask()); |
139 |
141 |
140 // Set up the Closing Handshake action |
142 // Set up the Closing Handshake action |
141 CompletableFuture.allOf(closeReceived, closeSent) |
143 CompletableFuture.allOf(closeReceived, closeSent) |
142 .whenComplete((result, error) -> { |
144 .whenComplete((result, error) -> { |
143 try { |
145 try { |
323 boolean added = queue.add(pair(m, cf)); |
325 boolean added = queue.add(pair(m, cf)); |
324 if (!added) { |
326 if (!added) { |
325 // The queue is supposed to be unbounded |
327 // The queue is supposed to be unbounded |
326 throw new InternalError(); |
328 throw new InternalError(); |
327 } |
329 } |
328 sendHandler.handle(); |
330 sendScheduler.runOrSchedule(); |
329 return cf; |
331 return cf; |
330 } |
332 } |
331 |
333 |
332 /* |
334 /* |
333 * This is the main sending method. It may be run in different threads, |
335 * This is the main sending task. It may be run in different threads, |
334 * but never concurrently. |
336 * but never concurrently. |
335 */ |
337 */ |
336 private void sendFirst(Runnable whenSent) { |
338 private class SendFirstTask implements SequentialScheduler.RestartableTask { |
337 Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll(); |
339 @Override |
338 if (p == null) { |
340 public void run (DeferredCompleter taskCompleter){ |
339 whenSent.run(); |
341 Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll(); |
340 return; |
342 if (p == null) { |
341 } |
343 taskCompleter.complete(); |
342 OutgoingMessage message = p.first; |
344 return; |
343 CompletableFuture<WebSocket> cf = p.second; |
345 } |
344 try { |
346 OutgoingMessage message = p.first; |
345 message.contextualize(context); |
347 CompletableFuture<WebSocket> cf = p.second; |
346 Consumer<Exception> h = e -> { |
348 try { |
347 if (e == null) { |
349 message.contextualize(context); |
348 cf.complete(WebSocketImpl.this); |
350 Consumer<Exception> h = e -> { |
349 } else { |
351 if (e == null) { |
350 cf.completeExceptionally(e); |
352 cf.complete(WebSocketImpl.this); |
351 } |
353 } else { |
352 sendHandler.handle(); |
354 cf.completeExceptionally(e); |
353 whenSent.run(); |
355 } |
354 }; |
356 sendScheduler.runOrSchedule(); |
355 transmitter.send(message, h); |
357 taskCompleter.complete(); |
356 } catch (Exception t) { |
358 }; |
357 cf.completeExceptionally(t); |
359 transmitter.send(message, h); |
|
360 } catch (Exception t) { |
|
361 cf.completeExceptionally(t); |
|
362 } |
358 } |
363 } |
359 } |
364 } |
360 |
365 |
361 @Override |
366 @Override |
362 public void request(long n) { |
367 public void request(long n) { |
434 // Non-exclusive send; |
439 // Non-exclusive send; |
435 CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy)); |
440 CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy)); |
436 pongSent.whenComplete( |
441 pongSent.whenComplete( |
437 (r, error) -> { |
442 (r, error) -> { |
438 if (error != null) { |
443 if (error != null) { |
439 WebSocketImpl.this.signalError(error); |
444 WebSocketImpl.this.signalError( |
|
445 Utils.getCompletionCause(error)); |
440 } |
446 } |
441 } |
447 } |
442 ); |
448 ); |
443 synchronized (WebSocketImpl.this.lock) { |
449 synchronized (WebSocketImpl.this.lock) { |
444 try { |
450 try { |
480 Exception ex = (Exception) new ProtocolException().initCause(error); |
486 Exception ex = (Exception) new ProtocolException().initCause(error); |
481 int code = ((FailWebSocketException) error).getStatusCode(); |
487 int code = ((FailWebSocketException) error).getStatusCode(); |
482 enqueueClose(new Close(code, "")) |
488 enqueueClose(new Close(code, "")) |
483 .whenComplete((r, e) -> { |
489 .whenComplete((r, e) -> { |
484 if (e != null) { |
490 if (e != null) { |
485 ex.addSuppressed(e); |
491 ex.addSuppressed(Utils.getCompletionCause(e)); |
486 } |
492 } |
487 try { |
493 try { |
488 channel.close(); |
494 channel.close(); |
489 } catch (IOException e1) { |
495 } catch (IOException e1) { |
490 ex.addSuppressed(e1); |
496 ex.addSuppressed(e1); |