384 // coupled with the upstream subscription. This is partly because |
384 // coupled with the upstream subscription. This is partly because |
385 // the header/body parsers work with a flow of ByteBuffer, whereas |
385 // the header/body parsers work with a flow of ByteBuffer, whereas |
386 // we have a flow of List<ByteBuffer> upstream. |
386 // we have a flow of List<ByteBuffer> upstream. |
387 Http1AsyncDelegateSubscription subscription = |
387 Http1AsyncDelegateSubscription subscription = |
388 new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError); |
388 new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError); |
389 pending.onSubscribe(subscription); |
389 try { |
390 this.delegate = delegate = pending; |
390 pending.onSubscribe(subscription); |
|
391 } finally { |
|
392 this.delegate = delegate = pending; |
|
393 } |
391 final Object captured = delegate; |
394 final Object captured = delegate; |
392 if (debug.on()) |
395 if (debug.on()) |
393 debug.log("delegate is now " + captured |
396 debug.log("delegate is now " + captured |
394 + ", demand=" + subscription.demand().get() |
397 + ", demand=" + subscription.demand().get() |
395 + ", canRequestMore=" + canRequestMore.get() |
398 + ", canRequestMore=" + canRequestMore.get() |
483 } |
486 } |
484 } |
487 } |
485 error = ex; |
488 error = ex; |
486 } |
489 } |
487 } |
490 } |
488 final Throwable t = (recorded == null ? ex : recorded); |
491 |
489 if (debug.on()) |
492 final Throwable t = (recorded == null ? ex : recorded); |
490 debug.log("recorded " + t + "\n\t delegate: " + delegate |
493 if (debug.on()) |
491 + "\t\t queue.isEmpty: " + queue.isEmpty(), ex); |
494 debug.log("recorded " + t + "\n\t delegate: " + delegate |
|
495 + "\t\t queue.isEmpty: " + queue.isEmpty(), ex); |
492 if (Log.errors()) { |
496 if (Log.errors()) { |
493 Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t); |
497 Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t); |
494 } |
498 } |
495 if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) { |
499 if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) { |
496 // This callback is called from within the selector thread. |
500 // This callback is called from within the selector thread. |