src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 56437 f8b3f053cfbb
parent 56427 7f1916397463
child 56451 9585061fdb04
equal deleted inserted replaced
56436:71b78a44c698 56437:f8b3f053cfbb
    25 
    25 
    26 package jdk.internal.net.http.websocket;
    26 package jdk.internal.net.http.websocket;
    27 
    27 
    28 import jdk.internal.net.http.common.Demand;
    28 import jdk.internal.net.http.common.Demand;
    29 import jdk.internal.net.http.common.Log;
    29 import jdk.internal.net.http.common.Log;
       
    30 import jdk.internal.net.http.common.Logger;
    30 import jdk.internal.net.http.common.MinimalFuture;
    31 import jdk.internal.net.http.common.MinimalFuture;
    31 import jdk.internal.net.http.common.SequentialScheduler;
    32 import jdk.internal.net.http.common.SequentialScheduler;
    32 import jdk.internal.net.http.common.Utils;
    33 import jdk.internal.net.http.common.Utils;
    33 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
    34 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
    34 
    35 
    71 /*
    72 /*
    72  * A WebSocket client.
    73  * A WebSocket client.
    73  */
    74  */
    74 public final class WebSocketImpl implements WebSocket {
    75 public final class WebSocketImpl implements WebSocket {
    75 
    76 
    76     private static final boolean DEBUG = Utils.DEBUG_WS;
    77     private static final Logger debug =
    77     private static final System.Logger debug =
    78             Utils.getWebSocketLogger("[WebSocket]"::toString, Utils.DEBUG_WS);
    78             Utils.getWebSocketLogger("[WebSocket]"::toString, DEBUG);
       
    79     private final AtomicLong sendCounter = new AtomicLong();
    79     private final AtomicLong sendCounter = new AtomicLong();
    80     private final AtomicLong receiveCounter = new AtomicLong();
    80     private final AtomicLong receiveCounter = new AtomicLong();
    81 
    81 
    82     enum State {
    82     enum State {
    83         OPEN,
    83         OPEN,
   171     @Override
   171     @Override
   172     public CompletableFuture<WebSocket> sendText(CharSequence message,
   172     public CompletableFuture<WebSocket> sendText(CharSequence message,
   173                                                  boolean last) {
   173                                                  boolean last) {
   174         Objects.requireNonNull(message);
   174         Objects.requireNonNull(message);
   175         long id = 0;
   175         long id = 0;
   176         if (debug.isLoggable(Level.DEBUG)) {
   176         if (debug.on()) {
   177             id = sendCounter.incrementAndGet();
   177             id = sendCounter.incrementAndGet();
   178             debug.log(Level.DEBUG, "enter send text %s payload length=%s last=%s",
   178             debug.log("enter send text %s payload length=%s last=%s",
   179                       id, message.length(), last);
   179                       id, message.length(), last);
   180         }
   180         }
   181         CompletableFuture<WebSocket> result;
   181         CompletableFuture<WebSocket> result;
   182         if (!setPendingTextOrBinary()) {
   182         if (!setPendingTextOrBinary()) {
   183             result = failedFuture(new IllegalStateException("Send pending"));
   183             result = failedFuture(new IllegalStateException("Send pending"));
   184         } else {
   184         } else {
   185             result = transport.sendText(message, last, this,
   185             result = transport.sendText(message, last, this,
   186                                         (r, e) -> clearPendingTextOrBinary());
   186                                         (r, e) -> clearPendingTextOrBinary());
   187         }
   187         }
   188         if (debug.isLoggable(Level.DEBUG)) {
   188         if (debug.on()) {
   189             debug.log(Level.DEBUG, "exit send text %s returned %s", id, result);
   189             debug.log("exit send text %s returned %s", id, result);
   190         }
   190         }
   191 
   191 
   192         return replaceNull(result);
   192         return replaceNull(result);
   193     }
   193     }
   194 
   194 
   195     @Override
   195     @Override
   196     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   196     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
   197                                                    boolean last) {
   197                                                    boolean last) {
   198         Objects.requireNonNull(message);
   198         Objects.requireNonNull(message);
   199         long id = 0;
   199         long id = 0;
   200         if (debug.isLoggable(Level.DEBUG)) {
   200         if (debug.on()) {
   201             id = sendCounter.incrementAndGet();
   201             id = sendCounter.incrementAndGet();
   202             debug.log(Level.DEBUG, "enter send binary %s payload=%s last=%s",
   202             debug.log("enter send binary %s payload=%s last=%s",
   203                       id, message, last);
   203                       id, message, last);
   204         }
   204         }
   205         CompletableFuture<WebSocket> result;
   205         CompletableFuture<WebSocket> result;
   206         if (!setPendingTextOrBinary()) {
   206         if (!setPendingTextOrBinary()) {
   207             result = failedFuture(new IllegalStateException("Send pending"));
   207             result = failedFuture(new IllegalStateException("Send pending"));
   208         } else {
   208         } else {
   209             result = transport.sendBinary(message, last, this,
   209             result = transport.sendBinary(message, last, this,
   210                                           (r, e) -> clearPendingTextOrBinary());
   210                                           (r, e) -> clearPendingTextOrBinary());
   211         }
   211         }
   212         if (debug.isLoggable(Level.DEBUG)) {
   212         if (debug.on()) {
   213             debug.log(Level.DEBUG, "exit send binary %s returned %s", id, result);
   213             debug.log("exit send binary %s returned %s", id, result);
   214         }
   214         }
   215         return replaceNull(result);
   215         return replaceNull(result);
   216     }
   216     }
   217 
   217 
   218     private void clearPendingTextOrBinary() {
   218     private void clearPendingTextOrBinary() {
   235 
   235 
   236     @Override
   236     @Override
   237     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
   237     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
   238         Objects.requireNonNull(message);
   238         Objects.requireNonNull(message);
   239         long id = 0;
   239         long id = 0;
   240         if (debug.isLoggable(Level.DEBUG)) {
   240         if (debug.on()) {
   241             id = sendCounter.incrementAndGet();
   241             id = sendCounter.incrementAndGet();
   242             debug.log(Level.DEBUG, "enter send ping %s payload=%s", id, message);
   242             debug.log("enter send ping %s payload=%s", id, message);
   243         }
   243         }
   244         CompletableFuture<WebSocket> result;
   244         CompletableFuture<WebSocket> result;
   245         if (!setPendingPingOrPong()) {
   245         if (!setPendingPingOrPong()) {
   246             result = failedFuture(new IllegalStateException("Send pending"));
   246             result = failedFuture(new IllegalStateException("Send pending"));
   247         } else {
   247         } else {
   248             result = transport.sendPing(message, this,
   248             result = transport.sendPing(message, this,
   249                                         (r, e) -> clearPendingPingOrPong());
   249                                         (r, e) -> clearPendingPingOrPong());
   250         }
   250         }
   251         if (debug.isLoggable(Level.DEBUG)) {
   251         if (debug.on()) {
   252             debug.log(Level.DEBUG, "exit send ping %s returned %s", id, result);
   252             debug.log("exit send ping %s returned %s", id, result);
   253         }
   253         }
   254         return replaceNull(result);
   254         return replaceNull(result);
   255     }
   255     }
   256 
   256 
   257     @Override
   257     @Override
   258     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
   258     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
   259         Objects.requireNonNull(message);
   259         Objects.requireNonNull(message);
   260         long id = 0;
   260         long id = 0;
   261         if (debug.isLoggable(Level.DEBUG)) {
   261         if (debug.on()) {
   262             id = sendCounter.incrementAndGet();
   262             id = sendCounter.incrementAndGet();
   263             debug.log(Level.DEBUG, "enter send pong %s payload=%s", id, message);
   263             debug.log("enter send pong %s payload=%s", id, message);
   264         }
   264         }
   265         CompletableFuture<WebSocket> result;
   265         CompletableFuture<WebSocket> result;
   266         if (!setPendingPingOrPong()) {
   266         if (!setPendingPingOrPong()) {
   267             result = failedFuture(new IllegalStateException("Send pending"));
   267             result = failedFuture(new IllegalStateException("Send pending"));
   268         } else {
   268         } else {
   269             result =  transport.sendPong(message, this,
   269             result =  transport.sendPong(message, this,
   270                                          (r, e) -> clearPendingPingOrPong());
   270                                          (r, e) -> clearPendingPingOrPong());
   271         }
   271         }
   272         if (debug.isLoggable(Level.DEBUG)) {
   272         if (debug.on()) {
   273             debug.log(Level.DEBUG, "exit send pong %s returned %s", id, result);
   273             debug.log("exit send pong %s returned %s", id, result);
   274         }
   274         }
   275         return replaceNull(result);
   275         return replaceNull(result);
   276     }
   276     }
   277 
   277 
   278     private boolean setPendingPingOrPong() {
   278     private boolean setPendingPingOrPong() {
   286     @Override
   286     @Override
   287     public CompletableFuture<WebSocket> sendClose(int statusCode,
   287     public CompletableFuture<WebSocket> sendClose(int statusCode,
   288                                                   String reason) {
   288                                                   String reason) {
   289         Objects.requireNonNull(reason);
   289         Objects.requireNonNull(reason);
   290         long id = 0;
   290         long id = 0;
   291         if (debug.isLoggable(Level.DEBUG)) {
   291         if (debug.on()) {
   292             id = sendCounter.incrementAndGet();
   292             id = sendCounter.incrementAndGet();
   293             debug.log(Level.DEBUG,
   293             debug.log("enter send close %s statusCode=%s reason.length=%s",
   294                       "enter send close %s statusCode=%s reason.length=%s",
       
   295                       id, statusCode, reason.length());
   294                       id, statusCode, reason.length());
   296         }
   295         }
   297         CompletableFuture<WebSocket> result;
   296         CompletableFuture<WebSocket> result;
   298         // Close message is the only type of message whose validity is checked
   297         // Close message is the only type of message whose validity is checked
   299         // in the corresponding send method. This is made in order to close the
   298         // in the corresponding send method. This is made in order to close the
   306         } else if (!outputClosed.compareAndSet(false, true)){
   305         } else if (!outputClosed.compareAndSet(false, true)){
   307             result = failedFuture(new IOException("Output closed"));
   306             result = failedFuture(new IOException("Output closed"));
   308         } else {
   307         } else {
   309             result = sendClose0(statusCode, reason);
   308             result = sendClose0(statusCode, reason);
   310         }
   309         }
   311         if (debug.isLoggable(Level.DEBUG)) {
   310         if (debug.on()) {
   312             debug.log(Level.DEBUG, "exit send close %s returned %s", id, result);
   311             debug.log("exit send close %s returned %s", id, result);
   313         }
   312         }
   314         return replaceNull(result);
   313         return replaceNull(result);
   315     }
   314     }
   316 
   315 
   317     private static boolean isLegalReason(String reason) {
   316     private static boolean isLegalReason(String reason) {
   340                                    (r, e) -> processCloseError(e));
   339                                    (r, e) -> processCloseError(e));
   341     }
   340     }
   342 
   341 
   343     private void processCloseError(Throwable e) {
   342     private void processCloseError(Throwable e) {
   344         if (e == null) {
   343         if (e == null) {
   345             debug.log(Level.DEBUG, "send close completed successfully");
   344             debug.log("send close completed successfully");
   346         } else {
   345         } else {
   347             debug.log(Level.DEBUG, "send close completed with error", e);
   346             debug.log("send close completed with error", e);
   348         }
   347         }
   349         outputClosed.set(true);
   348         outputClosed.set(true);
   350         try {
   349         try {
   351             transport.closeOutput();
   350             transport.closeOutput();
   352         } catch (IOException ignored) { }
   351         } catch (IOException ignored) { }
   353     }
   352     }
   354 
   353 
   355     @Override
   354     @Override
   356     public void request(long n) {
   355     public void request(long n) {
   357         if (debug.isLoggable(Level.DEBUG)) {
   356         if (debug.on()) {
   358             debug.log(Level.DEBUG, "request %s", n);
   357             debug.log("request %s", n);
   359         }
   358         }
   360         if (demand.increase(n)) {
   359         if (demand.increase(n)) {
   361             receiveScheduler.runOrSchedule();
   360             receiveScheduler.runOrSchedule();
   362         }
   361         }
   363     }
   362     }
   377         return inputClosed;
   376         return inputClosed;
   378     }
   377     }
   379 
   378 
   380     @Override
   379     @Override
   381     public void abort() {
   380     public void abort() {
   382         if (debug.isLoggable(Level.DEBUG)) {
   381         if (debug.on()) {
   383             debug.log(Level.DEBUG, "abort");
   382             debug.log("abort");
   384         }
   383         }
   385         inputClosed = true;
   384         inputClosed = true;
   386         outputClosed.set(true);
   385         outputClosed.set(true);
   387         receiveScheduler.stop();
   386         receiveScheduler.stop();
   388         close();
   387         close();
   416         // onOpen is invoked first and no messages become pending before onOpen
   415         // onOpen is invoked first and no messages become pending before onOpen
   417         // finishes
   416         // finishes
   418 
   417 
   419         @Override
   418         @Override
   420         public void run() {
   419         public void run() {
   421             if (debug.isLoggable(Level.DEBUG)) {
   420             if (debug.on()) {
   422                 debug.log(Level.DEBUG, "enter receive task");
   421                 debug.log("enter receive task");
   423             }
   422             }
   424             loop:
   423             loop:
   425             while (!receiveScheduler.isStopped()) {
   424             while (!receiveScheduler.isStopped()) {
   426                 State s = state.get();
   425                 State s = state.get();
   427                 if (debug.isLoggable(Level.DEBUG)) {
   426                 if (debug.on()) {
   428                     debug.log(Level.DEBUG, "receive state: %s", s);
   427                     debug.log("receive state: %s", s);
   429                 }
   428                 }
   430                 try {
   429                 try {
   431                     switch (s) {
   430                     switch (s) {
   432                         case OPEN:
   431                         case OPEN:
   433                             processOpen();
   432                             processOpen();
   470                     }
   469                     }
   471                 } catch (Throwable t) {
   470                 } catch (Throwable t) {
   472                     signalError(t);
   471                     signalError(t);
   473                 }
   472                 }
   474             }
   473             }
   475             if (debug.isLoggable(Level.DEBUG)) {
   474             if (debug.on()) {
   476                 debug.log(Level.DEBUG, "exit receive task");
   475                 debug.log("exit receive task");
   477             }
   476             }
   478         }
   477         }
   479 
   478 
   480         private void processError() throws IOException {
   479         private void processError() throws IOException {
   481             if (debug.isLoggable(Level.DEBUG)) {
   480             if (debug.on()) {
   482                 debug.log(Level.DEBUG, "processError");
   481                 debug.log("processError");
   483             }
   482             }
   484             transport.closeInput();
   483             transport.closeInput();
   485             receiveScheduler.stop();
   484             receiveScheduler.stop();
   486             Throwable err = error.get();
   485             Throwable err = error.get();
   487             if (err instanceof FailWebSocketException) {
   486             if (err instanceof FailWebSocketException) {
   488                 int code1 = ((FailWebSocketException) err).getStatusCode();
   487                 int code1 = ((FailWebSocketException) err).getStatusCode();
   489                 err = new ProtocolException().initCause(err);
   488                 err = new ProtocolException().initCause(err);
   490                 if (debug.isLoggable(Level.DEBUG)) {
   489                 if (debug.on()) {
   491                     debug.log(Level.DEBUG, "failing %s with error=%s statusCode=%s",
   490                     debug.log("failing %s with error=%s statusCode=%s",
   492                               WebSocketImpl.this, err, code1);
   491                               WebSocketImpl.this, err, code1);
   493                 }
   492                 }
   494                 sendCloseSilently(code1);
   493                 sendCloseSilently(code1);
   495             }
   494             }
   496             long id = 0;
   495             long id = 0;
   497             if (debug.isLoggable(Level.DEBUG)) {
   496             if (debug.on()) {
   498                 id = receiveCounter.incrementAndGet();
   497                 id = receiveCounter.incrementAndGet();
   499                 debug.log(Level.DEBUG, "enter onError %s error=%s", id, err);
   498                 debug.log("enter onError %s error=%s", id, err);
   500             }
   499             }
   501             try {
   500             try {
   502                 listener.onError(WebSocketImpl.this, err);
   501                 listener.onError(WebSocketImpl.this, err);
   503             } finally {
   502             } finally {
   504                 if (debug.isLoggable(Level.DEBUG)) {
   503                 if (debug.on()) {
   505                     debug.log(Level.DEBUG, "exit onError %s", id);
   504                     debug.log("exit onError %s", id);
   506                 }
   505                 }
   507             }
   506             }
   508         }
   507         }
   509 
   508 
   510         private void processClose() throws IOException {
   509         private void processClose() throws IOException {
   511             debug.log(Level.DEBUG, "processClose");
   510             debug.log("processClose");
   512             transport.closeInput();
   511             transport.closeInput();
   513             receiveScheduler.stop();
   512             receiveScheduler.stop();
   514             CompletionStage<?> cs = null; // when the listener is ready to close
   513             CompletionStage<?> cs = null; // when the listener is ready to close
   515             long id = 0;
   514             long id = 0;
   516             if (debug.isLoggable(Level.DEBUG)) {
   515             if (debug.on()) {
   517                 id = receiveCounter.incrementAndGet();
   516                 id = receiveCounter.incrementAndGet();
   518                 debug.log(Level.DEBUG,
   517                 debug.log("enter onClose %s statusCode=%s reason.length=%s",
   519                           "enter onClose %s statusCode=%s reason.length=%s",
       
   520                           id, statusCode, reason.length());
   518                           id, statusCode, reason.length());
   521             }
   519             }
   522             try {
   520             try {
   523                 cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
   521                 cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
   524             } finally {
   522             } finally {
   525                 debug.log(Level.DEBUG, "exit onClose %s returned %s", id, cs);
   523                 debug.log("exit onClose %s returned %s", id, cs);
   526             }
   524             }
   527             if (cs == null) {
   525             if (cs == null) {
   528                 cs = DONE;
   526                 cs = DONE;
   529             }
   527             }
   530             int code;
   528             int code;
   531             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
   529             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
   532                 code = NORMAL_CLOSURE;
   530                 code = NORMAL_CLOSURE;
   533                 debug.log(Level.DEBUG, "using statusCode %s instead of %s",
   531                 debug.log("using statusCode %s instead of %s",
   534                           statusCode, code);
   532                           statusCode, code);
   535 
   533 
   536             } else {
   534             } else {
   537                 code = statusCode;
   535                 code = statusCode;
   538             }
   536             }
   539             cs.whenComplete((r, e) -> {
   537             cs.whenComplete((r, e) -> {
   540                 if (debug.isLoggable(Level.DEBUG)) {
   538                 if (debug.on()) {
   541                     debug.log(Level.DEBUG,
   539                     debug.log("CompletionStage returned by onClose completed result=%s error=%s",
   542                               "CompletionStage returned by onClose completed result=%s error=%s",
       
   543                               r, e);
   540                               r, e);
   544                 }
   541                 }
   545                 sendCloseSilently(code);
   542                 sendCloseSilently(code);
   546             });
   543             });
   547         }
   544         }
   548 
   545 
   549         private void processPong() {
   546         private void processPong() {
   550             long id = 0;
   547             long id = 0;
   551             if (debug.isLoggable(Level.DEBUG)) {
   548             if (debug.on()) {
   552                 id = receiveCounter.incrementAndGet();
   549                 id = receiveCounter.incrementAndGet();
   553                 debug.log(Level.DEBUG, "enter onPong %s payload=%s",
   550                 debug.log("enter onPong %s payload=%s",
   554                           id, binaryData);
   551                           id, binaryData);
   555             }
   552             }
   556             CompletionStage<?> cs = null;
   553             CompletionStage<?> cs = null;
   557             try {
   554             try {
   558                 cs = listener.onPong(WebSocketImpl.this, binaryData);
   555                 cs = listener.onPong(WebSocketImpl.this, binaryData);
   559             } finally {
   556             } finally {
   560                 if (debug.isLoggable(Level.DEBUG)) {
   557                 if (debug.on()) {
   561                     debug.log(Level.DEBUG, "exit onPong %s returned %s", id, cs);
   558                     debug.log("exit onPong %s returned %s", id, cs);
   562                 }
   559                 }
   563             }
   560             }
   564         }
   561         }
   565 
   562 
   566         private void processPing() {
   563         private void processPing() {
   567             if (debug.isLoggable(Level.DEBUG)) {
   564             if (debug.on()) {
   568                 debug.log(Level.DEBUG, "processPing");
   565                 debug.log("processPing");
   569             }
   566             }
   570             // A full copy of this (small) data is made. This way sending a
   567             // A full copy of this (small) data is made. This way sending a
   571             // replying Pong could be done in parallel with the listener
   568             // replying Pong could be done in parallel with the listener
   572             // handling this Ping.
   569             // handling this Ping.
   573             ByteBuffer slice = binaryData.slice();
   570             ByteBuffer slice = binaryData.slice();
   586                                        WebSocketImpl.this,
   583                                        WebSocketImpl.this,
   587                                        reporter);
   584                                        reporter);
   588                 }
   585                 }
   589             }
   586             }
   590             long id = 0;
   587             long id = 0;
   591             if (debug.isLoggable(Level.DEBUG)) {
   588             if (debug.on()) {
   592                 id = receiveCounter.incrementAndGet();
   589                 id = receiveCounter.incrementAndGet();
   593                 debug.log(Level.DEBUG, "enter onPing %s payload=%s", id, slice);
   590                 debug.log("enter onPing %s payload=%s", id, slice);
   594             }
   591             }
   595             CompletionStage<?> cs = null;
   592             CompletionStage<?> cs = null;
   596             try {
   593             try {
   597                 cs = listener.onPing(WebSocketImpl.this, slice);
   594                 cs = listener.onPing(WebSocketImpl.this, slice);
   598             } finally {
   595             } finally {
   599                 if (debug.isLoggable(Level.DEBUG)) {
   596                 if (debug.on()) {
   600                     debug.log(Level.DEBUG, "exit onPing %s returned %s", id, cs);
   597                     debug.log("exit onPing %s returned %s", id, cs);
   601                 }
   598                 }
   602             }
   599             }
   603         }
   600         }
   604 
   601 
   605         private void processBinary() {
   602         private void processBinary() {
   606             long id = 0;
   603             long id = 0;
   607             if (debug.isLoggable(Level.DEBUG)) {
   604             if (debug.on()) {
   608                 id = receiveCounter.incrementAndGet();
   605                 id = receiveCounter.incrementAndGet();
   609                 debug.log(Level.DEBUG, "enter onBinary %s payload=%s last=%s",
   606                 debug.log("enter onBinary %s payload=%s last=%s",
   610                           id, binaryData, last);
   607                           id, binaryData, last);
   611             }
   608             }
   612             CompletionStage<?> cs = null;
   609             CompletionStage<?> cs = null;
   613             try {
   610             try {
   614                 cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
   611                 cs = listener.onBinary(WebSocketImpl.this, binaryData, last);
   615             } finally {
   612             } finally {
   616                 if (debug.isLoggable(Level.DEBUG)) {
   613                 if (debug.on()) {
   617                     debug.log(Level.DEBUG, "exit onBinary %s returned %s", id, cs);
   614                     debug.log("exit onBinary %s returned %s", id, cs);
   618                 }
   615                 }
   619             }
   616             }
   620         }
   617         }
   621 
   618 
   622         private void processText() {
   619         private void processText() {
   623             long id = 0;
   620             long id = 0;
   624             if (debug.isLoggable(Level.DEBUG)) {
   621             if (debug.on()) {
   625                 id = receiveCounter.incrementAndGet();
   622                 id = receiveCounter.incrementAndGet();
   626                 debug.log(Level.DEBUG,
   623                 debug.log("enter onText %s payload.length=%s last=%s",
   627                           "enter onText %s payload.length=%s last=%s",
       
   628                           id, text.length(), last);
   624                           id, text.length(), last);
   629             }
   625             }
   630             CompletionStage<?> cs = null;
   626             CompletionStage<?> cs = null;
   631             try {
   627             try {
   632                 cs = listener.onText(WebSocketImpl.this, text, last);
   628                 cs = listener.onText(WebSocketImpl.this, text, last);
   633             } finally {
   629             } finally {
   634                 if (debug.isLoggable(Level.DEBUG)) {
   630                 if (debug.on()) {
   635                     debug.log(Level.DEBUG, "exit onText %s returned %s", id, cs);
   631                     debug.log("exit onText %s returned %s", id, cs);
   636                 }
   632                 }
   637             }
   633             }
   638         }
   634         }
   639 
   635 
   640         private void processOpen() {
   636         private void processOpen() {
   641             long id = 0;
   637             long id = 0;
   642             if (debug.isLoggable(Level.DEBUG)) {
   638             if (debug.on()) {
   643                 id = receiveCounter.incrementAndGet();
   639                 id = receiveCounter.incrementAndGet();
   644                 debug.log(Level.DEBUG, "enter onOpen %s", id);
   640                 debug.log("enter onOpen %s", id);
   645             }
   641             }
   646             try {
   642             try {
   647                 listener.onOpen(WebSocketImpl.this);
   643                 listener.onOpen(WebSocketImpl.this);
   648             } finally {
   644             } finally {
   649                 if (debug.isLoggable(Level.DEBUG)) {
   645                 if (debug.on()) {
   650                     debug.log(Level.DEBUG, "exit onOpen %s", id);
   646                     debug.log("exit onOpen %s", id);
   651                 }
   647                 }
   652             }
   648             }
   653         }
   649         }
   654     }
   650     }
   655 
   651 
   656     private void sendCloseSilently(int statusCode) {
   652     private void sendCloseSilently(int statusCode) {
   657         sendClose0(statusCode, "").whenComplete((r, e) -> {
   653         sendClose0(statusCode, "").whenComplete((r, e) -> {
   658             if (e != null) {
   654             if (e != null) {
   659                 if (debug.isLoggable(Level.DEBUG)) {
   655                 if (debug.on()) {
   660                     debug.log(Level.DEBUG, "automatic closure completed with error",
   656                     debug.log("automatic closure completed with error",
   661                               (Object) e);
   657                               (Object) e);
   662                 }
   658                 }
   663             }
   659             }
   664         });
   660         });
   665     }
   661     }
   696             } else if (lastAutomaticPong.compareAndSet(message, copy)) {
   692             } else if (lastAutomaticPong.compareAndSet(message, copy)) {
   697                 swapped = true;
   693                 swapped = true;
   698                 break;
   694                 break;
   699             }
   695             }
   700         }
   696         }
   701         if (debug.isLoggable(Level.DEBUG)) {
   697         if (debug.on()) {
   702             debug.log(Level.DEBUG, "swapped automatic pong from %s to %s",
   698             debug.log("swapped automatic pong from %s to %s",
   703                       message, copy);
   699                       message, copy);
   704         }
   700         }
   705         return swapped;
   701         return swapped;
   706     }
   702     }
   707 
   703 
   708     private void signalOpen() {
   704     private void signalOpen() {
   709         debug.log(Level.DEBUG, "signalOpen");
   705         debug.log("signalOpen");
   710         receiveScheduler.runOrSchedule();
   706         receiveScheduler.runOrSchedule();
   711     }
   707     }
   712 
   708 
   713     private void signalError(Throwable error) {
   709     private void signalError(Throwable error) {
   714         if (debug.isLoggable(Level.DEBUG)) {
   710         if (debug.on()) {
   715             debug.log(Level.DEBUG, "signalError %s", (Object) error);
   711             debug.log("signalError %s", (Object) error);
   716         }
   712         }
   717         inputClosed = true;
   713         inputClosed = true;
   718         outputClosed.set(true);
   714         outputClosed.set(true);
   719         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
   715         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
   720             if (debug.isLoggable(Level.DEBUG)) {
   716             if (debug.on()) {
   721                 debug.log(Level.DEBUG, "signalError", error);
   717                 debug.log("signalError", error);
   722             }
   718             }
   723             Log.logError(error);
   719             Log.logError(error);
   724         } else {
   720         } else {
   725             close();
   721             close();
   726         }
   722         }
   727     }
   723     }
   728 
   724 
   729     private void close() {
   725     private void close() {
   730         if (debug.isLoggable(Level.DEBUG)) {
   726         if (debug.on()) {
   731             debug.log(Level.DEBUG, "close");
   727             debug.log("close");
   732         }
   728         }
   733         Throwable first = null;
   729         Throwable first = null;
   734         try {
   730         try {
   735             transport.closeInput();
   731             transport.closeInput();
   736         } catch (Throwable t1) {
   732         } catch (Throwable t1) {
   750                     e = first;
   746                     e = first;
   751                 } else if (second != null) {
   747                 } else if (second != null) {
   752                     e = second;
   748                     e = second;
   753                 }
   749                 }
   754                 if (e != null) {
   750                 if (e != null) {
   755                     if (debug.isLoggable(Level.DEBUG)) {
   751                     if (debug.on()) {
   756                         debug.log(Level.DEBUG, "exception in close", e);
   752                         debug.log("exception in close", e);
   757                     }
   753                     }
   758                 }
   754                 }
   759             }
   755             }
   760         }
   756         }
   761     }
   757     }
   764         // FIXME: make sure no race reason & close are not intermixed
   760         // FIXME: make sure no race reason & close are not intermixed
   765         inputClosed = true;
   761         inputClosed = true;
   766         this.statusCode = statusCode;
   762         this.statusCode = statusCode;
   767         this.reason = reason;
   763         this.reason = reason;
   768         boolean managed = trySetState(CLOSE);
   764         boolean managed = trySetState(CLOSE);
   769         if (debug.isLoggable(Level.DEBUG)) {
   765         if (debug.on()) {
   770             debug.log(Level.DEBUG,
   766             debug.log("signalClose statusCode=%s reason.length=%s: %s",
   771                       "signalClose statusCode=%s reason.length=%s: %s",
       
   772                       statusCode, reason.length(), managed);
   767                       statusCode, reason.length(), managed);
   773         }
   768         }
   774         if (managed) {
   769         if (managed) {
   775             try {
   770             try {
   776                 transport.closeInput();
   771                 transport.closeInput();
   777             } catch (Throwable t) {
   772             } catch (Throwable t) {
   778                 if (debug.isLoggable(Level.DEBUG)) {
   773                 if (debug.on()) {
   779                     debug.log(Level.DEBUG, "exception closing input", (Object) t);
   774                     debug.log("exception closing input", (Object) t);
   780                 }
   775                 }
   781             }
   776             }
   782         }
   777         }
   783     }
   778     }
   784 
   779 
   843                 receiveScheduler.runOrSchedule();
   838                 receiveScheduler.runOrSchedule();
   844                 success = true;
   839                 success = true;
   845                 break;
   840                 break;
   846             }
   841             }
   847         }
   842         }
   848         if (debug.isLoggable(Level.DEBUG)) {
   843         if (debug.on()) {
   849             debug.log(Level.DEBUG, "set state %s (previous %s) %s",
   844             debug.log("set state %s (previous %s) %s",
   850                       newState, currentState, success);
   845                       newState, currentState, success);
   851         }
   846         }
   852         return success;
   847         return success;
   853     }
   848     }
   854 
   849 
   861         } else if (witness != ERROR && witness != CLOSE) {
   856         } else if (witness != ERROR && witness != CLOSE) {
   862             // This should be the only reason for inability to change the state
   857             // This should be the only reason for inability to change the state
   863             // from IDLE to WAITING: the state has changed to terminal
   858             // from IDLE to WAITING: the state has changed to terminal
   864             throw new InternalError();
   859             throw new InternalError();
   865         }
   860         }
   866         if (debug.isLoggable(Level.DEBUG)) {
   861         if (debug.on()) {
   867             debug.log(Level.DEBUG, "change state from %s to %s %s",
   862             debug.log("change state from %s to %s %s",
   868                       expectedState, newState, success);
   863                       expectedState, newState, success);
   869         }
   864         }
   870         return success;
   865         return success;
   871     }
   866     }
   872 
   867