equal
deleted
inserted
replaced
358 } else { |
358 } else { |
359 resumeWriteEvent(inSelectorThread); |
359 resumeWriteEvent(inSelectorThread); |
360 } |
360 } |
361 } catch (Throwable t) { |
361 } catch (Throwable t) { |
362 signalError(t); |
362 signalError(t); |
363 subscription.cancel(); |
|
364 } |
363 } |
365 } |
364 } |
366 |
365 |
367 // Kick off the initial request:1 that will start the writing side. |
366 // Kick off the initial request:1 that will start the writing side. |
368 // Invoked in the selector manager thread. |
367 // Invoked in the selector manager thread. |
422 Log.logChannel("Failed to write to channel ({0}: {1})", |
421 Log.logChannel("Failed to write to channel ({0}: {1})", |
423 channelDescr(), error); |
422 channelDescr(), error); |
424 } |
423 } |
425 completed = true; |
424 completed = true; |
426 readPublisher.signalError(error); |
425 readPublisher.signalError(error); |
|
426 Flow.Subscription subscription = this.subscription; |
|
427 if (subscription != null) subscription.cancel(); |
427 } |
428 } |
428 |
429 |
429 // A repeatable WriteEvent which is paused after firing and can |
430 // A repeatable WriteEvent which is paused after firing and can |
430 // be resumed if required - see SocketFlowEvent; |
431 // be resumed if required - see SocketFlowEvent; |
431 final class WriteEvent extends SocketFlowEvent { |
432 final class WriteEvent extends SocketFlowEvent { |
466 upstreamSubscription.request(n); |
467 upstreamSubscription.request(n); |
467 } |
468 } |
468 |
469 |
469 @Override |
470 @Override |
470 public void cancel() { |
471 public void cancel() { |
|
472 if (cancelled) return; |
471 if (debug.on()) debug.log("write: cancel"); |
473 if (debug.on()) debug.log("write: cancel"); |
|
474 if (Log.channel()) { |
|
475 Log.logChannel("Cancelling write subscription"); |
|
476 } |
472 dropSubscription(); |
477 dropSubscription(); |
473 upstreamSubscription.cancel(); |
478 upstreamSubscription.cancel(); |
474 } |
479 } |
475 |
480 |
476 void dropSubscription() { |
481 void dropSubscription() { |
501 debug.log("write: no need to request more: %d", d); |
506 debug.log("write: no need to request more: %d", d); |
502 } |
507 } |
503 } catch (Throwable t) { |
508 } catch (Throwable t) { |
504 if (debug.on()) |
509 if (debug.on()) |
505 debug.log("write: error while requesting more: " + t); |
510 debug.log("write: error while requesting more: " + t); |
506 cancelled = true; |
|
507 signalError(t); |
511 signalError(t); |
508 subscription.cancel(); |
|
509 } finally { |
512 } finally { |
510 debugState("leaving requestMore: "); |
513 debugState("leaving requestMore: "); |
511 } |
514 } |
512 } |
515 } |
513 } |
516 } |