102 private final AtomicBoolean outputClosed = new AtomicBoolean(); |
102 private final AtomicBoolean outputClosed = new AtomicBoolean(); |
103 |
103 |
104 private final AtomicReference<State> state = new AtomicReference<>(OPEN); |
104 private final AtomicReference<State> state = new AtomicReference<>(OPEN); |
105 |
105 |
106 /* Components of calls to Listener's methods */ |
106 /* Components of calls to Listener's methods */ |
107 private MessagePart part; |
107 private boolean last; |
108 private ByteBuffer binaryData; |
108 private ByteBuffer binaryData; |
109 private CharSequence text; |
109 private CharSequence text; |
110 private int statusCode; |
110 private int statusCode; |
111 private String reason; |
111 private String reason; |
112 private final AtomicReference<Throwable> error = new AtomicReference<>(); |
112 private final AtomicReference<Throwable> error = new AtomicReference<>(); |
188 |
188 |
189 // FIXME: add to action handling of errors -> signalError() |
189 // FIXME: add to action handling of errors -> signalError() |
190 |
190 |
191 @Override |
191 @Override |
192 public CompletableFuture<WebSocket> sendText(CharSequence message, |
192 public CompletableFuture<WebSocket> sendText(CharSequence message, |
193 boolean isLast) { |
193 boolean last) { |
194 Objects.requireNonNull(message); |
194 Objects.requireNonNull(message); |
195 long id = 0; |
195 long id = 0; |
196 if (debug.isLoggable(Level.DEBUG)) { |
196 if (debug.isLoggable(Level.DEBUG)) { |
197 id = sendCounter.incrementAndGet(); |
197 id = sendCounter.incrementAndGet(); |
198 debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s", |
198 debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s", |
199 id, message.length(), isLast); |
199 id, message.length(), last); |
200 } |
200 } |
201 CompletableFuture<WebSocket> result; |
201 CompletableFuture<WebSocket> result; |
202 if (!setPendingTextOrBinary()) { |
202 if (!setPendingTextOrBinary()) { |
203 result = failedFuture(new IllegalStateException("Send pending")); |
203 result = failedFuture(new IllegalStateException("Send pending")); |
204 } else { |
204 } else { |
205 result = transport.sendText(message, isLast, this, |
205 result = transport.sendText(message, last, this, |
206 (r, e) -> clearPendingTextOrBinary()); |
206 (r, e) -> clearPendingTextOrBinary()); |
207 } |
207 } |
208 debug.log(Level.DEBUG, "exit send text %s returned %s", id, result); |
208 debug.log(Level.DEBUG, "exit send text %s returned %s", id, result); |
209 |
209 |
210 return replaceNull(result); |
210 return replaceNull(result); |
211 } |
211 } |
212 |
212 |
213 @Override |
213 @Override |
214 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
214 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
215 boolean isLast) { |
215 boolean last) { |
216 Objects.requireNonNull(message); |
216 Objects.requireNonNull(message); |
217 long id = 0; |
217 long id = 0; |
218 if (debug.isLoggable(Level.DEBUG)) { |
218 if (debug.isLoggable(Level.DEBUG)) { |
219 id = sendCounter.incrementAndGet(); |
219 id = sendCounter.incrementAndGet(); |
220 debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s", |
220 debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s", |
221 id, message, isLast); |
221 id, message, last); |
222 } |
222 } |
223 CompletableFuture<WebSocket> result; |
223 CompletableFuture<WebSocket> result; |
224 if (!setPendingTextOrBinary()) { |
224 if (!setPendingTextOrBinary()) { |
225 result = failedFuture(new IllegalStateException("Send pending")); |
225 result = failedFuture(new IllegalStateException("Send pending")); |
226 } else { |
226 } else { |
227 result = transport.sendBinary(message, isLast, this, |
227 result = transport.sendBinary(message, last, this, |
228 (r, e) -> clearPendingTextOrBinary()); |
228 (r, e) -> clearPendingTextOrBinary()); |
229 } |
229 } |
230 debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result); |
230 debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result); |
231 return replaceNull(result); |
231 return replaceNull(result); |
232 } |
232 } |
619 |
619 |
620 private void processBinary() { |
620 private void processBinary() { |
621 long id = 0; |
621 long id = 0; |
622 if (debug.isLoggable(Level.DEBUG)) { |
622 if (debug.isLoggable(Level.DEBUG)) { |
623 id = receiveCounter.incrementAndGet(); |
623 id = receiveCounter.incrementAndGet(); |
624 debug.log(Level.DEBUG, "enter onBinary %s payload=%s part=%s", |
624 debug.log(Level.DEBUG, "enter onBinary %s payload=%s last=%s", |
625 id, binaryData, part); |
625 id, binaryData, last); |
626 } |
626 } |
627 CompletionStage<?> cs = null; |
627 CompletionStage<?> cs = null; |
628 try { |
628 try { |
629 cs = listener.onBinary(WebSocketImpl.this, binaryData, part); |
629 cs = listener.onBinary(WebSocketImpl.this, binaryData, last); |
630 } finally { |
630 } finally { |
631 debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs); |
631 debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs); |
632 } |
632 } |
633 } |
633 } |
634 |
634 |
635 private void processText() { |
635 private void processText() { |
636 long id = 0; |
636 long id = 0; |
637 if (debug.isLoggable(Level.DEBUG)) { |
637 if (debug.isLoggable(Level.DEBUG)) { |
638 id = receiveCounter.incrementAndGet(); |
638 id = receiveCounter.incrementAndGet(); |
639 debug.log(Level.DEBUG, |
639 debug.log(Level.DEBUG, |
640 "enter onText %s payload.length=%s part=%s", |
640 "enter onText %s payload.length=%s last=%s", |
641 id, text.length(), part); |
641 id, text.length(), last); |
642 } |
642 } |
643 CompletionStage<?> cs = null; |
643 CompletionStage<?> cs = null; |
644 try { |
644 try { |
645 cs = listener.onText(WebSocketImpl.this, text, part); |
645 cs = listener.onText(WebSocketImpl.this, text, last); |
646 } finally { |
646 } finally { |
647 debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs); |
647 debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs); |
648 } |
648 } |
649 } |
649 } |
650 |
650 |
775 } |
775 } |
776 |
776 |
777 private class SignallingMessageConsumer implements MessageStreamConsumer { |
777 private class SignallingMessageConsumer implements MessageStreamConsumer { |
778 |
778 |
779 @Override |
779 @Override |
780 public void onText(CharSequence data, MessagePart part) { |
780 public void onText(CharSequence data, boolean last) { |
781 transport.acknowledgeReception(); |
781 transport.acknowledgeReception(); |
782 text = data; |
782 text = data; |
783 WebSocketImpl.this.part = part; |
783 WebSocketImpl.this.last = last; |
784 tryChangeState(WAITING, TEXT); |
784 tryChangeState(WAITING, TEXT); |
785 } |
785 } |
786 |
786 |
787 @Override |
787 @Override |
788 public void onBinary(ByteBuffer data, MessagePart part) { |
788 public void onBinary(ByteBuffer data, boolean last) { |
789 transport.acknowledgeReception(); |
789 transport.acknowledgeReception(); |
790 binaryData = data; |
790 binaryData = data; |
791 WebSocketImpl.this.part = part; |
791 WebSocketImpl.this.last = last; |
792 tryChangeState(WAITING, BINARY); |
792 tryChangeState(WAITING, BINARY); |
793 } |
793 } |
794 |
794 |
795 @Override |
795 @Override |
796 public void onPing(ByteBuffer data) { |
796 public void onPing(ByteBuffer data) { |