equal
deleted
inserted
replaced
417 } |
417 } |
418 |
418 |
419 void signalError(Throwable error) { |
419 void signalError(Throwable error) { |
420 debug.log(() -> "write error: " + error); |
420 debug.log(() -> "write error: " + error); |
421 if (Log.channel()) { |
421 if (Log.channel()) { |
422 Log.logChannel("Failed to write on channel ({0}: {1})", |
422 Log.logChannel("Failed to write to channel ({0}: {1})", |
423 channelDescr(), error); |
423 channelDescr(), error); |
424 } |
424 } |
425 completed = true; |
425 completed = true; |
426 readPublisher.signalError(error); |
426 readPublisher.signalError(error); |
427 } |
427 } |
719 } |
719 } |
720 |
720 |
721 @Override |
721 @Override |
722 public final void cancel() { |
722 public final void cancel() { |
723 pauseReadEvent(); |
723 pauseReadEvent(); |
|
724 if (Log.channel()) { |
|
725 Log.logChannel("Read subscription cancelled for channel {0}", |
|
726 channelDescr()); |
|
727 } |
724 readScheduler.stop(); |
728 readScheduler.stop(); |
725 } |
729 } |
726 |
730 |
727 private void resumeReadEvent() { |
731 private void resumeReadEvent() { |
728 if (debug.on()) debug.log("resuming read event"); |
732 if (debug.on()) debug.log("resuming read event"); |
743 final void signalError(Throwable error) { |
747 final void signalError(Throwable error) { |
744 if (!errorRef.compareAndSet(null, error)) { |
748 if (!errorRef.compareAndSet(null, error)) { |
745 return; |
749 return; |
746 } |
750 } |
747 if (debug.on()) debug.log("got read error: " + error); |
751 if (debug.on()) debug.log("got read error: " + error); |
|
752 if (Log.channel()) { |
|
753 Log.logChannel("Read error signalled on channel {0}: {1}", |
|
754 channelDescr(), error); |
|
755 } |
748 readScheduler.runOrSchedule(); |
756 readScheduler.runOrSchedule(); |
749 } |
757 } |
750 |
758 |
751 final void signalReadable() { |
759 final void signalReadable() { |
752 readScheduler.runOrSchedule(); |
760 readScheduler.runOrSchedule(); |
789 // safe to pause here because we're finished anyway. |
797 // safe to pause here because we're finished anyway. |
790 pauseReadEvent(); |
798 pauseReadEvent(); |
791 if (debug.on()) |
799 if (debug.on()) |
792 debug.log("Sending error " + error |
800 debug.log("Sending error " + error |
793 + " to subscriber " + subscriber); |
801 + " to subscriber " + subscriber); |
|
802 if (Log.channel()) { |
|
803 Log.logChannel("Raising error with subscriber for {0}: {1}", |
|
804 channelDescr(), error); |
|
805 } |
794 current.errorRef.compareAndSet(null, error); |
806 current.errorRef.compareAndSet(null, error); |
795 current.signalCompletion(); |
807 current.signalCompletion(); |
796 readScheduler.stop(); |
808 readScheduler.stop(); |
797 debugState("leaving read() loop with error: "); |
809 debugState("leaving read() loop with error: "); |
798 return; |
810 return; |
806 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
818 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
807 if (bytes == EOF) { |
819 if (bytes == EOF) { |
808 if (!completed) { |
820 if (!completed) { |
809 if (debug.on()) debug.log("got read EOF"); |
821 if (debug.on()) debug.log("got read EOF"); |
810 if (Log.channel()) { |
822 if (Log.channel()) { |
811 Log.logChannel("EOF reached from channel: {0}", |
823 Log.logChannel("EOF read from channel: {0}", |
812 channelDescr()); |
824 channelDescr()); |
813 } |
825 } |
814 completed = true; |
826 completed = true; |
815 // safe to pause here because we're finished |
827 // safe to pause here because we're finished |
816 // anyway. |
828 // anyway. |
870 } |
882 } |
871 } catch (Throwable t) { |
883 } catch (Throwable t) { |
872 if (debug.on()) debug.log("Unexpected exception in read loop", t); |
884 if (debug.on()) debug.log("Unexpected exception in read loop", t); |
873 signalError(t); |
885 signalError(t); |
874 } finally { |
886 } finally { |
|
887 if (readScheduler.isStopped()) { |
|
888 if (debug.on()) debug.log("Read scheduler stopped"); |
|
889 if (Log.channel()) { |
|
890 Log.logChannel("Stopped reading from channel {0}", channelDescr()); |
|
891 } |
|
892 } |
875 handlePending(); |
893 handlePending(); |
876 } |
894 } |
877 } |
895 } |
878 |
896 |
879 boolean handlePending() { |
897 boolean handlePending() { |