src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55842 cb8fcde5b5c8
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.nio.ByteBuffer;
    29 import java.nio.ByteBuffer;
    30 import java.nio.channels.SelectionKey;
    30 import java.nio.channels.SelectionKey;
    31 import java.util.concurrent.atomic.AtomicLong;
    31 import java.util.concurrent.atomic.AtomicLong;
       
    32 import jdk.incubator.http.internal.common.SequentialScheduler;
    32 
    33 
    33 /*
    34 /*
    34  * Receives incoming data from the channel on demand and converts it into a
    35  * Receives incoming data from the channel on demand and converts it into a
    35  * stream of WebSocket messages which are then delivered to the supplied message
    36  * stream of WebSocket messages which are then delivered to the supplied message
    36  * consumer in a strict sequential order and non-recursively. In other words,
    37  * consumer in a strict sequential order and non-recursively. In other words,
    55     private final RawChannel channel;
    56     private final RawChannel channel;
    56     private final FrameConsumer frameConsumer;
    57     private final FrameConsumer frameConsumer;
    57     private final Frame.Reader reader = new Frame.Reader();
    58     private final Frame.Reader reader = new Frame.Reader();
    58     private final RawChannel.RawEvent event = createHandler();
    59     private final RawChannel.RawEvent event = createHandler();
    59     private final AtomicLong demand = new AtomicLong();
    60     private final AtomicLong demand = new AtomicLong();
    60     private final CooperativeHandler handler;
    61     private final SequentialScheduler pushScheduler;
    61 
    62 
    62     private ByteBuffer data;
    63     private ByteBuffer data;
    63     private volatile int state;
    64     private volatile int state;
    64 
    65 
    65     private static final int UNREGISTERED = 0;
    66     private static final int UNREGISTERED = 0;
    72         this.frameConsumer = new FrameConsumer(this.messageConsumer);
    73         this.frameConsumer = new FrameConsumer(this.messageConsumer);
    73         this.data = channel.initialByteBuffer();
    74         this.data = channel.initialByteBuffer();
    74         // To ensure the initial non-final `data` will be visible
    75         // To ensure the initial non-final `data` will be visible
    75         // (happens-before) when `handler` invokes `pushContinuously`
    76         // (happens-before) when `handler` invokes `pushContinuously`
    76         // the following assignment is done last:
    77         // the following assignment is done last:
    77         handler = new CooperativeHandler(this::pushContinuously);
    78         pushScheduler = new SequentialScheduler(new PushContinuouslyTask());
    78     }
    79     }
    79 
    80 
    80     private RawChannel.RawEvent createHandler() {
    81     private RawChannel.RawEvent createHandler() {
    81         return new RawChannel.RawEvent() {
    82         return new RawChannel.RawEvent() {
    82 
    83 
    86             }
    87             }
    87 
    88 
    88             @Override
    89             @Override
    89             public void handle() {
    90             public void handle() {
    90                 state = AVAILABLE;
    91                 state = AVAILABLE;
    91                 handler.handle();
    92                 pushScheduler.runOrSchedule();
    92             }
    93             }
    93         };
    94         };
    94     }
    95     }
    95 
    96 
    96     void request(long n) {
    97     void request(long n) {
    97         if (n < 0L) {
    98         if (n < 0L) {
    98             throw new IllegalArgumentException("Negative: " + n);
    99             throw new IllegalArgumentException("Negative: " + n);
    99         }
   100         }
   100         demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
   101         demand.accumulateAndGet(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
   101         handler.handle();
   102         pushScheduler.runOrSchedule();
   102     }
   103     }
   103 
   104 
   104     void acknowledge() {
   105     void acknowledge() {
   105         long x = demand.decrementAndGet();
   106         long x = demand.decrementAndGet();
   106         if (x < 0) {
   107         if (x < 0) {
   111     /*
   112     /*
   112      * Stops the machinery from reading and delivering messages permanently,
   113      * Stops the machinery from reading and delivering messages permanently,
   113      * regardless of the current demand and data availability.
   114      * regardless of the current demand and data availability.
   114      */
   115      */
   115     void close() {
   116     void close() {
   116         handler.stop();
   117         pushScheduler.stop();
   117     }
   118     }
   118 
   119 
   119     private void pushContinuously() {
   120     private class PushContinuouslyTask
   120         while (!handler.isStopped()) {
   121         extends SequentialScheduler.CompleteRestartableTask
   121             if (data.hasRemaining()) {
   122     {
   122                 if (demand.get() > 0) {
   123         @Override
   123                     try {
   124         public void run() {
   124                         int oldPos = data.position();
   125             while (!pushScheduler.isStopped()) {
   125                         reader.readFrame(data, frameConsumer);
   126                 if (data.hasRemaining()) {
   126                         int newPos = data.position();
   127                     if (demand.get() > 0) {
   127                         assert oldPos != newPos : data; // reader always consumes bytes
   128                         try {
   128                     } catch (FailWebSocketException e) {
   129                             int oldPos = data.position();
   129                         handler.stop();
   130                             reader.readFrame(data, frameConsumer);
   130                         messageConsumer.onError(e);
   131                             int newPos = data.position();
       
   132                             assert oldPos != newPos : data; // reader always consumes bytes
       
   133                         } catch (FailWebSocketException e) {
       
   134                             pushScheduler.stop();
       
   135                             messageConsumer.onError(e);
       
   136                         }
       
   137                         continue;
   131                     }
   138                     }
   132                     continue;
   139                     break;
   133                 }
   140                 }
   134                 break;
   141                 switch (state) {
   135             }
   142                     case WAITING:
   136             switch (state) {
       
   137                 case WAITING:
       
   138                     return;
       
   139                 case UNREGISTERED:
       
   140                     try {
       
   141                         state = WAITING;
       
   142                         channel.registerEvent(event);
       
   143                     } catch (IOException e) {
       
   144                         handler.stop();
       
   145                         messageConsumer.onError(e);
       
   146                     }
       
   147                     return;
       
   148                 case AVAILABLE:
       
   149                     try {
       
   150                         data = channel.read();
       
   151                     } catch (IOException e) {
       
   152                         handler.stop();
       
   153                         messageConsumer.onError(e);
       
   154                         return;
   143                         return;
   155                     }
   144                     case UNREGISTERED:
   156                     if (data == null) { // EOF
   145                         try {
   157                         handler.stop();
   146                             state = WAITING;
   158                         messageConsumer.onComplete();
   147                             channel.registerEvent(event);
       
   148                         } catch (IOException e) {
       
   149                             pushScheduler.stop();
       
   150                             messageConsumer.onError(e);
       
   151                         }
   159                         return;
   152                         return;
   160                     } else if (!data.hasRemaining()) { // No data at the moment
   153                     case AVAILABLE:
   161                         // Pretty much a "goto", reusing the existing code path
   154                         try {
   162                         // for registration
   155                             data = channel.read();
   163                         state = UNREGISTERED;
   156                         } catch (IOException e) {
   164                     }
   157                             pushScheduler.stop();
   165                     continue;
   158                             messageConsumer.onError(e);
   166                 default:
   159                             return;
   167                     throw new InternalError(String.valueOf(state));
   160                         }
       
   161                         if (data == null) { // EOF
       
   162                             pushScheduler.stop();
       
   163                             messageConsumer.onComplete();
       
   164                             return;
       
   165                         } else if (!data.hasRemaining()) { // No data at the moment
       
   166                             // Pretty much a "goto", reusing the existing code path
       
   167                             // for registration
       
   168                             state = UNREGISTERED;
       
   169                         }
       
   170                         continue;
       
   171                     default:
       
   172                         throw new InternalError(String.valueOf(state));
       
   173                 }
   168             }
   174             }
   169         }
   175         }
   170     }
   176     }
   171 }
   177 }
   172