src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http.internal.websocket;
    26 package jdk.incubator.http.internal.websocket;
    27 
    27 
    28 import jdk.incubator.http.WebSocket;
       
    29 import jdk.incubator.http.internal.common.Log;
       
    30 import jdk.incubator.http.internal.common.Pair;
       
    31 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
       
    32 import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
       
    33 import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
       
    34 import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
       
    35 import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
       
    36 import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
       
    37 import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
       
    38 
       
    39 import java.io.IOException;
    28 import java.io.IOException;
    40 import java.net.ProtocolException;
    29 import java.net.ProtocolException;
    41 import java.net.URI;
    30 import java.net.URI;
    42 import java.nio.ByteBuffer;
    31 import java.nio.ByteBuffer;
    43 import java.util.Queue;
    32 import java.util.Queue;
    45 import java.util.concurrent.CompletionStage;
    34 import java.util.concurrent.CompletionStage;
    46 import java.util.concurrent.ConcurrentLinkedQueue;
    35 import java.util.concurrent.ConcurrentLinkedQueue;
    47 import java.util.concurrent.atomic.AtomicBoolean;
    36 import java.util.concurrent.atomic.AtomicBoolean;
    48 import java.util.function.Consumer;
    37 import java.util.function.Consumer;
    49 import java.util.function.Function;
    38 import java.util.function.Function;
       
    39 import jdk.incubator.http.WebSocket;
       
    40 import jdk.incubator.http.internal.common.Log;
       
    41 import jdk.incubator.http.internal.common.Pair;
       
    42 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    43 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
       
    44 import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
       
    45 import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
       
    46 import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
       
    47 import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
       
    48 import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
       
    49 import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
       
    50 import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
    50 
    51 
    51 import static java.util.Objects.requireNonNull;
    52 import static java.util.Objects.requireNonNull;
    52 import static java.util.concurrent.CompletableFuture.failedFuture;
    53 import static java.util.concurrent.CompletableFuture.failedFuture;
    53 import static jdk.incubator.http.internal.common.Pair.pair;
    54 import static jdk.incubator.http.internal.common.Pair.pair;
       
    55 import jdk.incubator.http.internal.common.Utils;
    54 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
    56 import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
    55 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
    57 import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
    56 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
    58 import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
    57 
    59 
    58 /*
    60 /*
    70      * invoked. We keep track of this since only one of these methods is invoked
    72      * invoked. We keep track of this since only one of these methods is invoked
    71      * and it is invoked at most once.
    73      * and it is invoked at most once.
    72      */
    74      */
    73     private boolean lastMethodInvoked;
    75     private boolean lastMethodInvoked;
    74     private final AtomicBoolean outstandingSend = new AtomicBoolean();
    76     private final AtomicBoolean outstandingSend = new AtomicBoolean();
    75     private final CooperativeHandler sendHandler =
    77     private final SequentialScheduler sendScheduler;
    76               new CooperativeHandler(this::sendFirst);
       
    77     private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
    78     private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
    78             queue = new ConcurrentLinkedQueue<>();
    79             queue = new ConcurrentLinkedQueue<>();
    79     private final Context context = new OutgoingMessage.Context();
    80     private final Context context = new OutgoingMessage.Context();
    80     private final Transmitter transmitter;
    81     private final Transmitter transmitter;
    81     private final Receiver receiver;
    82     private final Receiver receiver;
   134         this.subprotocol = requireNonNull(subprotocol);
   135         this.subprotocol = requireNonNull(subprotocol);
   135         this.channel = requireNonNull(channel);
   136         this.channel = requireNonNull(channel);
   136         this.listener = requireNonNull(listener);
   137         this.listener = requireNonNull(listener);
   137         this.transmitter = new Transmitter(channel);
   138         this.transmitter = new Transmitter(channel);
   138         this.receiver = new Receiver(messageConsumerOf(listener), channel);
   139         this.receiver = new Receiver(messageConsumerOf(listener), channel);
       
   140         this.sendScheduler = new SequentialScheduler(new SendFirstTask());
   139 
   141 
   140         // Set up the Closing Handshake action
   142         // Set up the Closing Handshake action
   141         CompletableFuture.allOf(closeReceived, closeSent)
   143         CompletableFuture.allOf(closeReceived, closeSent)
   142                 .whenComplete((result, error) -> {
   144                 .whenComplete((result, error) -> {
   143                     try {
   145                     try {
   323         boolean added = queue.add(pair(m, cf));
   325         boolean added = queue.add(pair(m, cf));
   324         if (!added) {
   326         if (!added) {
   325             // The queue is supposed to be unbounded
   327             // The queue is supposed to be unbounded
   326             throw new InternalError();
   328             throw new InternalError();
   327         }
   329         }
   328         sendHandler.handle();
   330         sendScheduler.runOrSchedule();
   329         return cf;
   331         return cf;
   330     }
   332     }
   331 
   333 
   332     /*
   334     /*
   333      * This is the main sending method. It may be run in different threads,
   335      * This is the main sending task. It may be run in different threads,
   334      * but never concurrently.
   336      * but never concurrently.
   335      */
   337      */
   336     private void sendFirst(Runnable whenSent) {
   338     private class SendFirstTask implements SequentialScheduler.RestartableTask {
   337         Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
   339         @Override
   338         if (p == null) {
   340         public void run (DeferredCompleter taskCompleter){
   339             whenSent.run();
   341             Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
   340             return;
   342             if (p == null) {
   341         }
   343                 taskCompleter.complete();
   342         OutgoingMessage message = p.first;
   344                 return;
   343         CompletableFuture<WebSocket> cf = p.second;
   345             }
   344         try {
   346             OutgoingMessage message = p.first;
   345             message.contextualize(context);
   347             CompletableFuture<WebSocket> cf = p.second;
   346             Consumer<Exception> h = e -> {
   348             try {
   347                 if (e == null) {
   349                 message.contextualize(context);
   348                     cf.complete(WebSocketImpl.this);
   350                 Consumer<Exception> h = e -> {
   349                 } else {
   351                     if (e == null) {
   350                     cf.completeExceptionally(e);
   352                         cf.complete(WebSocketImpl.this);
   351                 }
   353                     } else {
   352                 sendHandler.handle();
   354                         cf.completeExceptionally(e);
   353                 whenSent.run();
   355                     }
   354             };
   356                     sendScheduler.runOrSchedule();
   355             transmitter.send(message, h);
   357                     taskCompleter.complete();
   356         } catch (Exception t) {
   358                 };
   357             cf.completeExceptionally(t);
   359                 transmitter.send(message, h);
       
   360             } catch (Exception t) {
       
   361                 cf.completeExceptionally(t);
       
   362             }
   358         }
   363         }
   359     }
   364     }
   360 
   365 
   361     @Override
   366     @Override
   362     public void request(long n) {
   367     public void request(long n) {
   434                 // Non-exclusive send;
   439                 // Non-exclusive send;
   435                 CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
   440                 CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
   436                 pongSent.whenComplete(
   441                 pongSent.whenComplete(
   437                         (r, error) -> {
   442                         (r, error) -> {
   438                             if (error != null) {
   443                             if (error != null) {
   439                                 WebSocketImpl.this.signalError(error);
   444                                 WebSocketImpl.this.signalError(
       
   445                                         Utils.getCompletionCause(error));
   440                             }
   446                             }
   441                         }
   447                         }
   442                 );
   448                 );
   443                 synchronized (WebSocketImpl.this.lock) {
   449                 synchronized (WebSocketImpl.this.lock) {
   444                     try {
   450                     try {
   480                     Exception ex = (Exception) new ProtocolException().initCause(error);
   486                     Exception ex = (Exception) new ProtocolException().initCause(error);
   481                     int code = ((FailWebSocketException) error).getStatusCode();
   487                     int code = ((FailWebSocketException) error).getStatusCode();
   482                     enqueueClose(new Close(code, ""))
   488                     enqueueClose(new Close(code, ""))
   483                             .whenComplete((r, e) -> {
   489                             .whenComplete((r, e) -> {
   484                                 if (e != null) {
   490                                 if (e != null) {
   485                                     ex.addSuppressed(e);
   491                                     ex.addSuppressed(Utils.getCompletionCause(e));
   486                                 }
   492                                 }
   487                                 try {
   493                                 try {
   488                                     channel.close();
   494                                     channel.close();
   489                                 } catch (IOException e1) {
   495                                 } catch (IOException e1) {
   490                                     ex.addSuppressed(e1);
   496                                     ex.addSuppressed(e1);