308 if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) { |
308 if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) { |
309 Http1AsyncDelegate delegate = this.delegate; |
309 Http1AsyncDelegate delegate = this.delegate; |
310 if (delegate != null) unsubscribe(delegate); |
310 if (delegate != null) unsubscribe(delegate); |
311 Runnable cancel = () -> { |
311 Runnable cancel = () -> { |
312 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending); |
312 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending); |
|
313 // The connection should be closed, as some data may |
|
314 // be left over in the stream. |
|
315 setRetryOnError(false); |
|
316 onReadError(new IOException("subscription cancelled")); |
313 unsubscribe(pending); |
317 unsubscribe(pending); |
314 }; |
318 }; |
315 // The subscription created by a delegate is only loosely |
319 // The subscription created by a delegate is only loosely |
316 // coupled with the upstream subscription. This is partly because |
320 // coupled with the upstream subscription. This is partly because |
317 // the header/body parser work with a flow of ByteBuffer, whereas |
321 // the header/body parser work with a flow of ByteBuffer, whereas |