src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
branch http-client-branch
changeset 56326 63422db47911
parent 56314 f92e7a8a189f
child 56389 0ba90c4f1e3f
equal deleted inserted replaced
56325:195d2970d981 56326:63422db47911
   592         @Override
   592         @Override
   593         public void run() {
   593         public void run() {
   594             debug.log(Level.DEBUG, "enter receive task");
   594             debug.log(Level.DEBUG, "enter receive task");
   595             loop:
   595             loop:
   596             while (!receiveScheduler.isStopped()) {
   596             while (!receiveScheduler.isStopped()) {
       
   597                 ChannelState rs = readState;
   597                 if (data.hasRemaining()) {
   598                 if (data.hasRemaining()) {
   598                     debug.log(Level.DEBUG, "remaining bytes received %s",
   599                     debug.log(Level.DEBUG, "remaining bytes received %s",
   599                               data.remaining());
   600                               data.remaining());
   600                     if (!demand.isFulfilled()) {
   601                     if (!demand.isFulfilled()) {
   601                         try {
   602                         try {
   606                             assert oldPos != newPos : data;
   607                             assert oldPos != newPos : data;
   607                         } catch (Throwable e) {
   608                         } catch (Throwable e) {
   608                             receiveScheduler.stop();
   609                             receiveScheduler.stop();
   609                             messageConsumer.onError(e);
   610                             messageConsumer.onError(e);
   610                         }
   611                         }
       
   612                         if (!data.hasRemaining()) {
       
   613                             rs = readState = UNREGISTERED;
       
   614                         }
   611                         continue;
   615                         continue;
   612                     }
   616                     }
   613                     break loop;
   617                     break loop;
   614                 }
   618                 }
   615                 final ChannelState rs = readState;
       
   616                 debug.log(Level.DEBUG, "receive state: %s", rs);
   619                 debug.log(Level.DEBUG, "receive state: %s", rs);
   617                 switch (rs) {
   620                 switch (rs) {
   618                     case WAITING:
   621                     case WAITING:
   619                         break loop;
   622                         break loop;
   620                     case UNREGISTERED:
   623                     case UNREGISTERED:
   621                         try {
   624                         try {
   622                             readState = WAITING;
   625                             rs = readState = WAITING;
   623                             channel.registerEvent(readEvent);
   626                             channel.registerEvent(readEvent);
   624                         } catch (Throwable e) {
   627                         } catch (Throwable e) {
   625                             receiveScheduler.stop();
   628                             receiveScheduler.stop();
   626                             messageConsumer.onError(e);
   629                             messageConsumer.onError(e);
   627                         }
   630                         }
   639                             messageConsumer.onComplete();
   642                             messageConsumer.onComplete();
   640                             break loop;
   643                             break loop;
   641                         } else if (!data.hasRemaining()) {
   644                         } else if (!data.hasRemaining()) {
   642                             // No data at the moment. Pretty much a "goto",
   645                             // No data at the moment. Pretty much a "goto",
   643                             // reusing the existing code path for registration
   646                             // reusing the existing code path for registration
   644                             readState = UNREGISTERED;
   647                             rs = readState = UNREGISTERED;
   645                         }
   648                         }
   646                         continue loop;
   649                         continue loop;
   647                     default:
   650                     default:
   648                         throw new InternalError(String.valueOf(rs));
   651                         throw new InternalError(String.valueOf(rs));
   649                 }
   652                 }