307 Flow.Subscription subscription; |
307 Flow.Subscription subscription; |
308 boolean handleNow; |
308 boolean handleNow; |
309 synchronized (this) { |
309 synchronized (this) { |
310 previous = pendingDelegate.getAndSet(delegateWrapper); |
310 previous = pendingDelegate.getAndSet(delegateWrapper); |
311 subscription = readSubscription; |
311 subscription = readSubscription; |
312 handleNow = this.errorRef.get() != null || finished; |
312 handleNow = this.errorRef.get() != null || onCompleteReceived; |
313 } |
313 } |
314 if (previous != null) { |
314 if (previous != null) { |
315 previous.dropSubscription(); |
315 previous.dropSubscription(); |
316 } |
316 } |
317 if (subscription == null) { |
317 if (subscription == null) { |
422 // The following twisted logic is just here that we don't invoke |
422 // The following twisted logic is just here that we don't invoke |
423 // onError before onSubscribe. It also prevents race conditions |
423 // onError before onSubscribe. It also prevents race conditions |
424 // if onError is invoked concurrently with setDelegate. |
424 // if onError is invoked concurrently with setDelegate. |
425 synchronized (this) { |
425 synchronized (this) { |
426 failed = this.errorRef.get(); |
426 failed = this.errorRef.get(); |
427 completed = finished; |
427 completed = onCompleteReceived; |
428 subscribed = subscriberImpl; |
428 subscribed = subscriberImpl; |
429 } |
429 } |
|
430 |
430 if (failed != null) { |
431 if (failed != null) { |
|
432 if (debug.on()) |
|
433 debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s", |
|
434 subscriberImpl, failed); |
431 subscriberImpl.onError(failed); |
435 subscriberImpl.onError(failed); |
432 } else if (completed) { |
436 } else if (completed) { |
|
437 if (debug.on()) |
|
438 debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted", |
|
439 subscriberImpl); |
|
440 finished = true; |
433 subscriberImpl.onComplete(); |
441 subscriberImpl.onComplete(); |
434 } |
442 } |
435 } |
443 } |
436 |
444 |
437 @Override |
445 @Override |
503 engine.isInboundDone(), |
510 engine.isInboundDone(), |
504 engine.isOutboundDone()); |
511 engine.isOutboundDone()); |
505 onErrorImpl(new SSLHandshakeException( |
512 onErrorImpl(new SSLHandshakeException( |
506 "Remote host terminated the handshake")); |
513 "Remote host terminated the handshake")); |
507 } else if (subscriberImpl != null) { |
514 } else if (subscriberImpl != null) { |
508 finished = true; |
515 onCompleteReceived = finished = true; |
509 subscriberImpl.onComplete(); |
516 subscriberImpl.onComplete(); |
|
517 } else { |
|
518 onCompleteReceived = true; |
510 } |
519 } |
511 // now if we have any pending subscriber, we should complete |
520 // now if we have any pending subscriber, we should complete |
512 // them immediately as the read scheduler will already be stopped. |
521 // them immediately as the read scheduler will already be stopped. |
513 processPendingSubscriber(); |
522 processPendingSubscriber(); |
514 } |
523 } |
526 private final Demand writeDemand = new Demand(); |
535 private final Demand writeDemand = new Demand(); |
527 |
536 |
528 final class SSLSubscriptionWrapper implements Flow.Subscription { |
537 final class SSLSubscriptionWrapper implements Flow.Subscription { |
529 |
538 |
530 volatile Flow.Subscription delegate; |
539 volatile Flow.Subscription delegate; |
|
540 private volatile boolean cancelled; |
531 |
541 |
532 void setSubscription(Flow.Subscription sub) { |
542 void setSubscription(Flow.Subscription sub) { |
533 long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand? |
543 long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand? |
534 delegate = sub; |
544 delegate = sub; |
535 if (debug.on()) debug.log("setSubscription: demand=%d", demand); |
545 if (debug.on()) |
536 if (demand > 0) |
546 debug.log("setSubscription: demand=%d, cancelled:%s", demand, cancelled); |
|
547 |
|
548 if (cancelled) |
|
549 delegate.cancel(); |
|
550 else if (demand > 0) |
537 sub.request(demand); |
551 sub.request(demand); |
538 } |
552 } |
539 |
553 |
540 @Override |
554 @Override |
541 public void request(long n) { |
555 public void request(long n) { |