198 @Override |
198 @Override |
199 public void onSubscribe(Flow.Subscription subscription) { |
199 public void onSubscribe(Flow.Subscription subscription) { |
200 onSubscribe(delegate::onSubscribe, subscription); |
200 onSubscribe(delegate::onSubscribe, subscription); |
201 } |
201 } |
202 |
202 |
203 @Override |
|
204 public void onConnection(Flow.Subscription subscription) { |
|
205 onSubscribe(delegate::onConnection, subscription); |
|
206 } |
|
207 |
|
208 private void onSubscribe(Consumer<Flow.Subscription> method, |
203 private void onSubscribe(Consumer<Flow.Subscription> method, |
209 Flow.Subscription subscription) { |
204 Flow.Subscription subscription) { |
210 subscribedCalled = true; |
205 subscribedCalled = true; |
211 method.accept(subscription); |
206 method.accept(subscription); |
212 Throwable x; |
207 Throwable x; |
290 // may already be subscribed (readSubscription != null) or not. |
285 // may already be subscribed (readSubscription != null) or not. |
291 // 1. If it's already subscribed (readSubscription != null), we |
286 // 1. If it's already subscribed (readSubscription != null), we |
292 // are going to signal the SSLFlowDelegate reader, and make sure |
287 // are going to signal the SSLFlowDelegate reader, and make sure |
293 // onSubscribed is called within the reader flow |
288 // onSubscribed is called within the reader flow |
294 // 2. If it's not yet subscribed (readSubscription == null), then |
289 // 2. If it's not yet subscribed (readSubscription == null), then |
295 // we're going to wait for onSubscribe/onConnection to be called. |
290 // we're going to wait for onSubscribe to be called. |
296 // |
291 // |
297 void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) { |
292 void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) { |
298 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s", |
293 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s", |
299 delegate); |
294 delegate); |
300 assert delegate != null; |
295 assert delegate != null; |
343 delegateWrapper = pendingDelegate.getAndSet(null); |
338 delegateWrapper = pendingDelegate.getAndSet(null); |
344 if (delegateWrapper == null) return; |
339 if (delegateWrapper == null) return; |
345 if (previous != null) { |
340 if (previous != null) { |
346 previous.dropSubscription(); |
341 previous.dropSubscription(); |
347 } |
342 } |
348 onNewSubscription(delegateWrapper, |
343 onNewSubscription(delegateWrapper, subscription); |
349 delegateWrapper::onSubscribe, |
|
350 subscription); |
|
351 } |
344 } |
352 |
345 |
353 @Override |
346 @Override |
354 public void dropSubscription() { |
347 public void dropSubscription() { |
355 DelegateWrapper subscriberImpl = subscribed; |
348 DelegateWrapper subscriberImpl = subscribed; |
356 if (subscriberImpl != null) { |
349 if (subscriberImpl != null) { |
357 subscriberImpl.dropSubscription(); |
350 subscriberImpl.dropSubscription(); |
358 } |
351 } |
359 } |
|
360 |
|
361 @Override |
|
362 public void onConnection(Flow.Subscription subscription) { |
|
363 debug.log(Level.DEBUG, |
|
364 "SSLSubscriberWrapper (reader) onConnection(%s)", |
|
365 subscription); |
|
366 onSubscribeImpl(subscription); |
|
367 } |
352 } |
368 |
353 |
369 @Override |
354 @Override |
370 public void onSubscribe(Flow.Subscription subscription) { |
355 public void onSubscribe(Flow.Subscription subscription) { |
371 debug.log(Level.DEBUG, |
356 debug.log(Level.DEBUG, |
372 "SSLSubscriberWrapper (reader) onSubscribe(%s)", |
357 "SSLSubscriberWrapper (reader) onSubscribe(%s)", |
373 subscription); |
358 subscription); |
374 onSubscribeImpl(subscription); |
359 onSubscribeImpl(subscription); |
375 } |
360 } |
376 |
361 |
377 // called in the reader flow, from either onSubscribe or onConnection. |
362 // called in the reader flow, from onSubscribe. |
378 private void onSubscribeImpl(Flow.Subscription subscription) { |
363 private void onSubscribeImpl(Flow.Subscription subscription) { |
379 assert subscription != null; |
364 assert subscription != null; |
380 DelegateWrapper subscriberImpl, pending; |
365 DelegateWrapper subscriberImpl, pending; |
381 synchronized (this) { |
366 synchronized (this) { |
382 readSubscription = subscription; |
367 readSubscription = subscription; |
393 |
378 |
394 if (pending == null) { |
379 if (pending == null) { |
395 // There is no pending delegate, but we have a previously |
380 // There is no pending delegate, but we have a previously |
396 // subscribed delegate. This is obviously a re-subscribe. |
381 // subscribed delegate. This is obviously a re-subscribe. |
397 // We are in the downstream reader flow, so we should call |
382 // We are in the downstream reader flow, so we should call |
398 // onConnection directly. |
383 // onSubscribe directly. |
399 debug.log(Level.DEBUG, |
384 debug.log(Level.DEBUG, |
400 "SSLSubscriberWrapper (reader) onSubscribeImpl: %s", |
385 "SSLSubscriberWrapper (reader) onSubscribeImpl: %s", |
401 "resubscribing"); |
386 "resubscribing"); |
402 onNewSubscription(subscriberImpl, |
387 onNewSubscription(subscriberImpl, subscription); |
403 subscriberImpl::onConnection, |
|
404 subscription); |
|
405 } else { |
388 } else { |
406 // We have some pending subscriber: subscribe it now that we have |
389 // We have some pending subscriber: subscribe it now that we have |
407 // a subscription. If we already had a previous delegate then |
390 // a subscription. If we already had a previous delegate then |
408 // it will get a dropSubscription(). |
391 // it will get a dropSubscription(). |
409 debug.log(Level.DEBUG, |
392 debug.log(Level.DEBUG, |
412 processPendingSubscriber(); |
395 processPendingSubscriber(); |
413 } |
396 } |
414 } |
397 } |
415 |
398 |
416 private void onNewSubscription(DelegateWrapper subscriberImpl, |
399 private void onNewSubscription(DelegateWrapper subscriberImpl, |
417 Consumer<Flow.Subscription> method, |
|
418 Flow.Subscription subscription) { |
400 Flow.Subscription subscription) { |
419 assert subscriberImpl != null; |
401 assert subscriberImpl != null; |
420 assert method != null; |
|
421 assert subscription != null; |
402 assert subscription != null; |
422 |
403 |
423 Throwable failed; |
404 Throwable failed; |
424 boolean completed; |
405 boolean completed; |
425 // reset any demand that may have been made by the previous |
406 // reset any demand that may have been made by the previous |
426 // subscriber |
407 // subscriber |
427 sslDelegate.resetReaderDemand(); |
408 sslDelegate.resetReaderDemand(); |
428 // send the subscription to the subscriber. |
409 // send the subscription to the subscriber. |
429 method.accept(subscription); |
410 subscriberImpl.onSubscribe(subscription); |
430 |
411 |
431 // The following twisted logic is just here that we don't invoke |
412 // The following twisted logic is just here that we don't invoke |
432 // onError before onSubscribe. It also prevents race conditions |
413 // onError before onSubscribe. It also prevents race conditions |
433 // if onError is invoked concurrently with setDelegate. |
414 // if onError is invoked concurrently with setDelegate. |
434 synchronized (this) { |
415 synchronized (this) { |
480 } |
461 } |
481 |
462 |
482 private boolean handshaking() { |
463 private boolean handshaking() { |
483 HandshakeStatus hs = engine.getHandshakeStatus(); |
464 HandshakeStatus hs = engine.getHandshakeStatus(); |
484 return !(hs == NOT_HANDSHAKING || hs == FINISHED); |
465 return !(hs == NOT_HANDSHAKING || hs == FINISHED); |
|
466 } |
|
467 |
|
468 private boolean handshakeFailed() { |
|
469 // sslDelegate can be null if we reach here |
|
470 // during the initial handshake, as that happens |
|
471 // within the SSLFlowDelegate constructor. |
|
472 // In that case we will want to raise an exception. |
|
473 return handshaking() |
|
474 && (sslDelegate == null |
|
475 || !sslDelegate.closeNotifyReceived()); |
485 } |
476 } |
486 |
477 |
487 @Override |
478 @Override |
488 public void onComplete() { |
479 public void onComplete() { |
489 assert !finished && !onCompleteReceived; |
480 assert !finished && !onCompleteReceived; |
491 DelegateWrapper subscriberImpl; |
482 DelegateWrapper subscriberImpl; |
492 synchronized(this) { |
483 synchronized(this) { |
493 subscriberImpl = subscribed; |
484 subscriberImpl = subscribed; |
494 } |
485 } |
495 |
486 |
496 if (handshaking()) { |
487 if (handshakeFailed()) { |
|
488 debug.log(Level.DEBUG, |
|
489 "handshake: %s, inbound done: %s outbound done: %s", |
|
490 engine.getHandshakeStatus(), |
|
491 engine.isInboundDone(), |
|
492 engine.isOutboundDone()); |
497 onErrorImpl(new SSLHandshakeException( |
493 onErrorImpl(new SSLHandshakeException( |
498 "Remote host terminated the handshake")); |
494 "Remote host terminated the handshake")); |
499 } else if (subscriberImpl != null) { |
495 } else if (subscriberImpl != null) { |
500 finished = true; |
496 finished = true; |
501 subscriberImpl.onComplete(); |
497 subscriberImpl.onComplete(); |