src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
equal
deleted
inserted
replaced
592 @Override |
592 @Override |
593 public void run() { |
593 public void run() { |
594 debug.log(Level.DEBUG, "enter receive task"); |
594 debug.log(Level.DEBUG, "enter receive task"); |
595 loop: |
595 loop: |
596 while (!receiveScheduler.isStopped()) { |
596 while (!receiveScheduler.isStopped()) { |
|
597 ChannelState rs = readState; |
597 if (data.hasRemaining()) { |
598 if (data.hasRemaining()) { |
598 debug.log(Level.DEBUG, "remaining bytes received %s", |
599 debug.log(Level.DEBUG, "remaining bytes received %s", |
599 data.remaining()); |
600 data.remaining()); |
600 if (!demand.isFulfilled()) { |
601 if (!demand.isFulfilled()) { |
601 try { |
602 try { |
606 assert oldPos != newPos : data; |
607 assert oldPos != newPos : data; |
607 } catch (Throwable e) { |
608 } catch (Throwable e) { |
608 receiveScheduler.stop(); |
609 receiveScheduler.stop(); |
609 messageConsumer.onError(e); |
610 messageConsumer.onError(e); |
610 } |
611 } |
|
612 if (!data.hasRemaining()) { |
|
613 rs = readState = UNREGISTERED; |
|
614 } |
611 continue; |
615 continue; |
612 } |
616 } |
613 break loop; |
617 break loop; |
614 } |
618 } |
615 final ChannelState rs = readState; |
|
616 debug.log(Level.DEBUG, "receive state: %s", rs); |
619 debug.log(Level.DEBUG, "receive state: %s", rs); |
617 switch (rs) { |
620 switch (rs) { |
618 case WAITING: |
621 case WAITING: |
619 break loop; |
622 break loop; |
620 case UNREGISTERED: |
623 case UNREGISTERED: |
621 try { |
624 try { |
622 readState = WAITING; |
625 rs = readState = WAITING; |
623 channel.registerEvent(readEvent); |
626 channel.registerEvent(readEvent); |
624 } catch (Throwable e) { |
627 } catch (Throwable e) { |
625 receiveScheduler.stop(); |
628 receiveScheduler.stop(); |
626 messageConsumer.onError(e); |
629 messageConsumer.onError(e); |
627 } |
630 } |
639 messageConsumer.onComplete(); |
642 messageConsumer.onComplete(); |
640 break loop; |
643 break loop; |
641 } else if (!data.hasRemaining()) { |
644 } else if (!data.hasRemaining()) { |
642 // No data at the moment. Pretty much a "goto", |
645 // No data at the moment. Pretty much a "goto", |
643 // reusing the existing code path for registration |
646 // reusing the existing code path for registration |
644 readState = UNREGISTERED; |
647 rs = readState = UNREGISTERED; |
645 } |
648 } |
646 continue loop; |
649 continue loop; |
647 default: |
650 default: |
648 throw new InternalError(String.valueOf(rs)); |
651 throw new InternalError(String.valueOf(rs)); |
649 } |
652 } |