src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
equal
deleted
inserted
replaced
209 |
209 |
210 public void outgoing(List<ByteBuffer> buffers, boolean complete) { |
210 public void outgoing(List<ByteBuffer> buffers, boolean complete) { |
211 Objects.requireNonNull(buffers); |
211 Objects.requireNonNull(buffers); |
212 if (complete) { |
212 if (complete) { |
213 assert Utils.remaining(buffers) == 0; |
213 assert Utils.remaining(buffers) == 0; |
214 logger.log(Level.DEBUG, "completionAcknowledged"); |
214 boolean closing = closing(); |
215 if (!upstreamCompleted && !closing()) |
215 logger.log(Level.DEBUG, |
|
216 "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s", |
|
217 upstreamCompleted, downstreamCompleted, closing); |
|
218 if (!upstreamCompleted && !closing) |
216 throw new IllegalStateException("upstream not completed"); |
219 throw new IllegalStateException("upstream not completed"); |
217 completionAcknowledged = true; |
220 completionAcknowledged = true; |
218 } else { |
221 } else { |
219 logger.log(Level.DEBUG, () -> "Adding " |
222 logger.log(Level.DEBUG, () -> "Adding " |
220 + Utils.remaining(buffers) |
223 + Utils.remaining(buffers) |
415 if (errorRef.get() != null) { |
418 if (errorRef.get() != null) { |
416 pushScheduler.runOrSchedule(); |
419 pushScheduler.runOrSchedule(); |
417 return; |
420 return; |
418 } |
421 } |
419 if (completionAcknowledged) { |
422 if (completionAcknowledged) { |
|
423 logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()"); |
420 downstreamSubscriber.onComplete(); |
424 downstreamSubscriber.onComplete(); |
421 // Fix me subscriber.onComplete.run(); |
425 // Fix me subscriber.onComplete.run(); |
422 downstreamCompleted = true; |
426 downstreamCompleted = true; |
423 cf.complete(null); |
427 cf.complete(null); |
424 } |
428 } |