src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
branch http-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal.websocket;
       
    27 
       
    28 import jdk.incubator.http.WebSocket;
       
    29 import jdk.incubator.http.internal.common.Demand;
       
    30 import jdk.incubator.http.internal.common.Log;
       
    31 import jdk.incubator.http.internal.common.MinimalFuture;
       
    32 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    33 import jdk.incubator.http.internal.common.Utils;
       
    34 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
       
    35 
       
    36 import java.io.IOException;
       
    37 import java.lang.ref.Reference;
       
    38 import java.net.ProtocolException;
       
    39 import java.net.URI;
       
    40 import java.nio.ByteBuffer;
       
    41 import java.util.Objects;
       
    42 import java.util.concurrent.CompletableFuture;
       
    43 import java.util.concurrent.CompletionStage;
       
    44 import java.util.concurrent.TimeoutException;
       
    45 import java.util.concurrent.atomic.AtomicBoolean;
       
    46 import java.util.concurrent.atomic.AtomicReference;
       
    47 import java.util.function.Function;
       
    48 
       
    49 import static java.util.Objects.requireNonNull;
       
    50 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
       
    51 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
       
    52 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
       
    53 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
       
    54 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.BINARY;
       
    55 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.CLOSE;
       
    56 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.ERROR;
       
    57 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.IDLE;
       
    58 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.OPEN;
       
    59 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PING;
       
    60 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PONG;
       
    61 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.TEXT;
       
    62 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.WAITING;
       
    63 
       
    64 /*
       
    65  * A WebSocket client.
       
    66  */
       
    67 public final class WebSocketImpl implements WebSocket {
       
    68 
       
    /*
     * States of the receiving side. The current value is held in `state`
     * and is advanced by ReceiveTask and by the signalling methods.
     */
    enum State {
        /* Initial state: onOpen has not been delivered to the listener yet */
        OPEN,
        /* Nothing is pending; a message may be requested from the transport */
        IDLE,
        /* A single message has been requested from the transport */
        WAITING,
        /* A message of the respective kind is ready to be passed to the listener */
        TEXT, BINARY, PING, PONG,
        /* Terminal states: once set they are never replaced */
        CLOSE, ERROR
    }
       
    80 
       
    81     private volatile boolean inputClosed;
       
    82     private volatile boolean outputClosed;
       
    83 
       
    84     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
       
    85 
       
    86     /* Components of calls to Listener's methods */
       
    87     private MessagePart part;
       
    88     private ByteBuffer binaryData;
       
    89     private CharSequence text;
       
    90     private int statusCode;
       
    91     private String reason;
       
    92     private final AtomicReference<Throwable> error = new AtomicReference<>();
       
    93 
       
    94     private final URI uri;
       
    95     private final String subprotocol;
       
    96     private final Listener listener;
       
    97 
       
    98     private final AtomicBoolean outstandingSend = new AtomicBoolean();
       
    99     private final Transport<WebSocket> transport;
       
   100     private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
       
   101     private final Demand demand = new Demand();
       
   102 
       
   103     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
       
   104         Function<Result, WebSocket> newWebSocket = r -> {
       
   105             WebSocket ws = newInstance(b.getUri(),
       
   106                                        r.subprotocol,
       
   107                                        b.getListener(),
       
   108                                        r.transport);
       
   109             // Make sure we don't release the builder until this lambda
       
   110             // has been executed. The builder has a strong reference to
       
   111             // the HttpClientFacade, and we want to keep that live until
       
   112             // after the raw channel is created and passed to WebSocketImpl.
       
   113             Reference.reachabilityFence(b);
       
   114             return ws;
       
   115         };
       
   116         OpeningHandshake h;
       
   117         try {
       
   118             h = new OpeningHandshake(b);
       
   119         } catch (Throwable e) {
       
   120             return failedFuture(e);
       
   121         }
       
   122         return h.send().thenApply(newWebSocket);
       
   123     }
       
   124 
       
   125     /* Exposed for testing purposes */
       
   126     static WebSocketImpl newInstance(URI uri,
       
   127                                      String subprotocol,
       
   128                                      Listener listener,
       
   129                                      TransportFactory transport) {
       
   130         WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
       
   131         // This initialisation is outside of the constructor for the sake of
       
   132         // safe publication of WebSocketImpl.this
       
   133         ws.signalOpen();
       
   134         return ws;
       
   135     }
       
   136 
       
   137     private WebSocketImpl(URI uri,
       
   138                           String subprotocol,
       
   139                           Listener listener,
       
   140                           TransportFactory transportFactory) {
       
   141         this.uri = requireNonNull(uri);
       
   142         this.subprotocol = requireNonNull(subprotocol);
       
   143         this.listener = requireNonNull(listener);
       
   144         this.transport = transportFactory.createTransport(
       
   145                 () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
       
   146                 new SignallingMessageConsumer());
       
   147     }
       
   148 
       
   149     @Override
       
   150     public CompletableFuture<WebSocket> sendText(CharSequence message,
       
   151                                                  boolean isLast) {
       
   152         Objects.requireNonNull(message);
       
   153         if (!outstandingSend.compareAndSet(false, true)) {
       
   154             return failedFuture(new IllegalStateException("Send pending"));
       
   155         }
       
   156         CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
       
   157         return cf.whenComplete((r, e) -> outstandingSend.set(false));
       
   158     }
       
   159 
       
   160     @Override
       
   161     public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
       
   162                                                    boolean isLast) {
       
   163         Objects.requireNonNull(message);
       
   164         if (!outstandingSend.compareAndSet(false, true)) {
       
   165             return failedFuture(new IllegalStateException("Send pending"));
       
   166         }
       
   167         CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
       
   168         // Optimize?
       
   169         //     if (cf.isDone()) {
       
   170         //         outstandingSend.set(false);
       
   171         //     } else {
       
   172         //         cf.whenComplete((r, e) -> outstandingSend.set(false));
       
   173         //     }
       
   174         return cf.whenComplete((r, e) -> outstandingSend.set(false));
       
   175     }
       
   176 
       
   177     @Override
       
   178     public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
       
   179         return transport.sendPing(message);
       
   180     }
       
   181 
       
   182     @Override
       
   183     public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
       
   184         return transport.sendPong(message);
       
   185     }
       
   186 
       
   187     @Override
       
   188     public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
       
   189         Objects.requireNonNull(reason);
       
   190         if (!isLegalToSendFromClient(statusCode)) {
       
   191             return failedFuture(new IllegalArgumentException("statusCode"));
       
   192         }
       
   193         return sendClose0(statusCode, reason);
       
   194     }
       
   195 
       
   196     /*
       
   197      * Sends a Close message, then shuts down the output since no more
       
   198      * messages are expected to be sent after this.
       
   199      */
       
   200     private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
       
   201         outputClosed = true;
       
   202         return transport.sendClose(statusCode, reason)
       
   203                 .whenComplete((result, error) -> {
       
   204                     try {
       
   205                         transport.closeOutput();
       
   206                     } catch (IOException e) {
       
   207                         Log.logError(e);
       
   208                     }
       
   209                     Throwable cause = Utils.getCompletionCause(error);
       
   210                     if (cause instanceof TimeoutException) {
       
   211                         try {
       
   212                             transport.closeInput();
       
   213                         } catch (IOException e) {
       
   214                             Log.logError(e);
       
   215                         }
       
   216                     }
       
   217                 });
       
   218     }
       
   219 
       
   220     @Override
       
   221     public void request(long n) {
       
   222         if (demand.increase(n)) {
       
   223             receiveScheduler.runOrSchedule();
       
   224         }
       
   225     }
       
   226 
       
   227     @Override
       
   228     public String getSubprotocol() {
       
   229         return subprotocol;
       
   230     }
       
   231 
       
   232     @Override
       
   233     public boolean isOutputClosed() {
       
   234         return outputClosed;
       
   235     }
       
   236 
       
   237     @Override
       
   238     public boolean isInputClosed() {
       
   239         return inputClosed;
       
   240     }
       
   241 
       
   242     @Override
       
   243     public void abort() {
       
   244         inputClosed = true;
       
   245         outputClosed = true;
       
   246         receiveScheduler.stop();
       
   247         close();
       
   248     }
       
   249 
       
   250     @Override
       
   251     public String toString() {
       
   252         return super.toString()
       
   253                 + "[uri=" + uri
       
   254                 + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
       
   255                 + "]";
       
   256     }
       
   257 
       
   258     /*
       
   259      * The assumptions about order is as follows:
       
   260      *
       
   261      *     - state is never changed more than twice inside the `run` method:
       
   262      *       x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
       
   263      *       overwriting parts of messages creating a mess since there's no
       
   264      *       queueing)
       
   265      *     - OPEN is always the first state
       
   266      *     - no messages are requested/delivered before onOpen is called (this
       
   267      *       is implemented by making WebSocket instance accessible first in
       
   268      *       onOpen)
       
   269      *     - after the state has been observed as CLOSE/ERROR, the scheduler
       
   270      *       is stopped
       
   271      */
       
   272     private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
       
   273 
       
   274         // Transport only asked here and nowhere else because we must make sure
       
   275         // onOpen is invoked first and no messages become pending before onOpen
       
   276         // finishes
       
   277 
       
   278         @Override
       
   279         public void run() {
       
   280             while (true) {
       
   281                 State s = state.get();
       
   282                 try {
       
   283                     switch (s) {
       
   284                         case OPEN:
       
   285                             processOpen();
       
   286                             tryChangeState(OPEN, IDLE);
       
   287                             break;
       
   288                         case TEXT:
       
   289                             processText();
       
   290                             tryChangeState(TEXT, IDLE);
       
   291                             break;
       
   292                         case BINARY:
       
   293                             processBinary();
       
   294                             tryChangeState(BINARY, IDLE);
       
   295                             break;
       
   296                         case PING:
       
   297                             processPing();
       
   298                             tryChangeState(PING, IDLE);
       
   299                             break;
       
   300                         case PONG:
       
   301                             processPong();
       
   302                             tryChangeState(PONG, IDLE);
       
   303                             break;
       
   304                         case CLOSE:
       
   305                             processClose();
       
   306                             return;
       
   307                         case ERROR:
       
   308                             processError();
       
   309                             return;
       
   310                         case IDLE:
       
   311                             if (demand.tryDecrement()
       
   312                                     && tryChangeState(IDLE, WAITING)) {
       
   313                                 transport.request(1);
       
   314                             }
       
   315                             return;
       
   316                         case WAITING:
       
   317                             // For debugging spurious signalling: when there was a
       
   318                             // signal, but apparently nothing has changed
       
   319                             return;
       
   320                         default:
       
   321                             throw new InternalError(String.valueOf(s));
       
   322                     }
       
   323                 } catch (Throwable t) {
       
   324                     signalError(t);
       
   325                 }
       
   326             }
       
   327         }
       
   328 
       
   329         private void processError() throws IOException {
       
   330             transport.closeInput();
       
   331             receiveScheduler.stop();
       
   332             Throwable err = error.get();
       
   333             if (err instanceof FailWebSocketException) {
       
   334                 int code1 = ((FailWebSocketException) err).getStatusCode();
       
   335                 err = new ProtocolException().initCause(err);
       
   336                 sendClose0(code1, "")
       
   337                         .whenComplete(
       
   338                                 (r, e) -> {
       
   339                                     if (e != null) {
       
   340                                         Log.logError(e);
       
   341                                     }
       
   342                                 });
       
   343             }
       
   344             listener.onError(WebSocketImpl.this, err);
       
   345         }
       
   346 
       
   347         private void processClose() throws IOException {
       
   348             transport.closeInput();
       
   349             receiveScheduler.stop();
       
   350             CompletionStage<?> readyToClose;
       
   351             readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
       
   352             if (readyToClose == null) {
       
   353                 readyToClose = MinimalFuture.completedFuture(null);
       
   354             }
       
   355             int code;
       
   356             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
       
   357                 code = NORMAL_CLOSURE;
       
   358             } else {
       
   359                 code = statusCode;
       
   360             }
       
   361             readyToClose.whenComplete((r, e) -> {
       
   362                 sendClose0(code, "")
       
   363                         .whenComplete((r1, e1) -> {
       
   364                             if (e1 != null) {
       
   365                                 Log.logError(e1);
       
   366                             }
       
   367                         });
       
   368             });
       
   369         }
       
   370 
       
   371         private void processPong() {
       
   372             listener.onPong(WebSocketImpl.this, binaryData);
       
   373         }
       
   374 
       
   375         private void processPing() {
       
   376             // Let's make a full copy of this tiny data. What we want here
       
   377             // is to rule out a possibility the shared data we send might be
       
   378             // corrupted by processing in the listener.
       
   379             ByteBuffer slice = binaryData.slice();
       
   380             ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
       
   381                     .put(binaryData)
       
   382                     .flip();
       
   383             // Non-exclusive send;
       
   384             CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
       
   385             pongSent.whenComplete(
       
   386                     (r, e) -> {
       
   387                         if (e != null) {
       
   388                             signalError(Utils.getCompletionCause(e));
       
   389                         }
       
   390                     }
       
   391             );
       
   392             listener.onPing(WebSocketImpl.this, slice);
       
   393         }
       
   394 
       
   395         private void processBinary() {
       
   396             listener.onBinary(WebSocketImpl.this, binaryData, part);
       
   397         }
       
   398 
       
   399         private void processText() {
       
   400             listener.onText(WebSocketImpl.this, text, part);
       
   401         }
       
   402 
       
   403         private void processOpen() {
       
   404             listener.onOpen(WebSocketImpl.this);
       
   405         }
       
   406     }
       
   407 
       
   408     private void signalOpen() {
       
   409         receiveScheduler.runOrSchedule();
       
   410     }
       
   411 
       
   412     private void signalError(Throwable error) {
       
   413         inputClosed = true;
       
   414         outputClosed = true;
       
   415         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
       
   416             Log.logError(error);
       
   417         } else {
       
   418             close();
       
   419         }
       
   420     }
       
   421 
       
   422     private void close() {
       
   423         try {
       
   424             try {
       
   425                 transport.closeInput();
       
   426             } finally {
       
   427                 transport.closeOutput();
       
   428             }
       
   429         } catch (Throwable t) {
       
   430             Log.logError(t);
       
   431         }
       
   432     }
       
   433 
       
   434     /*
       
   435      * Signals a Close event (might not correspond to anything happened on the
       
   436      * channel, i.e. might be synthetic).
       
   437      */
       
   438     private void signalClose(int statusCode, String reason) {
       
   439         inputClosed = true;
       
   440         this.statusCode = statusCode;
       
   441         this.reason = reason;
       
   442         if (!trySetState(CLOSE)) {
       
   443             Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
       
   444         } else {
       
   445             try {
       
   446                 transport.closeInput();
       
   447             } catch (Throwable t) {
       
   448                 Log.logError(t);
       
   449             }
       
   450         }
       
   451     }
       
   452 
       
   453     private class SignallingMessageConsumer implements MessageStreamConsumer {
       
   454 
       
   455         @Override
       
   456         public void onText(CharSequence data, MessagePart part) {
       
   457             transport.acknowledgeReception();
       
   458             text = data;
       
   459             WebSocketImpl.this.part = part;
       
   460             tryChangeState(WAITING, TEXT);
       
   461         }
       
   462 
       
   463         @Override
       
   464         public void onBinary(ByteBuffer data, MessagePart part) {
       
   465             transport.acknowledgeReception();
       
   466             binaryData = data;
       
   467             WebSocketImpl.this.part = part;
       
   468             tryChangeState(WAITING, BINARY);
       
   469         }
       
   470 
       
   471         @Override
       
   472         public void onPing(ByteBuffer data) {
       
   473             transport.acknowledgeReception();
       
   474             binaryData = data;
       
   475             tryChangeState(WAITING, PING);
       
   476         }
       
   477 
       
   478         @Override
       
   479         public void onPong(ByteBuffer data) {
       
   480             transport.acknowledgeReception();
       
   481             binaryData = data;
       
   482             tryChangeState(WAITING, PONG);
       
   483         }
       
   484 
       
   485         @Override
       
   486         public void onClose(int statusCode, CharSequence reason) {
       
   487             transport.acknowledgeReception();
       
   488             signalClose(statusCode, reason.toString());
       
   489         }
       
   490 
       
   491         @Override
       
   492         public void onComplete() {
       
   493             transport.acknowledgeReception();
       
   494             signalClose(CLOSED_ABNORMALLY, "");
       
   495         }
       
   496 
       
   497         @Override
       
   498         public void onError(Throwable error) {
       
   499             signalError(error);
       
   500         }
       
   501     }
       
   502 
       
   503     private boolean trySetState(State newState) {
       
   504         while (true) {
       
   505             State currentState = state.get();
       
   506             if (currentState == ERROR || currentState == CLOSE) {
       
   507                 return false;
       
   508             } else if (state.compareAndSet(currentState, newState)) {
       
   509                 receiveScheduler.runOrSchedule();
       
   510                 return true;
       
   511             }
       
   512         }
       
   513     }
       
   514 
       
   515     private boolean tryChangeState(State expectedState, State newState) {
       
   516         State witness = state.compareAndExchange(expectedState, newState);
       
   517         if (witness == expectedState) {
       
   518             receiveScheduler.runOrSchedule();
       
   519             return true;
       
   520         }
       
   521         // This should be the only reason for inability to change the state from
       
   522         // IDLE to WAITING: the state has changed to terminal
       
   523         if (witness != ERROR && witness != CLOSE) {
       
   524             throw new InternalError();
       
   525         }
       
   526         return false;
       
   527     }
       
   528 
       
   529     /* Exposed for testing purposes */
       
   530     protected final Transport<WebSocket> transport() {
       
   531         return transport;
       
   532     }
       
   533 }