src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 56294 181bc33917e4
parent 56293 7e21161251dc
child 56295 898dfb226bd0
equal deleted inserted replaced
56293:7e21161251dc 56294:181bc33917e4
    31 import jdk.internal.net.http.common.SequentialScheduler;
    31 import jdk.internal.net.http.common.SequentialScheduler;
    32 import jdk.internal.net.http.common.Utils;
    32 import jdk.internal.net.http.common.Utils;
    33 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
    33 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
    34 
    34 
    35 import java.io.IOException;
    35 import java.io.IOException;
       
    36 import java.io.InterruptedIOException;
    36 import java.lang.ref.Reference;
    37 import java.lang.ref.Reference;
    37 import java.net.ProtocolException;
    38 import java.net.ProtocolException;
    38 import java.net.URI;
    39 import java.net.URI;
    39 import java.net.http.WebSocket;
    40 import java.net.http.WebSocket;
    40 import java.nio.ByteBuffer;
    41 import java.nio.ByteBuffer;
    41 import java.util.Objects;
    42 import java.util.Objects;
    42 import java.util.concurrent.CompletableFuture;
    43 import java.util.concurrent.CompletableFuture;
    43 import java.util.concurrent.CompletionStage;
    44 import java.util.concurrent.CompletionStage;
       
    45 import java.util.concurrent.TimeUnit;
    44 import java.util.concurrent.TimeoutException;
    46 import java.util.concurrent.TimeoutException;
    45 import java.util.concurrent.atomic.AtomicBoolean;
    47 import java.util.concurrent.atomic.AtomicBoolean;
    46 import java.util.concurrent.atomic.AtomicLong;
    48 import java.util.concurrent.atomic.AtomicLong;
    47 import java.util.concurrent.atomic.AtomicReference;
    49 import java.util.concurrent.atomic.AtomicReference;
    48 import java.util.function.BiConsumer;
    50 import java.util.function.BiConsumer;
    49 import java.util.function.Function;
    51 import java.util.function.Function;
    50 
    52 
    51 import static java.util.Objects.requireNonNull;
    53 import static java.util.Objects.requireNonNull;
       
    54 import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
    52 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
    55 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
    53 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
    56 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
    54 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
    57 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
    55 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient;
    58 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient;
    56 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY;
    59 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY;
    67  * A WebSocket client.
    70  * A WebSocket client.
    68  */
    71  */
    69 public final class WebSocketImpl implements WebSocket {
    72 public final class WebSocketImpl implements WebSocket {
    70 
    73 
    71     private final static boolean DEBUG = true;
    74     private final static boolean DEBUG = true;
    72     private final AtomicLong counter = new AtomicLong();
    75     private final AtomicLong sendCounter = new AtomicLong();
       
    76     private final AtomicLong receiveCounter = new AtomicLong();
    73 
    77 
    74     enum State {
    78     enum State {
    75         OPEN,
    79         OPEN,
    76         IDLE,
    80         IDLE,
    77         WAITING,
    81         WAITING,
    82         CLOSE,
    86         CLOSE,
    83         ERROR;
    87         ERROR;
    84     }
    88     }
    85 
    89 
    86     private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
    90     private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
       
    91     private final long closeTimeout;
    87     private volatile boolean inputClosed;
    92     private volatile boolean inputClosed;
    88     private volatile boolean outputClosed;
    93     private volatile boolean outputClosed;
    89 
    94 
    90     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
    95     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
    91 
    96 
   148         this.uri = requireNonNull(uri);
   153         this.uri = requireNonNull(uri);
   149         this.subprotocol = requireNonNull(subprotocol);
   154         this.subprotocol = requireNonNull(subprotocol);
   150         this.listener = requireNonNull(listener);
   155         this.listener = requireNonNull(listener);
   151         this.transport = transportFactory.createTransport(
   156         this.transport = transportFactory.createTransport(
   152                 new SignallingMessageConsumer());
   157                 new SignallingMessageConsumer());
       
   158         closeTimeout = readCloseTimeout();
       
   159     }
       
   160 
       
   161     private static int readCloseTimeout() {
       
   162         String property = "jdk.httpclient.websocket.closeTimeout";
       
   163         int defaultValue = 30;
       
   164         String value = Utils.getNetProperty(property);
       
   165         int v;
       
   166         if (value == null) {
       
   167             v = defaultValue;
       
   168         } else {
       
   169             try {
       
   170                 v = Integer.parseUnsignedInt(value);
       
   171             } catch (NumberFormatException ignored) {
       
   172                 v = defaultValue;
       
   173             }
       
   174         }
       
   175         if (DEBUG) {
       
   176             System.out.printf("[WebSocket] %s=%s, using value %s%n",
       
   177                               property, value, v);
       
   178         }
       
   179         return v;
   153     }
   180     }
   154 
   181 
   155     // FIXME: add to action handling of errors -> signalError()
   182     // FIXME: add to action handling of errors -> signalError()
   156 
   183 
   157     @Override
   184     @Override
   158     public CompletableFuture<WebSocket> sendText(CharSequence message,
   185     public CompletableFuture<WebSocket> sendText(CharSequence message,
   159                                                  boolean isLast) {
   186                                                  boolean isLast) {
   160         Objects.requireNonNull(message);
   187         Objects.requireNonNull(message);
   161         long id;
   188         long id;
   162         if (DEBUG) {
   189         if (DEBUG) {
   163             id = counter.incrementAndGet();
   190             id = sendCounter.incrementAndGet();
   164             System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n",
   191             System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n",
   165                               id, message.length(), isLast);
   192                               id, message.length(), isLast);
   166         }
   193         }
   167         CompletableFuture<WebSocket> result;
   194         CompletableFuture<WebSocket> result;
   168         if (!outstandingSend.compareAndSet(false, true)) {
   195         if (!outstandingSend.compareAndSet(false, true)) {
   182     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   209     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   183                                                    boolean isLast) {
   210                                                    boolean isLast) {
   184         Objects.requireNonNull(message);
   211         Objects.requireNonNull(message);
   185         long id;
   212         long id;
   186         if (DEBUG) {
   213         if (DEBUG) {
   187             id = counter.incrementAndGet();
   214             id = sendCounter.incrementAndGet();
   188             System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n",
   215             System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n",
   189                               id, message, isLast);
   216                               id, message, isLast);
   190         }
   217         }
   191         CompletableFuture<WebSocket> result;
   218         CompletableFuture<WebSocket> result;
   192         if (!outstandingSend.compareAndSet(false, true)) {
   219         if (!outstandingSend.compareAndSet(false, true)) {
   215     @Override
   242     @Override
   216     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
   243     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
   217         Objects.requireNonNull(message);
   244         Objects.requireNonNull(message);
   218         long id;
   245         long id;
   219         if (DEBUG) {
   246         if (DEBUG) {
   220             id = counter.incrementAndGet();
   247             id = sendCounter.incrementAndGet();
   221             System.out.printf("[WebSocket] %s send ping: payload=%s%n",
   248             System.out.printf("[WebSocket] %s send ping: payload=%s%n",
   222                               id, message);
   249                               id, message);
   223         }
   250         }
   224         CompletableFuture<WebSocket> result = transport.sendPing(message, this,
   251         CompletableFuture<WebSocket> result = transport.sendPing(message, this,
   225                                                                  (r, e) -> { });
   252                                                                  (r, e) -> { });
   233     @Override
   260     @Override
   234     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
   261     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
   235         Objects.requireNonNull(message);
   262         Objects.requireNonNull(message);
   236         long id;
   263         long id;
   237         if (DEBUG) {
   264         if (DEBUG) {
   238             id = counter.incrementAndGet();
   265             id = sendCounter.incrementAndGet();
   239             System.out.printf("[WebSocket] %s send pong: payload=%s%n",
   266             System.out.printf("[WebSocket] %s send pong: payload=%s%n",
   240                               id, message);
   267                               id, message);
   241         }
   268         }
   242         CompletableFuture<WebSocket> result = transport.sendPong(message, this,
   269         CompletableFuture<WebSocket> result = transport.sendPong(message, this,
   243                                                                  (r, e) -> { });
   270                                                                  (r, e) -> { });
   252     public CompletableFuture<WebSocket> sendClose(int statusCode,
   279     public CompletableFuture<WebSocket> sendClose(int statusCode,
   253                                                   String reason) {
   280                                                   String reason) {
   254         Objects.requireNonNull(reason);
   281         Objects.requireNonNull(reason);
   255         long id;
   282         long id;
   256         if (DEBUG) {
   283         if (DEBUG) {
   257             id = counter.incrementAndGet();
   284             id = sendCounter.incrementAndGet();
   258             System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n",
   285             System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n",
   259                               id, statusCode, reason);
   286                               id, statusCode, reason);
   260         }
   287         }
   261         CompletableFuture<WebSocket> result;
   288         CompletableFuture<WebSocket> result;
   262         if (!isLegalToSendFromClient(statusCode)) {
   289         if (!isLegalToSendFromClient(statusCode)) {
   270                               id, result);
   297                               id, result);
   271         }
   298         }
   272         return replaceNull(result);
   299         return replaceNull(result);
   273     }
   300     }
   274 
   301 
   275     /*
       
   276      * Sends a Close message, then shuts down the output since no more
       
   277      * messages are expected to be sent at this point.
       
   278      */
       
   279     private CompletableFuture<WebSocket> sendClose0(int statusCode,
   302     private CompletableFuture<WebSocket> sendClose0(int statusCode,
   280                                                     String reason) {
   303                                                     String reason) {
   281         outputClosed = true;
   304         outputClosed = true;
   282         BiConsumer<WebSocket, Throwable> closer = (r, e) -> {
       
   283             Throwable cause = Utils.getCompletionCause(e);
       
   284             if (cause instanceof IllegalArgumentException) {
       
   285                 // or pre=check it (isLegalToSendFromClient(statusCode))
       
   286                 return;
       
   287             }
       
   288             try {
       
   289                 transport.closeOutput();
       
   290             } catch (IOException ex) {
       
   291                 Log.logError(ex);
       
   292             }
       
   293             if (cause instanceof TimeoutException) { // FIXME: it is not the case anymore
       
   294                 if (DEBUG) {
       
   295                     System.out.println("[WebSocket] sendClose0 error: " + e);
       
   296                 }
       
   297                 try {
       
   298                     transport.closeInput();
       
   299                 } catch (IOException ex) {
       
   300                     Log.logError(ex);
       
   301                 }
       
   302             }
       
   303         };
       
   304         CompletableFuture<WebSocket> cf
   305         CompletableFuture<WebSocket> cf
   305                 = transport.sendClose(statusCode, reason, this, closer);
   306                 = transport.sendClose(statusCode, reason, this, (r, e) -> { });
   306         return cf;
   307         CompletableFuture<WebSocket> closeOrTimeout
       
   308                 = replaceNull(cf).orTimeout(closeTimeout, TimeUnit.SECONDS);
       
   309         // The snippet below, whose purpose might not be immediately obvious,
       
   310         // is a trick used to complete a dependant stage with an IOException.
       
   311         // A checked IOException cannot be thrown from inside the BiConsumer
       
   312         // supplied to the handle method. Instead a CompletionStage completed
       
   313         // exceptionally with this IOException is returned.
       
   314         return closeOrTimeout.handle(this::processCloseOutcome)
       
   315                              .thenCompose(Function.identity());
       
   316     }
       
   317 
       
   318     private CompletionStage<WebSocket> processCloseOutcome(WebSocket webSocket,
       
   319                                                            Throwable e) {
       
   320         if (DEBUG) {
       
   321             System.out.printf("[WebSocket] send close completed, error=%s%n", e);
       
   322             if (e != null) {
       
   323                 e.printStackTrace(System.out);
       
   324             }
       
   325         }
       
   326         if (e == null) {
       
   327             return completedFuture(webSocket);
       
   328         }
       
   329         Throwable cause = Utils.getCompletionCause(e);
       
   330         if (cause instanceof IllegalArgumentException) {
       
   331             return failedFuture(cause);
       
   332         }
       
   333         try {
       
   334             transport.closeOutput();
       
   335         } catch (IOException ignored) { }
       
   336 
       
   337         if (cause instanceof TimeoutException) {
       
   338             inputClosed = true;
       
   339             try {
       
   340                 transport.closeInput();
       
   341             } catch (IOException ignored) { }
       
   342             return failedFuture(new InterruptedIOException(
       
   343                     "Could not send close within a reasonable timeout"));
       
   344         }
       
   345         return failedFuture(cause);
   307     }
   346     }
   308 
   347 
   309     @Override
   348     @Override
   310     public void request(long n) {
   349     public void request(long n) {
   311         if (DEBUG) {
   350         if (DEBUG) {
   370         // onOpen is invoked first and no messages become pending before onOpen
   409         // onOpen is invoked first and no messages become pending before onOpen
   371         // finishes
   410         // finishes
   372 
   411 
   373         @Override
   412         @Override
   374         public void run() {
   413         public void run() {
       
   414             if (DEBUG) {
       
   415                 System.out.printf("[WebSocket] enter receive task%n");
       
   416             }
       
   417             loop:
   375             while (true) {
   418             while (true) {
   376                 State s = state.get();
   419                 State s = state.get();
       
   420                 if (DEBUG) {
       
   421                     System.out.printf("[WebSocket] receive state: %s%n", s);
       
   422                 }
   377                 try {
   423                 try {
   378                     switch (s) {
   424                     switch (s) {
   379                         case OPEN:
   425                         case OPEN:
   380                             processOpen();
   426                             processOpen();
   381                             tryChangeState(OPEN, IDLE);
   427                             tryChangeState(OPEN, IDLE);
   396                             processPong();
   442                             processPong();
   397                             tryChangeState(PONG, IDLE);
   443                             tryChangeState(PONG, IDLE);
   398                             break;
   444                             break;
   399                         case CLOSE:
   445                         case CLOSE:
   400                             processClose();
   446                             processClose();
   401                             return;
   447                             break loop;
   402                         case ERROR:
   448                         case ERROR:
   403                             processError();
   449                             processError();
   404                             return;
   450                             break loop;
   405                         case IDLE:
   451                         case IDLE:
   406                             if (demand.tryDecrement()
   452                             if (demand.tryDecrement()
   407                                     && tryChangeState(IDLE, WAITING)) {
   453                                     && tryChangeState(IDLE, WAITING)) {
   408                                 transport.request(1);
   454                                 transport.request(1);
   409                             }
   455                             }
   410                             return;
   456                             break loop;
   411                         case WAITING:
   457                         case WAITING:
   412                             // For debugging spurious signalling: when there was a
   458                             // For debugging spurious signalling: when there was a
   413                             // signal, but apparently nothing has changed
   459                             // signal, but apparently nothing has changed
   414                             return;
   460                             break loop;
   415                         default:
   461                         default:
   416                             throw new InternalError(String.valueOf(s));
   462                             throw new InternalError(String.valueOf(s));
   417                     }
   463                     }
   418                 } catch (Throwable t) {
   464                 } catch (Throwable t) {
   419                     signalError(t);
   465                     signalError(t);
   420                 }
   466                 }
       
   467             }
       
   468             if (DEBUG) {
       
   469                 System.out.printf("[WebSocket] exit receive task%n");
   421             }
   470             }
   422         }
   471         }
   423 
   472 
   424         private void processError() throws IOException {
   473         private void processError() throws IOException {
   425             if (DEBUG) {
   474             if (DEBUG) {
   429             receiveScheduler.stop();
   478             receiveScheduler.stop();
   430             Throwable err = error.get();
   479             Throwable err = error.get();
   431             if (err instanceof FailWebSocketException) {
   480             if (err instanceof FailWebSocketException) {
   432                 int code1 = ((FailWebSocketException) err).getStatusCode();
   481                 int code1 = ((FailWebSocketException) err).getStatusCode();
   433                 err = new ProtocolException().initCause(err);
   482                 err = new ProtocolException().initCause(err);
   434                 sendClose0(code1, "")
   483                 if (DEBUG) {
       
   484                     System.out.printf("[WebSocket] failing %s with error=%s statusCode=%s%n",
       
   485                                       WebSocketImpl.this, err, code1);
       
   486                 }
       
   487                 sendClose0(code1, "") // TODO handle errors from here
   435                         .whenComplete(
   488                         .whenComplete(
   436                                 (r, e) -> {
   489                                 (r, e) -> {
   437                                     if (e != null) {
   490                                     if (e != null) {
   438                                         Log.logError(e);
   491                                         Log.logError(e);
   439                                     }
   492                                     }
   440                                 });
   493                                 });
   441             }
   494             }
   442             listener.onError(WebSocketImpl.this, err);
   495             long id;
       
   496             if (DEBUG) {
       
   497                 id = receiveCounter.incrementAndGet();
       
   498                 System.out.printf("[WebSocket] enter onError %s error=%s%n",
       
   499                                   id, err);
       
   500             }
       
   501             try {
       
   502                 listener.onError(WebSocketImpl.this, err);
       
   503             } finally {
       
   504                 if (DEBUG) {
       
   505                     System.out.printf("[WebSocket] exit onError %s%n", id);
       
   506                 }
       
   507             }
   443         }
   508         }
   444 
   509 
   445         private void processClose() throws IOException {
   510         private void processClose() throws IOException {
   446             if (DEBUG) {
   511             if (DEBUG) {
   447                 System.out.println("[WebSocket] processClose");
   512                 System.out.println("[WebSocket] processClose");
   448             }
   513             }
   449             transport.closeInput();
   514             transport.closeInput();
   450             receiveScheduler.stop();
   515             receiveScheduler.stop();
   451             CompletionStage<?> readyToClose;
   516             CompletionStage<?> cs = null; // when the listener is ready to close
   452             readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
   517             long id;
   453             if (readyToClose == null) {
   518             if (DEBUG) {
   454                 readyToClose = DONE;
   519                 id = receiveCounter.incrementAndGet();
       
   520                 System.out.printf("[WebSocket] enter onClose %s statusCode=%s reason.length=%s%n",
       
   521                                   id, statusCode, reason.length());
       
   522             }
       
   523             try {
       
   524                 cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
       
   525             } finally {
       
   526                 System.out.printf("[WebSocket] exit onClose %s returned %s%n",
       
   527                                   id, cs);
       
   528             }
       
   529             if (cs == null) {
       
   530                 cs = DONE;
   455             }
   531             }
   456             int code;
   532             int code;
   457             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
   533             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
   458                 code = NORMAL_CLOSURE;
   534                 code = NORMAL_CLOSURE;
   459                 if (DEBUG) {
   535                 if (DEBUG) {
   461                                       statusCode, code);
   537                                       statusCode, code);
   462                 }
   538                 }
   463             } else {
   539             } else {
   464                 code = statusCode;
   540                 code = statusCode;
   465             }
   541             }
   466             readyToClose.whenComplete((r, e) -> {
   542             cs.whenComplete((r, e) -> { // TODO log
   467                 sendClose0(code, "") // FIXME errors from here?
   543                 sendClose0(code, "") // TODO handle errors from here
   468                         .whenComplete((r1, e1) -> {
   544                         .whenComplete((r1, e1) -> {
   469                             if (DEBUG) {
   545                             if (DEBUG) {
   470                                 if (e1 != null) {
   546                                 if (e1 != null) {
   471                                     e1.printStackTrace(System.out);
   547                                     e1.printStackTrace(System.out);
   472                                 }
   548                                 }
   474                         });
   550                         });
   475             });
   551             });
   476         }
   552         }
   477 
   553 
   478         private void processPong() {
   554         private void processPong() {
   479             listener.onPong(WebSocketImpl.this, binaryData);
   555             long id;
       
   556             if (DEBUG) {
       
   557                 id = receiveCounter.incrementAndGet();
       
   558                 System.out.printf("[WebSocket] enter onPong %s payload=%s%n",
       
   559                                   id, binaryData);
       
   560             }
       
   561             CompletionStage<?> cs = null;
       
   562             try {
       
   563                 cs = listener.onPong(WebSocketImpl.this, binaryData);
       
   564             } finally {
       
   565                 System.out.printf("[WebSocket] exit onPong %s returned %s%n",
       
   566                                   id, cs);
       
   567             }
   480         }
   568         }
   481 
   569 
   482         private void processPing() {
   570         private void processPing() {
   483             // Let's make a full copy of this tiny data. What we want here
   571             if (DEBUG) {
   484             // is to rule out a possibility the shared data we send might be
   572                 System.out.printf("[WebSocket] processPing%n");
   485             // corrupted by processing in the listener.
   573             }
   486             ByteBuffer slice = binaryData.slice();
   574             ByteBuffer slice = binaryData.slice();
       
   575             // A full copy of this (small) data is made. This way sending a
       
   576             // replying Pong could be done in parallel with the listener
       
   577             // handling this Ping.
   487             ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
   578             ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
   488                     .put(binaryData)
   579                     .put(binaryData)
   489                     .flip();
   580                     .flip();
   490             // Non-exclusive send;
   581             // Non-exclusive send;
   491             BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
   582             BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
   492                 if (e != null) { // Better error handing. What if already closed?
   583                 if (e != null) { // TODO: better error handing. What if already closed?
   493                     signalError(Utils.getCompletionCause(e));
   584                     signalError(Utils.getCompletionCause(e));
   494                 }
   585                 }
   495             };
   586             };
   496             transport.sendPong(copy, WebSocketImpl.this, reporter);
   587             transport.sendPong(copy, WebSocketImpl.this, reporter);
   497             listener.onPing(WebSocketImpl.this, slice);
   588             long id;
       
   589             if (DEBUG) {
       
   590                 id = receiveCounter.incrementAndGet();
       
   591                 System.out.printf("[WebSocket] enter onPing %s payload=%s%n",
       
   592                                   id, slice);
       
   593             }
       
   594             CompletionStage<?> cs = null;
       
   595             try {
       
   596                 cs = listener.onPing(WebSocketImpl.this, slice);
       
   597             } finally {
       
   598                 if (DEBUG) {
       
   599                     System.out.printf("[WebSocket] exit onPing %s returned %s%n",
       
   600                                       id, cs);
       
   601                 }
       
   602             }
   498         }
   603         }
   499 
   604 
   500         private void processBinary() {
   605         private void processBinary() {
   501             listener.onBinary(WebSocketImpl.this, binaryData, part);
   606             long id;
       
   607             if (DEBUG) {
       
   608                 id = receiveCounter.incrementAndGet();
       
   609                 System.out.printf("[WebSocket] enter onBinary %s payload=%s, part=%s%n",
       
   610                                   id, binaryData, part);
       
   611             }
       
   612             CompletionStage<?> cs = null;
       
   613             try {
       
   614                 cs = listener.onBinary(WebSocketImpl.this, binaryData, part);
       
   615             } finally {
       
   616                 if (DEBUG) {
       
   617                     System.out.printf("[WebSocket] exit onBinary %s returned %s%n",
       
   618                                       id, cs);
       
   619                 }
       
   620             }
   502         }
   621         }
   503 
   622 
   504         private void processText() {
   623         private void processText() {
   505             listener.onText(WebSocketImpl.this, text, part);
   624             long id;
       
   625             if (DEBUG) {
       
   626                 id = receiveCounter.incrementAndGet();
       
   627                 System.out.printf("[WebSocket] enter onText %s payload.length=%s part=%s%n",
       
   628                                   id, text.length(), part);
       
   629             }
       
   630             CompletionStage<?> cs = null;
       
   631             try {
       
   632                 cs = listener.onText(WebSocketImpl.this, text, part);
       
   633             } finally {
       
   634                 if (DEBUG) {
       
   635                     System.out.printf("[WebSocket] exit onText %s returned %s%n",
       
   636                                       id, cs);
       
   637                 }
       
   638             }
   506         }
   639         }
   507 
   640 
   508         private void processOpen() {
   641         private void processOpen() {
   509             listener.onOpen(WebSocketImpl.this);
   642             long id;
       
   643             if (DEBUG) {
       
   644                 id = receiveCounter.incrementAndGet();
       
   645                 System.out.printf("[WebSocket] enter onOpen %s%n", id);
       
   646             }
       
   647             try {
       
   648                 listener.onOpen(WebSocketImpl.this);
       
   649             } finally {
       
   650                 if (DEBUG) {
       
   651                     System.out.printf("[WebSocket] exit onOpen %s%n", id);
       
   652                 }
       
   653             }
   510         }
   654         }
   511     }
   655     }
   512 
   656 
   513     private void signalOpen() {
   657     private void signalOpen() {
   514         if (DEBUG) {
   658         if (DEBUG) {