171 @Override |
171 @Override |
172 public CompletableFuture<WebSocket> sendText(CharSequence message, |
172 public CompletableFuture<WebSocket> sendText(CharSequence message, |
173 boolean last) { |
173 boolean last) { |
174 Objects.requireNonNull(message); |
174 Objects.requireNonNull(message); |
175 long id = 0; |
175 long id = 0; |
176 if (debug.isLoggable(Level.DEBUG)) { |
176 if (debug.on()) { |
177 id = sendCounter.incrementAndGet(); |
177 id = sendCounter.incrementAndGet(); |
178 debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s", |
178 debug.log("enter send text %s payload length=%s last=%s", |
179 id, message.length(), last); |
179 id, message.length(), last); |
180 } |
180 } |
181 CompletableFuture<WebSocket> result; |
181 CompletableFuture<WebSocket> result; |
182 if (!setPendingTextOrBinary()) { |
182 if (!setPendingTextOrBinary()) { |
183 result = failedFuture(new IllegalStateException("Send pending")); |
183 result = failedFuture(new IllegalStateException("Send pending")); |
184 } else { |
184 } else { |
185 result = transport.sendText(message, last, this, |
185 result = transport.sendText(message, last, this, |
186 (r, e) -> clearPendingTextOrBinary()); |
186 (r, e) -> clearPendingTextOrBinary()); |
187 } |
187 } |
188 if (debug.isLoggable(Level.DEBUG)) { |
188 if (debug.on()) { |
189 debug.log(Level.DEBUG, "exit send text %s returned %s", id, result); |
189 debug.log("exit send text %s returned %s", id, result); |
190 } |
190 } |
191 |
191 |
192 return replaceNull(result); |
192 return replaceNull(result); |
193 } |
193 } |
194 |
194 |
195 @Override |
195 @Override |
196 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
196 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
197 boolean last) { |
197 boolean last) { |
198 Objects.requireNonNull(message); |
198 Objects.requireNonNull(message); |
199 long id = 0; |
199 long id = 0; |
200 if (debug.isLoggable(Level.DEBUG)) { |
200 if (debug.on()) { |
201 id = sendCounter.incrementAndGet(); |
201 id = sendCounter.incrementAndGet(); |
202 debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s", |
202 debug.log("enter send binary %s payload=%s last=%s", |
203 id, message, last); |
203 id, message, last); |
204 } |
204 } |
205 CompletableFuture<WebSocket> result; |
205 CompletableFuture<WebSocket> result; |
206 if (!setPendingTextOrBinary()) { |
206 if (!setPendingTextOrBinary()) { |
207 result = failedFuture(new IllegalStateException("Send pending")); |
207 result = failedFuture(new IllegalStateException("Send pending")); |
208 } else { |
208 } else { |
209 result = transport.sendBinary(message, last, this, |
209 result = transport.sendBinary(message, last, this, |
210 (r, e) -> clearPendingTextOrBinary()); |
210 (r, e) -> clearPendingTextOrBinary()); |
211 } |
211 } |
212 if (debug.isLoggable(Level.DEBUG)) { |
212 if (debug.on()) { |
213 debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result); |
213 debug.log("exit send binary %s returned %s", id, result); |
214 } |
214 } |
215 return replaceNull(result); |
215 return replaceNull(result); |
216 } |
216 } |
217 |
217 |
218 private void clearPendingTextOrBinary() { |
218 private void clearPendingTextOrBinary() { |
235 |
235 |
236 @Override |
236 @Override |
237 public CompletableFuture<WebSocket> sendPing(ByteBuffer message) { |
237 public CompletableFuture<WebSocket> sendPing(ByteBuffer message) { |
238 Objects.requireNonNull(message); |
238 Objects.requireNonNull(message); |
239 long id = 0; |
239 long id = 0; |
240 if (debug.isLoggable(Level.DEBUG)) { |
240 if (debug.on()) { |
241 id = sendCounter.incrementAndGet(); |
241 id = sendCounter.incrementAndGet(); |
242 debug.log(Level.DEBUG, "enter send ping %s payload=%s", id, message); |
242 debug.log("enter send ping %s payload=%s", id, message); |
243 } |
243 } |
244 CompletableFuture<WebSocket> result; |
244 CompletableFuture<WebSocket> result; |
245 if (!setPendingPingOrPong()) { |
245 if (!setPendingPingOrPong()) { |
246 result = failedFuture(new IllegalStateException("Send pending")); |
246 result = failedFuture(new IllegalStateException("Send pending")); |
247 } else { |
247 } else { |
248 result = transport.sendPing(message, this, |
248 result = transport.sendPing(message, this, |
249 (r, e) -> clearPendingPingOrPong()); |
249 (r, e) -> clearPendingPingOrPong()); |
250 } |
250 } |
251 if (debug.isLoggable(Level.DEBUG)) { |
251 if (debug.on()) { |
252 debug.log(Level.DEBUG, "exit send ping %s returned %s", id, result); |
252 debug.log("exit send ping %s returned %s", id, result); |
253 } |
253 } |
254 return replaceNull(result); |
254 return replaceNull(result); |
255 } |
255 } |
256 |
256 |
257 @Override |
257 @Override |
258 public CompletableFuture<WebSocket> sendPong(ByteBuffer message) { |
258 public CompletableFuture<WebSocket> sendPong(ByteBuffer message) { |
259 Objects.requireNonNull(message); |
259 Objects.requireNonNull(message); |
260 long id = 0; |
260 long id = 0; |
261 if (debug.isLoggable(Level.DEBUG)) { |
261 if (debug.on()) { |
262 id = sendCounter.incrementAndGet(); |
262 id = sendCounter.incrementAndGet(); |
263 debug.log(Level.DEBUG, "enter send pong %s payload=%s", id, message); |
263 debug.log("enter send pong %s payload=%s", id, message); |
264 } |
264 } |
265 CompletableFuture<WebSocket> result; |
265 CompletableFuture<WebSocket> result; |
266 if (!setPendingPingOrPong()) { |
266 if (!setPendingPingOrPong()) { |
267 result = failedFuture(new IllegalStateException("Send pending")); |
267 result = failedFuture(new IllegalStateException("Send pending")); |
268 } else { |
268 } else { |
269 result = transport.sendPong(message, this, |
269 result = transport.sendPong(message, this, |
270 (r, e) -> clearPendingPingOrPong()); |
270 (r, e) -> clearPendingPingOrPong()); |
271 } |
271 } |
272 if (debug.isLoggable(Level.DEBUG)) { |
272 if (debug.on()) { |
273 debug.log(Level.DEBUG, "exit send pong %s returned %s", id, result); |
273 debug.log("exit send pong %s returned %s", id, result); |
274 } |
274 } |
275 return replaceNull(result); |
275 return replaceNull(result); |
276 } |
276 } |
277 |
277 |
278 private boolean setPendingPingOrPong() { |
278 private boolean setPendingPingOrPong() { |
340 (r, e) -> processCloseError(e)); |
339 (r, e) -> processCloseError(e)); |
341 } |
340 } |
342 |
341 |
343 private void processCloseError(Throwable e) { |
342 private void processCloseError(Throwable e) { |
344 if (e == null) { |
343 if (e == null) { |
345 debug.log(Level.DEBUG, "send close completed successfully"); |
344 debug.log("send close completed successfully"); |
346 } else { |
345 } else { |
347 debug.log(Level.DEBUG, "send close completed with error", e); |
346 debug.log("send close completed with error", e); |
348 } |
347 } |
349 outputClosed.set(true); |
348 outputClosed.set(true); |
350 try { |
349 try { |
351 transport.closeOutput(); |
350 transport.closeOutput(); |
352 } catch (IOException ignored) { } |
351 } catch (IOException ignored) { } |
353 } |
352 } |
354 |
353 |
355 @Override |
354 @Override |
356 public void request(long n) { |
355 public void request(long n) { |
357 if (debug.isLoggable(Level.DEBUG)) { |
356 if (debug.on()) { |
358 debug.log(Level.DEBUG, "request %s", n); |
357 debug.log("request %s", n); |
359 } |
358 } |
360 if (demand.increase(n)) { |
359 if (demand.increase(n)) { |
361 receiveScheduler.runOrSchedule(); |
360 receiveScheduler.runOrSchedule(); |
362 } |
361 } |
363 } |
362 } |
470 } |
469 } |
471 } catch (Throwable t) { |
470 } catch (Throwable t) { |
472 signalError(t); |
471 signalError(t); |
473 } |
472 } |
474 } |
473 } |
475 if (debug.isLoggable(Level.DEBUG)) { |
474 if (debug.on()) { |
476 debug.log(Level.DEBUG, "exit receive task"); |
475 debug.log("exit receive task"); |
477 } |
476 } |
478 } |
477 } |
479 |
478 |
480 private void processError() throws IOException { |
479 private void processError() throws IOException { |
481 if (debug.isLoggable(Level.DEBUG)) { |
480 if (debug.on()) { |
482 debug.log(Level.DEBUG, "processError"); |
481 debug.log("processError"); |
483 } |
482 } |
484 transport.closeInput(); |
483 transport.closeInput(); |
485 receiveScheduler.stop(); |
484 receiveScheduler.stop(); |
486 Throwable err = error.get(); |
485 Throwable err = error.get(); |
487 if (err instanceof FailWebSocketException) { |
486 if (err instanceof FailWebSocketException) { |
488 int code1 = ((FailWebSocketException) err).getStatusCode(); |
487 int code1 = ((FailWebSocketException) err).getStatusCode(); |
489 err = new ProtocolException().initCause(err); |
488 err = new ProtocolException().initCause(err); |
490 if (debug.isLoggable(Level.DEBUG)) { |
489 if (debug.on()) { |
491 debug.log(Level.DEBUG, "failing %s with error=%s statusCode=%s", |
490 debug.log("failing %s with error=%s statusCode=%s", |
492 WebSocketImpl.this, err, code1); |
491 WebSocketImpl.this, err, code1); |
493 } |
492 } |
494 sendCloseSilently(code1); |
493 sendCloseSilently(code1); |
495 } |
494 } |
496 long id = 0; |
495 long id = 0; |
497 if (debug.isLoggable(Level.DEBUG)) { |
496 if (debug.on()) { |
498 id = receiveCounter.incrementAndGet(); |
497 id = receiveCounter.incrementAndGet(); |
499 debug.log(Level.DEBUG, "enter onError %s error=%s", id, err); |
498 debug.log("enter onError %s error=%s", id, err); |
500 } |
499 } |
501 try { |
500 try { |
502 listener.onError(WebSocketImpl.this, err); |
501 listener.onError(WebSocketImpl.this, err); |
503 } finally { |
502 } finally { |
504 if (debug.isLoggable(Level.DEBUG)) { |
503 if (debug.on()) { |
505 debug.log(Level.DEBUG, "exit onError %s", id); |
504 debug.log("exit onError %s", id); |
506 } |
505 } |
507 } |
506 } |
508 } |
507 } |
509 |
508 |
510 private void processClose() throws IOException { |
509 private void processClose() throws IOException { |
511 debug.log(Level.DEBUG, "processClose"); |
510 debug.log("processClose"); |
512 transport.closeInput(); |
511 transport.closeInput(); |
513 receiveScheduler.stop(); |
512 receiveScheduler.stop(); |
514 CompletionStage<?> cs = null; // when the listener is ready to close |
513 CompletionStage<?> cs = null; // when the listener is ready to close |
515 long id = 0; |
514 long id = 0; |
516 if (debug.isLoggable(Level.DEBUG)) { |
515 if (debug.on()) { |
517 id = receiveCounter.incrementAndGet(); |
516 id = receiveCounter.incrementAndGet(); |
518 debug.log(Level.DEBUG, |
517 debug.log("enter onClose %s statusCode=%s reason.length=%s", |
519 "enter onClose %s statusCode=%s reason.length=%s", |
|
520 id, statusCode, reason.length()); |
518 id, statusCode, reason.length()); |
521 } |
519 } |
522 try { |
520 try { |
523 cs = listener.onClose(WebSocketImpl.this, statusCode, reason); |
521 cs = listener.onClose(WebSocketImpl.this, statusCode, reason); |
524 } finally { |
522 } finally { |
525 debug.log(Level.DEBUG, "exit onClose %s returned %s", id, cs); |
523 debug.log("exit onClose %s returned %s", id, cs); |
526 } |
524 } |
527 if (cs == null) { |
525 if (cs == null) { |
528 cs = DONE; |
526 cs = DONE; |
529 } |
527 } |
530 int code; |
528 int code; |
531 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { |
529 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { |
532 code = NORMAL_CLOSURE; |
530 code = NORMAL_CLOSURE; |
533 debug.log(Level.DEBUG, "using statusCode %s instead of %s", |
531 debug.log("using statusCode %s instead of %s", |
534 statusCode, code); |
532 statusCode, code); |
535 |
533 |
536 } else { |
534 } else { |
537 code = statusCode; |
535 code = statusCode; |
538 } |
536 } |
539 cs.whenComplete((r, e) -> { |
537 cs.whenComplete((r, e) -> { |
540 if (debug.isLoggable(Level.DEBUG)) { |
538 if (debug.on()) { |
541 debug.log(Level.DEBUG, |
539 debug.log("CompletionStage returned by onClose completed result=%s error=%s", |
542 "CompletionStage returned by onClose completed result=%s error=%s", |
|
543 r, e); |
540 r, e); |
544 } |
541 } |
545 sendCloseSilently(code); |
542 sendCloseSilently(code); |
546 }); |
543 }); |
547 } |
544 } |
548 |
545 |
549 private void processPong() { |
546 private void processPong() { |
550 long id = 0; |
547 long id = 0; |
551 if (debug.isLoggable(Level.DEBUG)) { |
548 if (debug.on()) { |
552 id = receiveCounter.incrementAndGet(); |
549 id = receiveCounter.incrementAndGet(); |
553 debug.log(Level.DEBUG, "enter onPong %s payload=%s", |
550 debug.log("enter onPong %s payload=%s", |
554 id, binaryData); |
551 id, binaryData); |
555 } |
552 } |
556 CompletionStage<?> cs = null; |
553 CompletionStage<?> cs = null; |
557 try { |
554 try { |
558 cs = listener.onPong(WebSocketImpl.this, binaryData); |
555 cs = listener.onPong(WebSocketImpl.this, binaryData); |
559 } finally { |
556 } finally { |
560 if (debug.isLoggable(Level.DEBUG)) { |
557 if (debug.on()) { |
561 debug.log(Level.DEBUG, "exit onPong %s returned %s", id, cs); |
558 debug.log("exit onPong %s returned %s", id, cs); |
562 } |
559 } |
563 } |
560 } |
564 } |
561 } |
565 |
562 |
566 private void processPing() { |
563 private void processPing() { |
567 if (debug.isLoggable(Level.DEBUG)) { |
564 if (debug.on()) { |
568 debug.log(Level.DEBUG, "processPing"); |
565 debug.log("processPing"); |
569 } |
566 } |
570 // A full copy of this (small) data is made. This way sending a |
567 // A full copy of this (small) data is made. This way sending a |
571 // replying Pong could be done in parallel with the listener |
568 // replying Pong could be done in parallel with the listener |
572 // handling this Ping. |
569 // handling this Ping. |
573 ByteBuffer slice = binaryData.slice(); |
570 ByteBuffer slice = binaryData.slice(); |
586 WebSocketImpl.this, |
583 WebSocketImpl.this, |
587 reporter); |
584 reporter); |
588 } |
585 } |
589 } |
586 } |
590 long id = 0; |
587 long id = 0; |
591 if (debug.isLoggable(Level.DEBUG)) { |
588 if (debug.on()) { |
592 id = receiveCounter.incrementAndGet(); |
589 id = receiveCounter.incrementAndGet(); |
593 debug.log(Level.DEBUG, "enter onPing %s payload=%s", id, slice); |
590 debug.log("enter onPing %s payload=%s", id, slice); |
594 } |
591 } |
595 CompletionStage<?> cs = null; |
592 CompletionStage<?> cs = null; |
596 try { |
593 try { |
597 cs = listener.onPing(WebSocketImpl.this, slice); |
594 cs = listener.onPing(WebSocketImpl.this, slice); |
598 } finally { |
595 } finally { |
599 if (debug.isLoggable(Level.DEBUG)) { |
596 if (debug.on()) { |
600 debug.log(Level.DEBUG, "exit onPing %s returned %s", id, cs); |
597 debug.log("exit onPing %s returned %s", id, cs); |
601 } |
598 } |
602 } |
599 } |
603 } |
600 } |
604 |
601 |
605 private void processBinary() { |
602 private void processBinary() { |
606 long id = 0; |
603 long id = 0; |
607 if (debug.isLoggable(Level.DEBUG)) { |
604 if (debug.on()) { |
608 id = receiveCounter.incrementAndGet(); |
605 id = receiveCounter.incrementAndGet(); |
609 debug.log(Level.DEBUG, "enter onBinary %s payload=%s last=%s", |
606 debug.log("enter onBinary %s payload=%s last=%s", |
610 id, binaryData, last); |
607 id, binaryData, last); |
611 } |
608 } |
612 CompletionStage<?> cs = null; |
609 CompletionStage<?> cs = null; |
613 try { |
610 try { |
614 cs = listener.onBinary(WebSocketImpl.this, binaryData, last); |
611 cs = listener.onBinary(WebSocketImpl.this, binaryData, last); |
615 } finally { |
612 } finally { |
616 if (debug.isLoggable(Level.DEBUG)) { |
613 if (debug.on()) { |
617 debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs); |
614 debug.log("exit onBinary %s returned %s", id, cs); |
618 } |
615 } |
619 } |
616 } |
620 } |
617 } |
621 |
618 |
622 private void processText() { |
619 private void processText() { |
623 long id = 0; |
620 long id = 0; |
624 if (debug.isLoggable(Level.DEBUG)) { |
621 if (debug.on()) { |
625 id = receiveCounter.incrementAndGet(); |
622 id = receiveCounter.incrementAndGet(); |
626 debug.log(Level.DEBUG, |
623 debug.log("enter onText %s payload.length=%s last=%s", |
627 "enter onText %s payload.length=%s last=%s", |
|
628 id, text.length(), last); |
624 id, text.length(), last); |
629 } |
625 } |
630 CompletionStage<?> cs = null; |
626 CompletionStage<?> cs = null; |
631 try { |
627 try { |
632 cs = listener.onText(WebSocketImpl.this, text, last); |
628 cs = listener.onText(WebSocketImpl.this, text, last); |
633 } finally { |
629 } finally { |
634 if (debug.isLoggable(Level.DEBUG)) { |
630 if (debug.on()) { |
635 debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs); |
631 debug.log("exit onText %s returned %s", id, cs); |
636 } |
632 } |
637 } |
633 } |
638 } |
634 } |
639 |
635 |
640 private void processOpen() { |
636 private void processOpen() { |
641 long id = 0; |
637 long id = 0; |
642 if (debug.isLoggable(Level.DEBUG)) { |
638 if (debug.on()) { |
643 id = receiveCounter.incrementAndGet(); |
639 id = receiveCounter.incrementAndGet(); |
644 debug.log(Level.DEBUG, "enter onOpen %s", id); |
640 debug.log("enter onOpen %s", id); |
645 } |
641 } |
646 try { |
642 try { |
647 listener.onOpen(WebSocketImpl.this); |
643 listener.onOpen(WebSocketImpl.this); |
648 } finally { |
644 } finally { |
649 if (debug.isLoggable(Level.DEBUG)) { |
645 if (debug.on()) { |
650 debug.log(Level.DEBUG, "exit onOpen %s", id); |
646 debug.log("exit onOpen %s", id); |
651 } |
647 } |
652 } |
648 } |
653 } |
649 } |
654 } |
650 } |
655 |
651 |
656 private void sendCloseSilently(int statusCode) { |
652 private void sendCloseSilently(int statusCode) { |
657 sendClose0(statusCode, "").whenComplete((r, e) -> { |
653 sendClose0(statusCode, "").whenComplete((r, e) -> { |
658 if (e != null) { |
654 if (e != null) { |
659 if (debug.isLoggable(Level.DEBUG)) { |
655 if (debug.on()) { |
660 debug.log(Level.DEBUG, "automatic closure completed with error", |
656 debug.log("automatic closure completed with error", |
661 (Object) e); |
657 (Object) e); |
662 } |
658 } |
663 } |
659 } |
664 }); |
660 }); |
665 } |
661 } |
696 } else if (lastAutomaticPong.compareAndSet(message, copy)) { |
692 } else if (lastAutomaticPong.compareAndSet(message, copy)) { |
697 swapped = true; |
693 swapped = true; |
698 break; |
694 break; |
699 } |
695 } |
700 } |
696 } |
701 if (debug.isLoggable(Level.DEBUG)) { |
697 if (debug.on()) { |
702 debug.log(Level.DEBUG, "swapped automatic pong from %s to %s", |
698 debug.log("swapped automatic pong from %s to %s", |
703 message, copy); |
699 message, copy); |
704 } |
700 } |
705 return swapped; |
701 return swapped; |
706 } |
702 } |
707 |
703 |
708 private void signalOpen() { |
704 private void signalOpen() { |
709 debug.log(Level.DEBUG, "signalOpen"); |
705 debug.log("signalOpen"); |
710 receiveScheduler.runOrSchedule(); |
706 receiveScheduler.runOrSchedule(); |
711 } |
707 } |
712 |
708 |
713 private void signalError(Throwable error) { |
709 private void signalError(Throwable error) { |
714 if (debug.isLoggable(Level.DEBUG)) { |
710 if (debug.on()) { |
715 debug.log(Level.DEBUG, "signalError %s", (Object) error); |
711 debug.log("signalError %s", (Object) error); |
716 } |
712 } |
717 inputClosed = true; |
713 inputClosed = true; |
718 outputClosed.set(true); |
714 outputClosed.set(true); |
719 if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) { |
715 if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) { |
720 if (debug.isLoggable(Level.DEBUG)) { |
716 if (debug.on()) { |
721 debug.log(Level.DEBUG, "signalError", error); |
717 debug.log("signalError", error); |
722 } |
718 } |
723 Log.logError(error); |
719 Log.logError(error); |
724 } else { |
720 } else { |
725 close(); |
721 close(); |
726 } |
722 } |
727 } |
723 } |
728 |
724 |
729 private void close() { |
725 private void close() { |
730 if (debug.isLoggable(Level.DEBUG)) { |
726 if (debug.on()) { |
731 debug.log(Level.DEBUG, "close"); |
727 debug.log("close"); |
732 } |
728 } |
733 Throwable first = null; |
729 Throwable first = null; |
734 try { |
730 try { |
735 transport.closeInput(); |
731 transport.closeInput(); |
736 } catch (Throwable t1) { |
732 } catch (Throwable t1) { |