404 + "subscribing pending"); |
404 + "subscribing pending"); |
405 processPendingSubscriber(); |
405 processPendingSubscriber(); |
406 } |
406 } |
407 } |
407 } |
408 |
408 |
|
409 private void complete(DelegateWrapper subscriberImpl, Throwable t) { |
|
410 try { |
|
411 if (t == null) subscriberImpl.onComplete(); |
|
412 else subscriberImpl.onError(t); |
|
413 if (debug.on()) { |
|
414 debug.log("subscriber completed %s" |
|
415 + ((t == null) ? "normally" : ("with error: " + t))); |
|
416 } |
|
417 } finally { |
|
418 // Error or EOF while reading: |
|
419 // cancel write side after completing read side |
|
420 writeSubscription.cancel(); |
|
421 } |
|
422 } |
|
423 |
409 private void onNewSubscription(DelegateWrapper subscriberImpl, |
424 private void onNewSubscription(DelegateWrapper subscriberImpl, |
410 Flow.Subscription subscription) { |
425 Flow.Subscription subscription) { |
411 assert subscriberImpl != null; |
426 assert subscriberImpl != null; |
412 assert subscription != null; |
427 assert subscription != null; |
413 |
428 |
430 |
445 |
431 if (failed != null) { |
446 if (failed != null) { |
432 if (debug.on()) |
447 if (debug.on()) |
433 debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s", |
448 debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s", |
434 subscriberImpl, failed); |
449 subscriberImpl, failed); |
435 subscriberImpl.onError(failed); |
450 complete(subscriberImpl, failed); |
436 } else if (completed) { |
451 } else if (completed) { |
437 if (debug.on()) |
452 if (debug.on()) |
438 debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted", |
453 debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted", |
439 subscriberImpl); |
454 subscriberImpl); |
440 finished = true; |
455 finished = true; |
441 subscriberImpl.onComplete(); |
456 complete(subscriberImpl, null); |
442 } |
457 } |
443 } |
458 } |
444 |
459 |
445 @Override |
460 @Override |
446 public void onNext(List<ByteBuffer> item) { |
461 public void onNext(List<ByteBuffer> item) { |
461 DelegateWrapper subscriberImpl; |
476 DelegateWrapper subscriberImpl; |
462 synchronized (this) { |
477 synchronized (this) { |
463 subscriberImpl = subscribed; |
478 subscriberImpl = subscribed; |
464 } |
479 } |
465 if (subscriberImpl != null) { |
480 if (subscriberImpl != null) { |
466 subscriberImpl.onError(failed); |
481 complete(subscriberImpl, failed); |
467 } else { |
482 } else { |
468 if (debug.on()) |
483 if (debug.on()) |
469 debug.log("%s: delegate null, stored %s", this, failed); |
484 debug.log("%s: delegate null, stored %s", this, failed); |
470 } |
485 } |
471 // now if we have any pending subscriber, we should forward |
486 // now if we have any pending subscriber, we should forward |
483 private boolean handshaking() { |
498 private boolean handshaking() { |
484 HandshakeStatus hs = engine.getHandshakeStatus(); |
499 HandshakeStatus hs = engine.getHandshakeStatus(); |
485 return !(hs == NOT_HANDSHAKING || hs == FINISHED); |
500 return !(hs == NOT_HANDSHAKING || hs == FINISHED); |
486 } |
501 } |
487 |
502 |
488 private boolean handshakeFailed() { |
503 private String handshakeFailed() { |
489 // sslDelegate can be null if we reach here |
504 // sslDelegate can be null if we reach here |
490 // during the initial handshake, as that happens |
505 // during the initial handshake, as that happens |
491 // within the SSLFlowDelegate constructor. |
506 // within the SSLFlowDelegate constructor. |
492 // In that case we will want to raise an exception. |
507 // In that case we will want to raise an exception. |
493 return handshaking() |
508 if (handshaking() |
494 && (sslDelegate == null |
509 && (sslDelegate == null |
495 || !sslDelegate.closeNotifyReceived()); |
510 || !sslDelegate.closeNotifyReceived())) { |
|
511 return "Remote host terminated the handshake"; |
|
512 } |
|
513 // The initial handshake may not have been started yet. |
|
514 // In which case - if we are completed before the initial handshake |
|
515 // is started, we consider this a handshake failure as well. |
|
516 if ("SSL_NULL_WITH_NULL_NULL".equals(engine.getSession().getCipherSuite())) |
|
517 return "Remote host closed the channel"; |
|
518 return null; |
496 } |
519 } |
497 |
520 |
498 @Override |
521 @Override |
499 public void onComplete() { |
522 public void onComplete() { |
500 assert !finished && !onCompleteReceived; |
523 assert !finished && !onCompleteReceived; |
501 DelegateWrapper subscriberImpl; |
524 DelegateWrapper subscriberImpl; |
502 synchronized(this) { |
525 synchronized(this) { |
503 subscriberImpl = subscribed; |
526 subscriberImpl = subscribed; |
504 } |
527 } |
505 |
528 |
506 if (handshakeFailed()) { |
529 String handshakeFailed = handshakeFailed(); |
507 if (debug.on()) |
530 if (handshakeFailed != null) { |
508 debug.log("handshake: %s, inbound done: %s outbound done: %s", |
531 if (debug.on()) |
|
532 debug.log("handshake: %s, inbound done: %s, outbound done: %s: %s", |
509 engine.getHandshakeStatus(), |
533 engine.getHandshakeStatus(), |
510 engine.isInboundDone(), |
534 engine.isInboundDone(), |
511 engine.isOutboundDone()); |
535 engine.isOutboundDone(), |
512 onErrorImpl(new SSLHandshakeException( |
536 handshakeFailed); |
513 "Remote host terminated the handshake")); |
537 onErrorImpl(new SSLHandshakeException(handshakeFailed)); |
514 } else if (subscriberImpl != null) { |
538 } else if (subscriberImpl != null) { |
515 onCompleteReceived = finished = true; |
539 onCompleteReceived = finished = true; |
516 subscriberImpl.onComplete(); |
540 complete(subscriberImpl, null); |
517 } else { |
541 } else { |
518 onCompleteReceived = true; |
542 onCompleteReceived = true; |
519 } |
543 } |
520 // now if we have any pending subscriber, we should complete |
544 // now if we have any pending subscriber, we should complete |
521 // them immediately as the read scheduler will already be stopped. |
545 // them immediately as the read scheduler will already be stopped. |