src/java.net.http/share/classes/java/net/http/internal/common/SSLTube.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
       
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal.common;
       
    27 
       
    28 import java.lang.System.Logger.Level;
       
    29 import java.nio.ByteBuffer;
       
    30 import java.util.List;
       
    31 import java.util.Objects;
       
    32 import java.util.concurrent.CompletableFuture;
       
    33 import java.util.concurrent.Executor;
       
    34 import java.util.concurrent.Flow;
       
    35 import java.util.concurrent.atomic.AtomicReference;
       
    36 import java.util.function.Consumer;
       
    37 import javax.net.ssl.SSLEngine;
       
    38 import javax.net.ssl.SSLHandshakeException;
       
    39 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
       
    40 import java.net.http.internal.common.SubscriberWrapper.SchedulingAction;
       
    41 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
       
    42 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
       
    43 
       
    44 /**
       
    45  * An implementation of FlowTube that wraps another FlowTube in an
       
    46  * SSL flow.
       
    47  * <p>
       
    48  * The following diagram shows a typical usage of the SSLTube, where
       
    49  * the SSLTube wraps a SocketTube on the right hand side, and is connected
       
    50  * to an HttpConnection on the left hand side.
       
    51  *
       
    52  * <preformatted>{@code
       
    53  *                  +----------  SSLTube -------------------------+
       
    54  *                  |                                             |
       
    55  *                  |                    +---SSLFlowDelegate---+  |
       
    56  *  HttpConnection  |                    |                     |  |   SocketTube
       
    57  *    read sink  <- SSLSubscriberW.   <- Reader <- upstreamR.() <---- read source
       
    58  *  (a subscriber)  |                    |    \         /      |  |  (a publisher)
       
    59  *                  |                    |     SSLEngine       |  |
       
    60  *  HttpConnection  |                    |    /         \      |  |   SocketTube
       
    61  *  write source -> SSLSubscriptionW. -> upstreamW.() -> Writer ----> write sink
       
    62  *  (a publisher)   |                    |                     |  |  (a subscriber)
       
    63  *                  |                    +---------------------+  |
       
    64  *                  |                                             |
       
    65  *                  +---------------------------------------------+
       
    66  * }</preformatted>
       
    67  */
       
    68 public class SSLTube implements FlowTube {
       
    69 
       
    70     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag.
       
    71     final System.Logger debug =
       
    72             Utils.getDebugLogger(this::dbgString, DEBUG);
       
    73 
       
    74     private final FlowTube tube;
       
    75     private final SSLSubscriberWrapper readSubscriber;
       
    76     private final SSLSubscriptionWrapper writeSubscription;
       
    77     private final SSLFlowDelegate sslDelegate;
       
    78     private final SSLEngine engine;
       
    79     private volatile boolean finished;
       
    80 
       
    81     public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
       
    82         Objects.requireNonNull(engine);
       
    83         Objects.requireNonNull(executor);
       
    84         this.tube = Objects.requireNonNull(tube);
       
    85         writeSubscription = new SSLSubscriptionWrapper();
       
    86         readSubscriber = new SSLSubscriberWrapper();
       
    87         this.engine = engine;
       
    88         sslDelegate = new SSLTubeFlowDelegate(engine,
       
    89                                               executor,
       
    90                                               readSubscriber,
       
    91                                               tube);
       
    92     }
       
    93 
       
    94     final class SSLTubeFlowDelegate extends SSLFlowDelegate {
       
    95         SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
       
    96                             SSLSubscriberWrapper readSubscriber,
       
    97                             FlowTube tube) {
       
    98             super(engine, executor, readSubscriber, tube);
       
    99         }
       
   100         protected SchedulingAction enterReadScheduling() {
       
   101             readSubscriber.processPendingSubscriber();
       
   102             return SchedulingAction.CONTINUE;
       
   103         }
       
   104         void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
       
   105                      Flow.Subscriber<? super List<ByteBuffer>> downWriter) {
       
   106             assert downWriter == tube;
       
   107             assert downReader == readSubscriber;
       
   108 
       
   109             // Connect the read sink first. That's the left-hand side
       
   110             // downstream subscriber from the HttpConnection (or more
       
   111             // accurately, the SSLSubscriberWrapper that will wrap it
       
   112             // when SSLTube::connectFlows is called.
       
   113             reader.subscribe(downReader);
       
   114 
       
   115             // Connect the right hand side tube (the socket tube).
       
   116             //
       
   117             // The SSLFlowDelegate.writer publishes ByteBuffer to
       
   118             // the SocketTube for writing on the socket, and the
       
   119             // SSLFlowDelegate::upstreamReader subscribes to the
       
   120             // SocketTube to receive ByteBuffers read from the socket.
       
   121             //
       
   122             // Basically this method is equivalent to:
       
   123             //     // connect the read source:
       
   124             //     //   subscribe the SSLFlowDelegate upstream reader
       
   125             //     //   to the socket tube publisher.
       
   126             //     tube.subscribe(upstreamReader());
       
   127             //     // connect the write sink:
       
   128             //     //   subscribe the socket tube write subscriber
       
   129             //     //   with the SSLFlowDelegate downstream writer.
       
   130             //     writer.subscribe(tube);
       
   131             tube.connectFlows(FlowTube.asTubePublisher(writer),
       
   132                               FlowTube.asTubeSubscriber(upstreamReader()));
       
   133 
       
   134             // Finally connect the write source. That's the left
       
   135             // hand side publisher which will push ByteBuffer for
       
   136             // writing and encryption to the SSLFlowDelegate.
       
   137             // The writeSubscription is in fact the SSLSubscriptionWrapper
       
   138             // that will wrap the subscription provided by the
       
   139             // HttpConnection publisher when SSLTube::connectFlows
       
   140             // is called.
       
   141             upstreamWriter().onSubscribe(writeSubscription);
       
   142         }
       
   143     }
       
   144 
       
   145     public CompletableFuture<String> getALPN() {
       
   146         return sslDelegate.alpn();
       
   147     }
       
   148 
       
   149     @Override
       
   150     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   151         readSubscriber.dropSubscription();
       
   152         readSubscriber.setDelegate(s);
       
   153         s.onSubscribe(readSubscription);
       
   154     }
       
   155 
       
   156     /**
       
   157      * Tells whether, or not, this FlowTube has finished receiving data.
       
   158      *
       
   159      * @return true when one of this FlowTube Subscriber's OnError or onComplete
       
   160      * methods have been invoked
       
   161      */
       
   162     @Override
       
   163     public boolean isFinished() {
       
   164         return finished;
       
   165     }
       
   166 
       
   167     private volatile Flow.Subscription readSubscription;
       
   168 
       
   169     // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
       
   170     // tracks the subscriber's state. In particular it makes sure that
       
   171     // onComplete/onError are not called before onSubscribed.
       
   172     final static class DelegateWrapper implements FlowTube.TubeSubscriber {
       
   173         private final FlowTube.TubeSubscriber delegate;
       
   174         private final System.Logger debug;
       
   175         volatile boolean subscribedCalled;
       
   176         volatile boolean subscribedDone;
       
   177         volatile boolean completed;
       
   178         volatile Throwable error;
       
   179         DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
       
   180                         System.Logger debug) {
       
   181             this.delegate = FlowTube.asTubeSubscriber(delegate);
       
   182             this.debug = debug;
       
   183         }
       
   184 
       
   185         @Override
       
   186         public void dropSubscription() {
       
   187             if (subscribedCalled && !completed) {
       
   188                 delegate.dropSubscription();
       
   189             }
       
   190         }
       
   191 
       
   192         @Override
       
   193         public void onNext(List<ByteBuffer> item) {
       
   194             assert subscribedCalled;
       
   195             delegate.onNext(item);
       
   196         }
       
   197 
       
   198         @Override
       
   199         public void onSubscribe(Flow.Subscription subscription) {
       
   200             onSubscribe(delegate::onSubscribe, subscription);
       
   201         }
       
   202 
       
   203         private void onSubscribe(Consumer<Flow.Subscription> method,
       
   204                                  Flow.Subscription subscription) {
       
   205             subscribedCalled = true;
       
   206             method.accept(subscription);
       
   207             Throwable x;
       
   208             boolean finished;
       
   209             synchronized (this) {
       
   210                 subscribedDone = true;
       
   211                 x = error;
       
   212                 finished = completed;
       
   213             }
       
   214             if (x != null) {
       
   215                 debug.log(Level.DEBUG,
       
   216                           "Subscriber completed before subscribe: forwarding %s",
       
   217                           (Object)x);
       
   218                 delegate.onError(x);
       
   219             } else if (finished) {
       
   220                 debug.log(Level.DEBUG,
       
   221                           "Subscriber completed before subscribe: calling onComplete()");
       
   222                 delegate.onComplete();
       
   223             }
       
   224         }
       
   225 
       
   226         @Override
       
   227         public void onError(Throwable t) {
       
   228             if (completed) {
       
   229                 debug.log(Level.DEBUG,
       
   230                           "Subscriber already completed: ignoring %s",
       
   231                           (Object)t);
       
   232                 return;
       
   233             }
       
   234             boolean subscribed;
       
   235             synchronized (this) {
       
   236                 if (completed) return;
       
   237                 error = t;
       
   238                 completed = true;
       
   239                 subscribed = subscribedDone;
       
   240             }
       
   241             if (subscribed) {
       
   242                 delegate.onError(t);
       
   243             } else {
       
   244                 debug.log(Level.DEBUG,
       
   245                           "Subscriber not yet subscribed: stored %s",
       
   246                           (Object)t);
       
   247             }
       
   248         }
       
   249 
       
   250         @Override
       
   251         public void onComplete() {
       
   252             if (completed) return;
       
   253             boolean subscribed;
       
   254             synchronized (this) {
       
   255                 if (completed) return;
       
   256                 completed = true;
       
   257                 subscribed = subscribedDone;
       
   258             }
       
   259             if (subscribed) {
       
   260                 debug.log(Level.DEBUG, "DelegateWrapper: completing subscriber");
       
   261                 delegate.onComplete();
       
   262             } else {
       
   263                 debug.log(Level.DEBUG,
       
   264                           "Subscriber not yet subscribed: stored completed=true");
       
   265             }
       
   266         }
       
   267 
       
   268         @Override
       
   269         public String toString() {
       
   270             return "DelegateWrapper:" + delegate.toString();
       
   271         }
       
   272 
       
   273     }
       
   274 
       
   275     // Used to read data from the SSLTube.
       
   276     final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
       
   277         private AtomicReference<DelegateWrapper> pendingDelegate =
       
   278                 new AtomicReference<>();
       
   279         private volatile DelegateWrapper subscribed;
       
   280         private volatile boolean onCompleteReceived;
       
   281         private final AtomicReference<Throwable> errorRef
       
   282                 = new AtomicReference<>();
       
   283 
       
   284         // setDelegate can be called asynchronously when the SSLTube flow
       
   285         // is connected. At this time the permanent subscriber (this class)
       
   286         // may already be subscribed (readSubscription != null) or not.
       
   287         // 1. If it's already subscribed (readSubscription != null), we
       
   288         //    are going to signal the SSLFlowDelegate reader, and make sure
       
   289         //    onSubscribed is called within the reader flow
       
   290         // 2. If it's not yet subscribed (readSubscription == null), then
       
   291         //    we're going to wait for onSubscribe to be called.
       
   292         //
       
   293         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
       
   294             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
       
   295                       delegate);
       
   296             assert delegate != null;
       
   297             DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
       
   298             DelegateWrapper previous;
       
   299             Flow.Subscription subscription;
       
   300             boolean handleNow;
       
   301             synchronized (this) {
       
   302                 previous = pendingDelegate.getAndSet(delegateWrapper);
       
   303                 subscription = readSubscription;
       
   304                 handleNow = this.errorRef.get() != null || finished;
       
   305             }
       
   306             if (previous != null) {
       
   307                 previous.dropSubscription();
       
   308             }
       
   309             if (subscription == null) {
       
   310                 debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
       
   311                 return;
       
   312             }
       
   313             if (handleNow || !sslDelegate.resumeReader()) {
       
   314                 processPendingSubscriber();
       
   315             }
       
   316         }
       
   317 
       
   318         // Can be called outside of the flow if an error has already been
       
   319         // raise. Otherwise, must be called within the SSLFlowDelegate
       
   320         // downstream reader flow.
       
   321         // If there is a subscription, and if there is a pending delegate,
       
   322         // calls dropSubscription() on the previous delegate (if any),
       
   323         // then subscribe the pending delegate.
       
   324         void processPendingSubscriber() {
       
   325             Flow.Subscription subscription;
       
   326             DelegateWrapper delegateWrapper, previous;
       
   327             synchronized (this) {
       
   328                 delegateWrapper = pendingDelegate.get();
       
   329                 if (delegateWrapper == null) return;
       
   330                 subscription = readSubscription;
       
   331                 previous = subscribed;
       
   332             }
       
   333             if (subscription == null) {
       
   334                 debug.log(Level.DEBUG,
       
   335                          "SSLSubscriberWrapper (reader) %s",
       
   336                          "processPendingSubscriber: no subscription yet");
       
   337                 return;
       
   338             }
       
   339             delegateWrapper = pendingDelegate.getAndSet(null);
       
   340             if (delegateWrapper == null) return;
       
   341             if (previous != null) {
       
   342                 previous.dropSubscription();
       
   343             }
       
   344             onNewSubscription(delegateWrapper, subscription);
       
   345         }
       
   346 
       
   347         @Override
       
   348         public void dropSubscription() {
       
   349             DelegateWrapper subscriberImpl = subscribed;
       
   350             if (subscriberImpl != null) {
       
   351                 subscriberImpl.dropSubscription();
       
   352             }
       
   353         }
       
   354 
       
   355         @Override
       
   356         public void onSubscribe(Flow.Subscription subscription) {
       
   357             debug.log(Level.DEBUG,
       
   358                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
       
   359                       subscription);
       
   360             onSubscribeImpl(subscription);
       
   361         }
       
   362 
       
   363         // called in the reader flow, from onSubscribe.
       
   364         private void onSubscribeImpl(Flow.Subscription subscription) {
       
   365             assert subscription != null;
       
   366             DelegateWrapper subscriberImpl, pending;
       
   367             synchronized (this) {
       
   368                 readSubscription = subscription;
       
   369                 subscriberImpl = subscribed;
       
   370                 pending = pendingDelegate.get();
       
   371             }
       
   372 
       
   373             if (subscriberImpl == null && pending == null) {
       
   374                 debug.log(Level.DEBUG,
       
   375                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
       
   376                       "no delegate yet");
       
   377                 return;
       
   378             }
       
   379 
       
   380             if (pending == null) {
       
   381                 // There is no pending delegate, but we have a previously
       
   382                 // subscribed delegate. This is obviously a re-subscribe.
       
   383                 // We are in the downstream reader flow, so we should call
       
   384                 // onSubscribe directly.
       
   385                 debug.log(Level.DEBUG,
       
   386                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
       
   387                       "resubscribing");
       
   388                 onNewSubscription(subscriberImpl, subscription);
       
   389             } else {
       
   390                 // We have some pending subscriber: subscribe it now that we have
       
   391                 // a subscription. If we already had a previous delegate then
       
   392                 // it will get a dropSubscription().
       
   393                 debug.log(Level.DEBUG,
       
   394                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
       
   395                       "subscribing pending");
       
   396                 processPendingSubscriber();
       
   397             }
       
   398         }
       
   399 
       
   400         private void onNewSubscription(DelegateWrapper subscriberImpl,
       
   401                                        Flow.Subscription subscription) {
       
   402             assert subscriberImpl != null;
       
   403             assert subscription != null;
       
   404 
       
   405             Throwable failed;
       
   406             boolean completed;
       
   407             // reset any demand that may have been made by the previous
       
   408             // subscriber
       
   409             sslDelegate.resetReaderDemand();
       
   410             // send the subscription to the subscriber.
       
   411             subscriberImpl.onSubscribe(subscription);
       
   412 
       
   413             // The following twisted logic is just here that we don't invoke
       
   414             // onError before onSubscribe. It also prevents race conditions
       
   415             // if onError is invoked concurrently with setDelegate.
       
   416             synchronized (this) {
       
   417                 failed = this.errorRef.get();
       
   418                 completed = finished;
       
   419                 subscribed = subscriberImpl;
       
   420             }
       
   421             if (failed != null) {
       
   422                 subscriberImpl.onError(failed);
       
   423             } else if (completed) {
       
   424                 subscriberImpl.onComplete();
       
   425             }
       
   426         }
       
   427 
       
   428         @Override
       
   429         public void onNext(List<ByteBuffer> item) {
       
   430             subscribed.onNext(item);
       
   431         }
       
   432 
       
   433         public void onErrorImpl(Throwable throwable) {
       
   434             // The following twisted logic is just here that we don't invoke
       
   435             // onError before onSubscribe. It also prevents race conditions
       
   436             // if onError is invoked concurrently with setDelegate.
       
   437             // See setDelegate.
       
   438 
       
   439             errorRef.compareAndSet(null, throwable);
       
   440             Throwable failed = errorRef.get();
       
   441             finished = true;
       
   442             debug.log(Level.DEBUG, "%s: onErrorImpl: %s", this, throwable);
       
   443             DelegateWrapper subscriberImpl;
       
   444             synchronized (this) {
       
   445                 subscriberImpl = subscribed;
       
   446             }
       
   447             if (subscriberImpl != null) {
       
   448                 subscriberImpl.onError(failed);
       
   449             } else {
       
   450                 debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
       
   451             }
       
   452             // now if we have any pending subscriber, we should forward
       
   453             // the error to them immediately as the read scheduler will
       
   454             // already be stopped.
       
   455             processPendingSubscriber();
       
   456         }
       
   457 
       
   458         @Override
       
   459         public void onError(Throwable throwable) {
       
   460             assert !finished && !onCompleteReceived;
       
   461             onErrorImpl(throwable);
       
   462         }
       
   463 
       
   464         private boolean handshaking() {
       
   465             HandshakeStatus hs = engine.getHandshakeStatus();
       
   466             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
       
   467         }
       
   468 
       
   469         private boolean handshakeFailed() {
       
   470             // sslDelegate can be null if we reach here
       
   471             // during the initial handshake, as that happens
       
   472             // within the SSLFlowDelegate constructor.
       
   473             // In that case we will want to raise an exception.
       
   474             return handshaking()
       
   475                     && (sslDelegate == null
       
   476                     || !sslDelegate.closeNotifyReceived());
       
   477         }
       
   478 
       
   479         @Override
       
   480         public void onComplete() {
       
   481             assert !finished && !onCompleteReceived;
       
   482             onCompleteReceived = true;
       
   483             DelegateWrapper subscriberImpl;
       
   484             synchronized(this) {
       
   485                 subscriberImpl = subscribed;
       
   486             }
       
   487 
       
   488             if (handshakeFailed()) {
       
   489                 debug.log(Level.DEBUG,
       
   490                         "handshake: %s, inbound done: %s outbound done: %s",
       
   491                         engine.getHandshakeStatus(),
       
   492                         engine.isInboundDone(),
       
   493                         engine.isOutboundDone());
       
   494                 onErrorImpl(new SSLHandshakeException(
       
   495                         "Remote host terminated the handshake"));
       
   496             } else if (subscriberImpl != null) {
       
   497                 finished = true;
       
   498                 subscriberImpl.onComplete();
       
   499             }
       
   500             // now if we have any pending subscriber, we should complete
       
   501             // them immediately as the read scheduler will already be stopped.
       
   502             processPendingSubscriber();
       
   503         }
       
   504     }
       
   505 
       
   506     @Override
       
   507     public void connectFlows(TubePublisher writePub,
       
   508                              TubeSubscriber readSub) {
       
   509         debug.log(Level.DEBUG, "connecting flows");
       
   510         readSubscriber.setDelegate(readSub);
       
   511         writePub.subscribe(this);
       
   512     }
       
   513 
       
   514     /** Outstanding write demand from the SSL Flow Delegate. */
       
   515     private final Demand writeDemand = new Demand();
       
   516 
       
   517     final class SSLSubscriptionWrapper implements Flow.Subscription {
       
   518 
       
   519         volatile Flow.Subscription delegate;
       
   520 
       
   521         void setSubscription(Flow.Subscription sub) {
       
   522             long demand = writeDemand.get(); // FIXME: isn't it a racy way of passing the demand?
       
   523             delegate = sub;
       
   524             debug.log(Level.DEBUG, "setSubscription: demand=%d", demand);
       
   525             if (demand > 0)
       
   526                 sub.request(demand);
       
   527         }
       
   528 
       
   529         @Override
       
   530         public void request(long n) {
       
   531             writeDemand.increase(n);
       
   532             debug.log(Level.DEBUG, "request: n=%d", n);
       
   533             Flow.Subscription sub = delegate;
       
   534             if (sub != null && n > 0) {
       
   535                 sub.request(n);
       
   536             }
       
   537         }
       
   538 
       
   539         @Override
       
   540         public void cancel() {
       
   541             // TODO:  no-op or error?
       
   542         }
       
   543     }
       
   544 
       
   545     /* Subscriber - writing side */
       
   546     @Override
       
   547     public void onSubscribe(Flow.Subscription subscription) {
       
   548         Objects.requireNonNull(subscription);
       
   549         Flow.Subscription x = writeSubscription.delegate;
       
   550         if (x != null)
       
   551             x.cancel();
       
   552 
       
   553         writeSubscription.setSubscription(subscription);
       
   554     }
       
   555 
       
   556     @Override
       
   557     public void onNext(List<ByteBuffer> item) {
       
   558         Objects.requireNonNull(item);
       
   559         boolean decremented = writeDemand.tryDecrement();
       
   560         assert decremented : "Unexpected writeDemand: ";
       
   561         debug.log(Level.DEBUG,
       
   562                 "sending %d  buffers to SSL flow delegate", item.size());
       
   563         sslDelegate.upstreamWriter().onNext(item);
       
   564     }
       
   565 
       
   566     @Override
       
   567     public void onError(Throwable throwable) {
       
   568         Objects.requireNonNull(throwable);
       
   569         sslDelegate.upstreamWriter().onError(throwable);
       
   570     }
       
   571 
       
   572     @Override
       
   573     public void onComplete() {
       
   574         sslDelegate.upstreamWriter().onComplete();
       
   575     }
       
   576 
       
   577     @Override
       
   578     public String toString() {
       
   579         return dbgString();
       
   580     }
       
   581 
       
   582     final String dbgString() {
       
   583         return "SSLTube(" + tube + ")";
       
   584     }
       
   585 
       
   586 }