1086 @Override |
1086 @Override |
1087 public void onNext(List<ByteBuffer> item) { |
1087 public void onNext(List<ByteBuffer> item) { |
1088 debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) |
1088 debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item) |
1089 + " bytes in " + item.size() + " buffers"); |
1089 + " bytes in " + item.size() + " buffers"); |
1090 queue.addAll(item); |
1090 queue.addAll(item); |
1091 scheduler.deferOrSchedule(client().theExecutor()); |
1091 scheduler.runOrSchedule(client().theExecutor()); |
1092 } |
1092 } |
1093 |
1093 |
1094 @Override |
1094 @Override |
1095 public void onError(Throwable throwable) { |
1095 public void onError(Throwable throwable) { |
1096 debug.log(Level.DEBUG, () -> "onError: " + throwable); |
1096 debug.log(Level.DEBUG, () -> "onError: " + throwable); |
1097 error = throwable; |
1097 error = throwable; |
1098 completed = true; |
1098 completed = true; |
1099 scheduler.deferOrSchedule(client().theExecutor()); |
1099 scheduler.runOrSchedule(client().theExecutor()); |
1100 } |
1100 } |
1101 |
1101 |
1102 @Override |
1102 @Override |
1103 public void onComplete() { |
1103 public void onComplete() { |
1104 debug.log(Level.DEBUG, "EOF"); |
1104 debug.log(Level.DEBUG, "EOF"); |
1105 error = new EOFException("EOF reached while reading"); |
1105 error = new EOFException("EOF reached while reading"); |
1106 completed = true; |
1106 completed = true; |
1107 scheduler.deferOrSchedule(client().theExecutor()); |
1107 scheduler.runOrSchedule(client().theExecutor()); |
1108 } |
1108 } |
1109 |
1109 |
1110 @Override |
1110 @Override |
1111 public void dropSubscription() { |
1111 public void dropSubscription() { |
1112 debug.log(Level.DEBUG, "dropSubscription"); |
1112 debug.log(Level.DEBUG, "dropSubscription"); |