src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
branch http-client-branch
changeset 56320 f82729ca8660
parent 56318 2a96e88888b2
child 56322 316be30d078d
equal deleted inserted replaced
56319:bb9e25a8ea04 56320:f82729ca8660
   102     private final AtomicBoolean outputClosed = new AtomicBoolean();
   102     private final AtomicBoolean outputClosed = new AtomicBoolean();
   103 
   103 
           // The state reference is created holding OPEN.
   104     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
   104     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
   105 
   105 
   106     /* Components of calls to Listener's methods */
   106     /* Components of calls to Listener's methods */
           // Only changed pair in this group: the MessagePart-typed `part` field is
           // replaced by a boolean `last` (the first line of each pair is presumably
           // the older revision of this comparison -- confirm against the changeset).
           // Within this excerpt the field is written by
           // SignallingMessageConsumer.onText/onBinary and read by
           // processText/processBinary.
   107     private MessagePart part;
   107     private boolean last;
           // Payload that processBinary hands to listener.onBinary.
   108     private ByteBuffer binaryData;
   108     private ByteBuffer binaryData;
           // Payload that processText hands to listener.onText.
   109     private CharSequence text;
   109     private CharSequence text;
           // NOTE(review): statusCode and reason are not used anywhere in the
           // visible excerpt; presumably they carry close details -- confirm.
   110     private int statusCode;
   110     private int statusCode;
   111     private String reason;
   111     private String reason;
   112     private final AtomicReference<Throwable> error = new AtomicReference<>();
   112     private final AtomicReference<Throwable> error = new AtomicReference<>();
   188 
   188 
   189     // FIXME: add to action handling of errors -> signalError()
   189     // FIXME: add to action handling of errors -> signalError()
   190 
   190 
   191     @Override
   191     @Override
           // Sends a text message through the transport and returns a future for
           // the outcome. The only change shown for this method is the rename of
           // the boolean parameter isLast -> last (the first line of each pair is
           // presumably the older revision of this comparison).
           //
           // message      must be non-null; Objects.requireNonNull rejects null
           // isLast/last  forwarded unchanged to transport.sendText
           // returns      replaceNull(result), where result is either a future
           //              failed with IllegalStateException("Send pending") or
           //              whatever transport.sendText returned
           //
           // NOTE(review): the renamed parameter `last` appears to shadow the
           // `last` field declared at line 107 of the same file; only the
           // parameter is used in this method.
   192     public CompletableFuture<WebSocket> sendText(CharSequence message,
   192     public CompletableFuture<WebSocket> sendText(CharSequence message,
   193                                                  boolean isLast) {
   193                                                  boolean last) {
   194         Objects.requireNonNull(message);
   194         Objects.requireNonNull(message);
               // id stays 0 unless DEBUG logging is enabled; the counter is only
               // incremented inside the guarded block below.
   195         long id = 0;
   195         long id = 0;
   196         if (debug.isLoggable(Level.DEBUG)) {
   196         if (debug.isLoggable(Level.DEBUG)) {
   197             id = sendCounter.incrementAndGet();
   197             id = sendCounter.incrementAndGet();
   198             debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s",
   198             debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s",
   199                       id, message.length(), isLast);
   199                       id, message.length(), last);
   200         }
   200         }
   201         CompletableFuture<WebSocket> result;
   201         CompletableFuture<WebSocket> result;
               // A false return from setPendingTextOrBinary() is reported through a
               // failed future rather than a thrown exception.
   202         if (!setPendingTextOrBinary()) {
   202         if (!setPendingTextOrBinary()) {
   203             result = failedFuture(new IllegalStateException("Send pending"));
   203             result = failedFuture(new IllegalStateException("Send pending"));
   204         } else {
   204         } else {
                   // The callback ignores both of its arguments and always calls
                   // clearPendingTextOrBinary(); when the transport invokes it is
                   // not visible here (presumably on completion -- confirm).
   205             result = transport.sendText(message, isLast, this,
   205             result = transport.sendText(message, last, this,
   206                                         (r, e) -> clearPendingTextOrBinary());
   206                                         (r, e) -> clearPendingTextOrBinary());
   207         }
   207         }
               // Unlike the "enter" message this call is not wrapped in
               // isLoggable, so id is 0 here whenever DEBUG was disabled above.
   208         debug.log(Level.DEBUG, "exit send text %s returned %s", id, result);
   208         debug.log(Level.DEBUG, "exit send text %s returned %s", id, result);
   209 
   209 
   210         return replaceNull(result);
   210         return replaceNull(result);
   211     }
   211     }
   212 
   212 
   213     @Override
   213     @Override
           // Sends a binary message through the transport and returns a future for
           // the outcome. The only change shown for this method is the rename of
           // the boolean parameter isLast -> last (the first line of each pair is
           // presumably the older revision of this comparison).
           //
           // message      must be non-null; Objects.requireNonNull rejects null
           // isLast/last  forwarded unchanged to transport.sendBinary
           // returns      replaceNull(result), where result is either a future
           //              failed with IllegalStateException("Send pending") or
           //              whatever transport.sendBinary returned
           //
           // NOTE(review): the renamed parameter `last` appears to shadow the
           // `last` field declared at line 107 of the same file; only the
           // parameter is used in this method.
   214     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   214     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   215                                                    boolean isLast) {
   215                                                    boolean last) {
   216         Objects.requireNonNull(message);
   216         Objects.requireNonNull(message);
               // id stays 0 unless DEBUG logging is enabled; the counter is only
               // incremented inside the guarded block below.
   217         long id = 0;
   217         long id = 0;
   218         if (debug.isLoggable(Level.DEBUG)) {
   218         if (debug.isLoggable(Level.DEBUG)) {
   219             id = sendCounter.incrementAndGet();
   219             id = sendCounter.incrementAndGet();
                   // The ByteBuffer itself is passed as the payload argument, so
                   // the log shows its string form rather than a length.
   220             debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s",
   220             debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s",
   221                       id, message, isLast);
   221                       id, message, last);
   222         }
   222         }
   223         CompletableFuture<WebSocket> result;
   223         CompletableFuture<WebSocket> result;
               // Shares the pending flag with sendText: both methods go through
               // setPendingTextOrBinary()/clearPendingTextOrBinary().
   224         if (!setPendingTextOrBinary()) {
   224         if (!setPendingTextOrBinary()) {
   225             result = failedFuture(new IllegalStateException("Send pending"));
   225             result = failedFuture(new IllegalStateException("Send pending"));
   226         } else {
   226         } else {
                   // The callback ignores both of its arguments and always calls
                   // clearPendingTextOrBinary(); when the transport invokes it is
                   // not visible here (presumably on completion -- confirm).
   227             result = transport.sendBinary(message, isLast, this,
   227             result = transport.sendBinary(message, last, this,
   228                                           (r, e) -> clearPendingTextOrBinary());
   228                                           (r, e) -> clearPendingTextOrBinary());
   229         }
   229         }
               // Unlike the "enter" message this call is not wrapped in
               // isLoggable, so id is 0 here whenever DEBUG was disabled above.
   230         debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result);
   230         debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result);
   231         return replaceNull(result);
   231         return replaceNull(result);
   232     }
   232     }
   619 
   619 
   620         private void processBinary() {
   620         private void processBinary() {
                   // Delivers the stored binaryData to listener.onBinary together
                   // with the stored part/last field of the enclosing WebSocketImpl.
                   // Changes shown here: the field passed to the listener and to the
                   // log switches from `part` to `last`, and the log label follows
                   // (the first line of each pair is presumably the older revision).
   621             long id = 0;
   621             long id = 0;
   622             if (debug.isLoggable(Level.DEBUG)) {
   622             if (debug.isLoggable(Level.DEBUG)) {
   623                 id = receiveCounter.incrementAndGet();
   623                 id = receiveCounter.incrementAndGet();
   624                 debug.log(Level.DEBUG, "enter onBinary %s payload=%s part=%s",
   624                 debug.log(Level.DEBUG, "enter onBinary %s payload=%s last=%s",
   625                           id, binaryData, part);
   625                           id, binaryData, last);
   626             }
   626             }
                   // cs is captured only for the exit log; nothing in this method
                   // acts on the returned stage.
   627             CompletionStage<?> cs = null;
   627             CompletionStage<?> cs = null;
   628             try {
   628             try {
   629                 cs = listener.onBinary(WebSocketImpl.this, binaryData, part);
   629                 cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
   630             } finally {
   630             } finally {
                       // No catch clause: if the listener throws, this still logs
                       // (with cs == null) and the exception propagates.
   631                 debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs);
   631                 debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs);
   632             }
   632             }
   633         }
   633         }
   634 
   634 
   635         private void processText() {
   635         private void processText() {
                   // Delivers the stored text to listener.onText together with the
                   // stored part/last field of the enclosing WebSocketImpl.
                   // Changes shown here: the field passed to the listener and to the
                   // log switches from `part` to `last`, and the log label follows
                   // (the first line of each pair is presumably the older revision).
   636             long id = 0;
   636             long id = 0;
   637             if (debug.isLoggable(Level.DEBUG)) {
   637             if (debug.isLoggable(Level.DEBUG)) {
   638                 id = receiveCounter.incrementAndGet();
   638                 id = receiveCounter.incrementAndGet();
                       // Only the length of the text is logged, not its content.
   639                 debug.log(Level.DEBUG,
   639                 debug.log(Level.DEBUG,
   640                           "enter onText %s payload.length=%s part=%s",
   640                           "enter onText %s payload.length=%s last=%s",
   641                           id, text.length(), part);
   641                           id, text.length(), last);
   642             }
   642             }
                   // cs is captured only for the exit log; nothing in this method
                   // acts on the returned stage.
   643             CompletionStage<?> cs = null;
   643             CompletionStage<?> cs = null;
   644             try {
   644             try {
   645                 cs = listener.onText(WebSocketImpl.this, text, part);
   645                 cs = listener.onText(WebSocketImpl.this, text, last);
   646             } finally {
   646             } finally {
                       // No catch clause: if the listener throws, this still logs
                       // (with cs == null) and the exception propagates.
   647                 debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs);
   647                 debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs);
   648             }
   648             }
   649         }
   649         }
   650 
   650 
   775     }
   775     }
   776 
   776 
   777     private class SignallingMessageConsumer implements MessageStreamConsumer {
   777     private class SignallingMessageConsumer implements MessageStreamConsumer {
   778 
   778 
   779         @Override
   779         @Override
               // Records an incoming text message in the enclosing WebSocketImpl's
               // fields and then requests a state change. Change shown here: the
               // second parameter goes from MessagePart part to boolean last, and
               // the field it is stored into changes to match (the first line of
               // each pair is presumably the older revision of this comparison).
   780         public void onText(CharSequence data, MessagePart part) {
   780         public void onText(CharSequence data, boolean last) {
                   // Reception is acknowledged before the data is stored.
   781             transport.acknowledgeReception();
   781             transport.acknowledgeReception();
   782             text = data;
   782             text = data;
                   // Qualified with WebSocketImpl.this because the parameter has
                   // the same name as the outer field.
   783             WebSocketImpl.this.part = part;
   783             WebSocketImpl.this.last = last;
                   // NOTE(review): presumably moves the state from WAITING to TEXT
                   // so that processText runs; tryChangeState is not visible here.
   784             tryChangeState(WAITING, TEXT);
   784             tryChangeState(WAITING, TEXT);
   785         }
   785         }
   786 
   786 
   787         @Override
   787         @Override
               // Records an incoming binary message in the enclosing WebSocketImpl's
               // fields and then requests a state change. Change shown here: the
               // second parameter goes from MessagePart part to boolean last, and
               // the field it is stored into changes to match (the first line of
               // each pair is presumably the older revision of this comparison).
   788         public void onBinary(ByteBuffer data, MessagePart part) {
   788         public void onBinary(ByteBuffer data, boolean last) {
                   // Reception is acknowledged before the data is stored.
   789             transport.acknowledgeReception();
   789             transport.acknowledgeReception();
                   // The buffer reference is stored as-is; no copy is made here.
   790             binaryData = data;
   790             binaryData = data;
                   // Qualified with WebSocketImpl.this because the parameter has
                   // the same name as the outer field.
   791             WebSocketImpl.this.part = part;
   791             WebSocketImpl.this.last = last;
                   // NOTE(review): presumably moves the state from WAITING to BINARY
                   // so that processBinary runs; tryChangeState is not visible here.
   792             tryChangeState(WAITING, BINARY);
   792             tryChangeState(WAITING, BINARY);
   793         }
   793         }
   794 
   794 
   795         @Override
   795         @Override
   796         public void onPing(ByteBuffer data) {
   796         public void onPing(ByteBuffer data) {