1 /* |
1 /* |
2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
285 new AtomicReference<>(); |
289 new AtomicReference<>(); |
286 private volatile DelegateWrapper subscribed; |
290 private volatile DelegateWrapper subscribed; |
287 private volatile boolean onCompleteReceived; |
291 private volatile boolean onCompleteReceived; |
288 private final AtomicReference<Throwable> errorRef |
292 private final AtomicReference<Throwable> errorRef |
289 = new AtomicReference<>(); |
293 = new AtomicReference<>(); |
|
294 |
|
295 @Override |
|
296 public String toString() { |
|
297 DelegateWrapper sub = subscribed; |
|
298 DelegateWrapper pend = pendingDelegate.get(); |
|
299 // Though final sslFD may be null if called from within |
|
300 // SSLFD::connect() as SSLTube is not fully constructed yet. |
|
301 SSLFlowDelegate sslFD = sslDelegate; |
|
302 return "SSLSubscriberWrapper[" + SSLTube.this |
|
303 + ", delegate: " + (sub == null ? pend :sub) |
|
304 + ", getALPN: " + (sslFD == null ? null : sslFD.alpn()) |
|
305 + ", onCompleteReceived: " + onCompleteReceived |
|
306 + ", onError: " + errorRef.get() + "]"; |
|
307 } |
290 |
308 |
291 // setDelegate can be called asynchronously when the SSLTube flow |
309 // setDelegate can be called asynchronously when the SSLTube flow |
292 // is connected. At this time the permanent subscriber (this class) |
310 // is connected. At this time the permanent subscriber (this class) |
293 // may already be subscribed (readSubscription != null) or not. |
311 // may already be subscribed (readSubscription != null) or not. |
294 // 1. If it's already subscribed (readSubscription != null), we |
312 // 1. If it's already subscribed (readSubscription != null), we |
317 if (subscription == null) { |
335 if (subscription == null) { |
318 if (debug.on()) |
336 if (debug.on()) |
319 debug.log("SSLSubscriberWrapper (reader) no subscription yet"); |
337 debug.log("SSLSubscriberWrapper (reader) no subscription yet"); |
320 return; |
338 return; |
321 } |
339 } |
|
340 // sslDelegate field should have been initialized by the |
|
341 // time we reach here, as there can be no subscriber |
|
342 // until SSLTube is fully constructed. |
322 if (handleNow || !sslDelegate.resumeReader()) { |
343 if (handleNow || !sslDelegate.resumeReader()) { |
323 processPendingSubscriber(); |
344 processPendingSubscriber(); |
324 } |
345 } |
325 } |
346 } |
326 |
347 |
427 assert subscription != null; |
448 assert subscription != null; |
428 |
449 |
429 Throwable failed; |
450 Throwable failed; |
430 boolean completed; |
451 boolean completed; |
431 // reset any demand that may have been made by the previous |
452 // reset any demand that may have been made by the previous |
432 // subscriber |
453 // subscriber. sslDelegate field should have been initialized, |
|
454 // since we only reach here when there is a subscriber. |
433 sslDelegate.resetReaderDemand(); |
455 sslDelegate.resetReaderDemand(); |
434 // send the subscription to the subscriber. |
456 // send the subscription to the subscriber. |
435 subscriberImpl.onSubscribe(subscription); |
457 subscriberImpl.onSubscribe(subscription); |
436 |
458 |
437 // The following twisted logic is just here that we don't invoke |
459 // The following twisted logic is just here that we don't invoke |