src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
branch http-client-branch
changeset 55922 77feac3903d9
parent 55907 f6a3a657416e
child 55973 4d9b002587db
equal deleted inserted replaced
55912:dfa9489d1cb1 55922:77feac3903d9
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http.internal.websocket;
    26 package jdk.incubator.http.internal.websocket;
    27 
    27 
    28 import jdk.incubator.http.WebSocket;
    28 import jdk.incubator.http.WebSocket;
       
    29 import jdk.incubator.http.internal.common.Demand;
    29 import jdk.incubator.http.internal.common.Log;
    30 import jdk.incubator.http.internal.common.Log;
    30 import jdk.incubator.http.internal.common.MinimalFuture;
    31 import jdk.incubator.http.internal.common.MinimalFuture;
    31 import jdk.incubator.http.internal.common.Pair;
    32 import jdk.incubator.http.internal.common.Pair;
    32 import jdk.incubator.http.internal.common.SequentialScheduler;
    33 import jdk.incubator.http.internal.common.SequentialScheduler;
    33 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
    34 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
    50 import java.util.concurrent.CompletionStage;
    51 import java.util.concurrent.CompletionStage;
    51 import java.util.concurrent.ConcurrentLinkedQueue;
    52 import java.util.concurrent.ConcurrentLinkedQueue;
    52 import java.util.concurrent.TimeUnit;
    53 import java.util.concurrent.TimeUnit;
    53 import java.util.concurrent.TimeoutException;
    54 import java.util.concurrent.TimeoutException;
    54 import java.util.concurrent.atomic.AtomicBoolean;
    55 import java.util.concurrent.atomic.AtomicBoolean;
    55 import java.util.concurrent.atomic.AtomicInteger;
       
    56 import java.util.concurrent.atomic.AtomicReference;
    56 import java.util.concurrent.atomic.AtomicReference;
    57 import java.util.function.Consumer;
    57 import java.util.function.Consumer;
    58 import java.util.function.Function;
    58 import java.util.function.Function;
    59 
    59 
    60 import static java.util.Objects.requireNonNull;
    60 import static java.util.Objects.requireNonNull;
    61 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
    61 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
    62 import static jdk.incubator.http.internal.common.Pair.pair;
    62 import static jdk.incubator.http.internal.common.Pair.pair;
    63 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
    63 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
    64 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
    64 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
    65 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
    65 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
       
    66 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.BINARY;
       
    67 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.CLOSE;
       
    68 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.ERROR;
       
    69 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.IDLE;
       
    70 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.OPEN;
       
    71 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PING;
       
    72 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.PONG;
       
    73 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.TEXT;
       
    74 import static jdk.incubator.http.internal.websocket.WebSocketImpl.State.WAITING;
    66 
    75 
    67 /*
    76 /*
    68  * A WebSocket client.
    77  * A WebSocket client.
    69  */
    78  */
    70 public final class WebSocketImpl implements WebSocket {
    79 public final class WebSocketImpl implements WebSocket {
    71 
    80 
    72     private static final int IDLE   =  0;
    81     enum State {
               // Side-by-side diff: rows numbered 72-79 are the old revision, which
               // modelled the receive state as distinct int constants (0, 1, 2, 4,
               // ... 64). Rows numbered 81-91 are the new revision, which replaces
               // them with this enum and adds one value the old side did not have:
               // WAITING.
    73     private static final int OPEN   =  1;
    82         OPEN,
    74     private static final int TEXT   =  2;
    83         IDLE,
    75     private static final int BINARY =  4;
    84         WAITING,
    76     private static final int PING   =  8;
    85         TEXT,
    77     private static final int PONG   = 16;
    86         BINARY,
    78     private static final int CLOSE  = 32;
    87         PING,
    79     private static final int ERROR  = 64;
    88         PONG,
       
    89         CLOSE,
       
    90         ERROR;
       
    91     }
    80 
    92 
           // Volatile close flags; in the visible code signalError sets both and
           // signalClose sets only inputClosed.
    81     private volatile boolean inputClosed;
    93     private volatile boolean inputClosed;
    82     private volatile boolean outputClosed;
    94     private volatile boolean outputClosed;
    83 
    95 
           // Old side: AtomicInteger holding one of the int constants.
           // New side: AtomicReference<State>. Both start at OPEN. The new row
           // replaces the old comment row, so the new revision's declaration
           // appears without that comment.
    84     /* Which of the listener's methods to call next? */
    96     private final AtomicReference<State> state = new AtomicReference<>(OPEN);
    85     private final AtomicInteger state = new AtomicInteger(OPEN);
       
    86 
    97 
    87     /* Components of calls to Listener's methods */
    98     /* Components of calls to Listener's methods */
    88     private MessagePart part;
    99     private MessagePart part;
    89     private ByteBuffer binaryData;
   100     private ByteBuffer binaryData;
    90     private CharSequence text;
   101     private CharSequence text;
   102             queue = new ConcurrentLinkedQueue<>();
   113             queue = new ConcurrentLinkedQueue<>();
   103     private final Context context = new OutgoingMessage.Context();
   114     private final Context context = new OutgoingMessage.Context();
   104     private final Transmitter transmitter;
   115     private final Transmitter transmitter;
   105     private final Receiver receiver;
   116     private final Receiver receiver;
   106     private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
   117     private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
       
   118     private final Demand demand = new Demand();
   107 
   119 
   108     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
   120     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
   109         Function<Result, WebSocket> newWebSocket = r -> {
   121         Function<Result, WebSocket> newWebSocket = r -> {
   110             WebSocket ws = newInstance(b.getUri(),
   122             WebSocket ws = newInstance(b.getUri(),
   111                                        r.subprotocol,
   123                                        r.subprotocol,
   140     }
   152     }
   141 
   153 
   142     private WebSocketImpl(URI uri,
   154     private WebSocketImpl(URI uri,
   143                           String subprotocol,
   155                           String subprotocol,
   144                           Listener listener,
   156                           Listener listener,
   145                           TransportSupplier transport)
   157                           TransportSupplier transport) {
   146     {
       
   147         this.uri = requireNonNull(uri);
   158         this.uri = requireNonNull(uri);
   148         this.subprotocol = requireNonNull(subprotocol);
   159         this.subprotocol = requireNonNull(subprotocol);
   149         this.listener = requireNonNull(listener);
   160         this.listener = requireNonNull(listener);
   150         this.transmitter = transport.transmitter();
   161         this.transmitter = transport.transmitter();
   151         this.receiver = transport.receiver(new SignallingMessageConsumer());
   162         this.receiver = transport.receiver(new SignallingMessageConsumer());
   217      * fashion in respect to other messages accepted through this method. No
   228      * fashion in respect to other messages accepted through this method. No
   218      * further messages will be accepted until the returned CompletableFuture
   229      * further messages will be accepted until the returned CompletableFuture
   219      * completes. This method is used to enforce "one outstanding send
   230      * completes. This method is used to enforce "one outstanding send
   220      * operation" policy.
   231      * operation" policy.
   221      */
   232      */
   222     private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m)
   233     private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m) {
   223     {
       
               // The only difference between the two revisions here is brace
               // placement: the new side puts the opening brace on the signature
               // line.
               //
               // Claim the single "outstanding send" slot with a CAS from false to
               // true. If another send already holds it, fail fast with an
               // IllegalStateException wrapped in a failed future.
   224         if (!outstandingSend.compareAndSet(false, true)) {
   234         if (!outstandingSend.compareAndSet(false, true)) {
   225             return failedFuture(new IllegalStateException("Send pending"));
   235             return failedFuture(new IllegalStateException("Send pending"));
   226         }
   236         }
               // Release the slot once the enqueued send completes, whether with a
               // result or with an exception (whenComplete runs in both cases).
   227         return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
   237         return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
   228     }
   238     }
   284         }
   294         }
   285     }
   295     }
   286 
   296 
   287     @Override
   297     @Override
   288     public void request(long n) {
   298     public void request(long n) {
               // Old side (rows 289-291): a TODO comment followed by a direct
               // receiver.request(n).
               // New side (rows 299-301): the demand is recorded in `demand` and the
               // receive task is signalled only when increase(n) returns true. In
               // the visible code, receiver.request(1) is then issued from
               // ReceiveTask.run().
               // NOTE(review): presumably increase(n) returns true when there was
               // no prior unfulfilled demand - confirm against Demand, which is not
               // visible here.
   289         // TODO: delay until state becomes ACTIVE, otherwise messages might be
   299         if (demand.increase(n)) {
   290         // requested and consecutively become pending before onOpen is signalled
   300             receiveScheduler.runOrSchedule();
   291         receiver.request(n);
   301         }
   292     }
   302     }
   293 
   303 
   294     @Override
   304     @Override
   295     public String getSubprotocol() {
   305     public String getSubprotocol() {
   296         return subprotocol;
   306         return subprotocol;
   336      *     - after the state has been observed as CLOSE/ERROR, the scheduler
   346      *     - after the state has been observed as CLOSE/ERROR, the scheduler
   337      *       is stopped
   347      *       is stopped
   338      */
   348      */
   339     private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
   349     private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
   340 
   350 
       
   351         // Receiver only asked here and nowhere else because we must make sure
       
   352         // onOpen is invoked first and no messages become pending before onOpen
       
   353         // finishes
       
   354 
   341         @Override
   355         @Override
   342         public void run() {
   356         public void run() {
                   // Old side (rows 343-377): one pass per invocation. The state is
                   // atomically swapped to IDLE with getAndSet and the previous
                   // value is dispatched once.
                   //
                   // New side (rows 357-404): loops. Each iteration reads the state
                   // without modifying it, runs the matching process* method, then
                   // tries to move that state to IDLE with tryChangeState. `break`
                   // leaves only the switch, so the loop reads the state again.
                   // The loop exits via `return` after CLOSE or ERROR has been
                   // processed, in IDLE (after optionally requesting one message),
                   // or in WAITING.
                   //
                   // NOTE(review): on the new side the catch is inside the loop. If
                   // processClose() or processError() throws, the following `return`
                   // is skipped, signalError(t) runs and the loop iterates again on
                   // the same terminal state. Confirm these methods cannot fail
                   // repeatedly, otherwise this would not terminate.
   343             final int s = state.getAndSet(IDLE);
   357             while (true) {
   344             try {
   358                 State s = state.get();
   345                 switch (s) {
   359                 try {
   346                     case OPEN:
   360                     switch (s) {
   347                         processOpen();
   361                         case OPEN:
   348                         break;
   362                             processOpen();
   349                     case TEXT:
   363                             tryChangeState(OPEN, IDLE);
   350                         processText();
   364                             break;
   351                         break;
   365                         case TEXT:
   352                     case BINARY:
   366                             processText();
   353                         processBinary();
   367                             tryChangeState(TEXT, IDLE);
   354                         break;
   368                             break;
   355                     case PING:
   369                         case BINARY:
   356                         processPing();
   370                             processBinary();
   357                         break;
   371                             tryChangeState(BINARY, IDLE);
   358                     case PONG:
   372                             break;
   359                         processPong();
   373                         case PING:
   360                         break;
   374                             processPing();
   361                     case CLOSE:
   375                             tryChangeState(PING, IDLE);
   362                         processClose();
   376                             break;
   363                         break;
   377                         case PONG:
   364                     case ERROR:
   378                             processPong();
   365                         processError();
   379                             tryChangeState(PONG, IDLE);
   366                         break;
   380                             break;
   367                     case IDLE:
   381                         case CLOSE:
   368                         // For debugging spurious signalling: when there was a
   382                             processClose();
   369                         // signal, but apparently nothing has changed
   383                             return;
   370                         break;
   384                         case ERROR:
   371                     default:
   385                             processError();
   372                         throw new InternalError(String.valueOf(s));
   386                             return;
       
   387                         case IDLE:
                                   // New side only: consume one unit of recorded
                                   // demand and, if the state can be moved from IDLE
                                   // to WAITING, ask the receiver for exactly one
                                   // message. && short-circuits, so the state is left
                                   // untouched when tryDecrement() returns false.
       
   388                             if (demand.tryDecrement()
       
   389                                     && tryChangeState(IDLE, WAITING)) {
       
   390                                 receiver.request(1);
       
   391                             }
       
   392                             return;
       
   393                         case WAITING:
       
   394                             // For debugging spurious signalling: when there was a
       
   395                             // signal, but apparently nothing has changed
       
   396                             return;
       
   397                         default:
       
   398                             throw new InternalError(String.valueOf(s));
       
   399                     }
       
   400                 } catch (Throwable t) {
       
   401                     signalError(t);
   373                 }
   402                 }
   374             } catch (Throwable t) {
       
   375                 signalError(t);
       
   376             }
   403             }
   377         }
   404         }
   378 
   405 
   379         private void processError() throws IOException {
   406         private void processError() throws IOException {
   380             receiver.close();
   407             receiver.close();
   460     }
   487     }
   461 
   488 
   462     private void signalError(Throwable error) {
   489     private void signalError(Throwable error) {
               // Marks both directions closed, then tries to record `error` as the
               // first error and move the state to ERROR.
   463         inputClosed = true;
   490         inputClosed = true;
   464         outputClosed = true;
   491         outputClosed = true;
               // The only change between revisions is the callee name: the old
               // single-argument tryChangeState(ERROR) is trySetState(ERROR) on the
               // new side.
               // If an error is already recorded (the CAS from null fails), or the
               // state cannot be set to ERROR, this error is only logged. Otherwise
               // close() is called.
   465         if (!this.error.compareAndSet(null, error) || !tryChangeState(ERROR)) {
   492         if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
   466             Log.logError(error);
   493             Log.logError(error);
   467         } else {
   494         } else {
   468             close();
   495             close();
   469         }
   496         }
   470     }
   497     }
   487      */
   514      */
   488     private void signalClose(int statusCode, String reason) {
   515     private void signalClose(int statusCode, String reason) {
   489         inputClosed = true;
   516         inputClosed = true;
   490         this.statusCode = statusCode;
   517         this.statusCode = statusCode;
   491         this.reason = reason;
   518         this.reason = reason;
   492         if (!tryChangeState(CLOSE)) {
   519         if (!trySetState(CLOSE)) {
   493             Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
   520             Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
   494         } else {
   521         } else {
   495             try {
   522             try {
   496                 receiver.close();
   523                 receiver.close();
   497             } catch (Throwable t) {
   524             } catch (Throwable t) {
   505         @Override
   532         @Override
   506         public void onText(CharSequence data, MessagePart part) {
   533         public void onText(CharSequence data, MessagePart part) {
                   // Acknowledges the receiver, stores the payload and message part
                   // in the enclosing WebSocketImpl's fields, then publishes the
                   // TEXT state.
   507             receiver.acknowledge();
   534             receiver.acknowledge();
   508             text = data;
   535             text = data;
   509             WebSocketImpl.this.part = part;
   536             WebSocketImpl.this.part = part;
                   // Old side: single-argument tryChangeState(TEXT).
                   // New side: the transition is attempted only from WAITING.
   510             tryChangeState(TEXT);
   537             tryChangeState(WAITING, TEXT);
   511         }
   538         }
   512 
   539 
   513         @Override
   540         @Override
   514         public void onBinary(ByteBuffer data, MessagePart part) {
   541         public void onBinary(ByteBuffer data, MessagePart part) {
                   // Acknowledges the receiver, stores the buffer and message part
                   // in the enclosing WebSocketImpl's fields, then publishes the
                   // BINARY state.
   515             receiver.acknowledge();
   542             receiver.acknowledge();
   516             binaryData = data;
   543             binaryData = data;
   517             WebSocketImpl.this.part = part;
   544             WebSocketImpl.this.part = part;
                   // Old side: single-argument tryChangeState(BINARY).
                   // New side: the transition is attempted only from WAITING.
   518             tryChangeState(BINARY);
   545             tryChangeState(WAITING, BINARY);
   519         }
   546         }
   520 
   547 
   521         @Override
   548         @Override
   522         public void onPing(ByteBuffer data) {
   549         public void onPing(ByteBuffer data) {
                   // Acknowledges the receiver, stores the ping payload in
                   // binaryData, then publishes the PING state (new side: only from
                   // WAITING).
   523             receiver.acknowledge();
   550             receiver.acknowledge();
   524             binaryData = data;
   551             binaryData = data;
   525             tryChangeState(PING);
   552             tryChangeState(WAITING, PING);
   526         }
   553         }
   527 
   554 
   528         @Override
   555         @Override
   529         public void onPong(ByteBuffer data) {
   556         public void onPong(ByteBuffer data) {
                   // Acknowledges the receiver, stores the pong payload in
                   // binaryData, then publishes the PONG state (new side: only from
                   // WAITING).
   530             receiver.acknowledge();
   557             receiver.acknowledge();
   531             binaryData = data;
   558             binaryData = data;
   532             tryChangeState(PONG);
   559             tryChangeState(WAITING, PONG);
   533         }
   560         }
   534 
   561 
   535         @Override
   562         @Override
   536         public void onClose(int statusCode, CharSequence reason) {
   563         public void onClose(int statusCode, CharSequence reason) {
                   // Identical in both revisions: acknowledges the receiver, then
                   // passes the status code and the reason (converted to a String)
                   // to signalClose.
   537             receiver.acknowledge();
   564             receiver.acknowledge();
   538             signalClose(statusCode, reason.toString());
   565             signalClose(statusCode, reason.toString());
   539         }
   566         }
   540 
   567 
   541         @Override
   568         @Override
   542         public void onComplete() {
   569         public void onComplete() {
                   // The new side adds receiver.acknowledge() (row 570) before
                   // signalling. Both revisions then call signalClose with
                   // CLOSED_ABNORMALLY and an empty reason.
       
   570             receiver.acknowledge();
   543             signalClose(CLOSED_ABNORMALLY, "");
   571             signalClose(CLOSED_ABNORMALLY, "");
   544         }
   572         }
   545 
   573 
   546         @Override
   574         @Override
   547         public void onError(Throwable error) {
   575         public void onError(Throwable error) {
                   // Identical in both revisions: passes the error to signalError.
                   // Unlike the other callbacks shown here, it does not call
                   // receiver.acknowledge().
   548             signalError(error);
   576             signalError(error);
   549         }
   577         }
   550     }
   578     }
   551 
   579 
   552     private boolean tryChangeState(int newState) {
   580     private boolean trySetState(State newState) {
               // Renamed on the new side from tryChangeState(int) to
               // trySetState(State); the body differs only in the type of the
               // local variable.
               //
               // Sets the state to newState whatever the current state is, unless
               // that state is already terminal (ERROR or CLOSE). Returns true if
               // the state was set, false if a terminal state was observed.
   553         while (true) {
   581         while (true) {
   554             int currentState = state.get();
   582             State currentState = state.get();
   555             if (currentState == ERROR || currentState == CLOSE) {
   583             if (currentState == ERROR || currentState == CLOSE) {
   556                 return false;
   584                 return false;
                   // A failed CAS means the state changed after it was read, so the
                   // loop re-reads and tries again. On success the receive task is
                   // signalled.
   557             } else if (state.compareAndSet(currentState, newState)) {
   585             } else if (state.compareAndSet(currentState, newState)) {
   558                 receiveScheduler.runOrSchedule();
   586                 receiveScheduler.runOrSchedule();
   559                 return true;
   587                 return true;
   560             }
   588             }
   561         }
   589         }
   562     }
   590     }
       
   591 
       
   592     private boolean tryChangeState(State expectedState, State newState) {
               // New on this side of the diff. Makes one attempt to move the state
               // from expectedState to newState.
               // Returns true and signals the receive task if the exchange
               // succeeded. Returns false if the state was already ERROR or CLOSE.
               // Throws InternalError if any other state was found.
       
   593         State witness = state.compareAndExchange(expectedState, newState);
       
   594         if (witness == expectedState) {
       
   595             receiveScheduler.runOrSchedule();
       
   596             return true;
       
   597         }
       
               // NOTE(review): the comment below names only IDLE -> WAITING, but
               // this check runs for every expected/new pair passed by callers,
               // e.g. WAITING -> TEXT and TEXT -> IDLE.
   598         // This should be the only reason for inability to change the state from
       
   599         // IDLE to WAITING: the state has changed to terminal
       
   600         if (witness != ERROR && witness != CLOSE) {
       
   601             throw new InternalError();
       
   602         }
       
   603         return false;
       
   604     }
   563 }
   605 }