src/java.net.http/share/classes/java/net/http/internal/SocketTube.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.io.IOException;
       
    30 import java.lang.System.Logger.Level;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.util.List;
       
    33 import java.util.Objects;
       
    34 import java.util.concurrent.Flow;
       
    35 import java.util.concurrent.atomic.AtomicLong;
       
    36 import java.util.concurrent.atomic.AtomicReference;
       
    37 import java.nio.channels.SelectableChannel;
       
    38 import java.nio.channels.SelectionKey;
       
    39 import java.nio.channels.SocketChannel;
       
    40 import java.util.ArrayList;
       
    41 import java.util.function.Consumer;
       
    42 import java.util.function.Supplier;
       
    43 
       
    44 import java.net.http.internal.common.Demand;
       
    45 import java.net.http.internal.common.FlowTube;
       
    46 import java.net.http.internal.common.SequentialScheduler;
       
    47 import java.net.http.internal.common.SequentialScheduler.DeferredCompleter;
       
    48 import java.net.http.internal.common.SequentialScheduler.RestartableTask;
       
    49 import java.net.http.internal.common.Utils;
       
    50 
       
    51 /**
       
    52  * A SocketTube is a terminal tube plugged directly into the socket.
       
    53  * The read subscriber should call {@code subscribe} on the SocketTube before
       
    54  * the SocketTube can be subscribed to the write publisher.
       
    55  */
       
    56 final class SocketTube implements FlowTube {
       
    57 
       
    58     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag
       
    59     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    60     static final AtomicLong IDS = new AtomicLong();
       
    61 
       
    62     private final HttpClientImpl client;
       
    63     private final SocketChannel channel;
       
    64     private final Supplier<ByteBuffer> buffersSource;
       
    65     private final Object lock = new Object();
       
    66     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
    67     private final InternalReadPublisher readPublisher;
       
    68     private final InternalWriteSubscriber writeSubscriber;
       
    69     private final long id = IDS.incrementAndGet();
       
    70 
       
    /**
     * Creates a tube plugged onto the given socket channel.
     *
     * @param client        the client this tube belongs to
     * @param channel       the socket channel to read from and write to
     * @param buffersSource the supplier kept by this tube for obtaining buffers
     */
    public SocketTube(HttpClientImpl client, SocketChannel channel,
                      Supplier<ByteBuffer> buffersSource) {
        // Plain state first: the read publisher and write subscriber built
        // below are inner classes that capture the channel of this tube.
        this.buffersSource = buffersSource;
        this.channel = channel;
        this.client = client;

        this.readPublisher = new InternalReadPublisher();
        this.writeSubscriber = new InternalWriteSubscriber();
    }
       
    79 
       
    80 //    private static Flow.Subscription nopSubscription() {
       
    81 //        return new Flow.Subscription() {
       
    82 //            @Override public void request(long n) { }
       
    83 //            @Override public void cancel() { }
       
    84 //        };
       
    85 //    }
       
    86 
       
    87     /**
       
    88      * Returns {@code true} if this flow is finished.
       
    89      * This happens when this flow internal read subscription is completed,
       
    90      * either normally (EOF reading) or exceptionally  (EOF writing, or
       
    91      * underlying socket closed, or some exception occurred while reading or
       
    92      * writing to the socket).
       
    93      *
       
    94      * @return {@code true} if this flow is finished.
       
    95      */
       
    96     public boolean isFinished() {
       
    97         InternalReadPublisher.InternalReadSubscription subscription =
       
    98                 readPublisher.subscriptionImpl;
       
    99         return subscription != null && subscription.completed
       
   100                 || subscription == null && errorRef.get() != null;
       
   101     }
       
   102 
       
   103     // ===================================================================== //
       
   104     //                       Flow.Publisher                                  //
       
   105     // ======================================================================//
       
   106 
       
   107     /**
       
   108      * {@inheritDoc }
       
   109      * @apiNote This method should be called first. In particular, the caller
       
   110      *          must ensure that this method must be called by the read
       
   111      *          subscriber before the write publisher can call {@code onSubscribe}.
       
   112      *          Failure to adhere to this contract may result in assertion errors.
       
   113      */
       
   114     @Override
       
   115     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   116         Objects.requireNonNull(s);
       
   117         assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
       
   118         readPublisher.subscribe(s);
       
   119     }
       
   120 
       
   121 
       
   122     // ===================================================================== //
       
   123     //                       Flow.Subscriber                                 //
       
   124     // ======================================================================//
       
   125 
       
   126     /**
       
   127      * {@inheritDoc }
       
   128      * @apiNote The caller must ensure that {@code subscribe} is called by
       
   129      *          the read subscriber before {@code onSubscribe} is called by
       
   130      *          the write publisher.
       
   131      *          Failure to adhere to this contract may result in assertion errors.
       
   132      */
       
   133     @Override
       
   134     public void onSubscribe(Flow.Subscription subscription) {
       
   135         writeSubscriber.onSubscribe(subscription);
       
   136     }
       
   137 
       
   138     @Override
       
   139     public void onNext(List<ByteBuffer> item) {
       
   140         writeSubscriber.onNext(item);
       
   141     }
       
   142 
       
   143     @Override
       
   144     public void onError(Throwable throwable) {
       
   145         writeSubscriber.onError(throwable);
       
   146     }
       
   147 
       
   148     @Override
       
   149     public void onComplete() {
       
   150         writeSubscriber.onComplete();
       
   151     }
       
   152 
       
   153     // ===================================================================== //
       
   154     //                           Events                                      //
       
   155     // ======================================================================//
       
   156 
       
   157     /**
       
   158      * A restartable task used to process tasks in sequence.
       
   159      */
       
   160     private static class SocketFlowTask implements RestartableTask {
       
   161         final Runnable task;
       
   162         private final Object monitor = new Object();
       
   163         SocketFlowTask(Runnable task) {
       
   164             this.task = task;
       
   165         }
       
   166         @Override
       
   167         public final void run(DeferredCompleter taskCompleter) {
       
   168             try {
       
   169                 // non contentious synchronized for visibility.
       
   170                 synchronized(monitor) {
       
   171                     task.run();
       
   172                 }
       
   173             } finally {
       
   174                 taskCompleter.complete();
       
   175             }
       
   176         }
       
   177     }
       
   178 
       
   179     // This is best effort - there's no guarantee that the printed set
       
   180     // of values is consistent. It should only be considered as
       
   181     // weakly accurate - in particular in what concerns the events states,
       
   182     // especially when displaying a read event state from a write event
       
   183     // callback and conversely.
       
   184     void debugState(String when) {
       
   185         if (debug.isLoggable(Level.DEBUG)) {
       
   186             StringBuilder state = new StringBuilder();
       
   187 
       
   188             InternalReadPublisher.InternalReadSubscription sub =
       
   189                     readPublisher.subscriptionImpl;
       
   190             InternalReadPublisher.ReadEvent readEvent =
       
   191                     sub == null ? null : sub.readEvent;
       
   192             Demand rdemand = sub == null ? null : sub.demand;
       
   193             InternalWriteSubscriber.WriteEvent writeEvent =
       
   194                     writeSubscriber.writeEvent;
       
   195             AtomicLong wdemand = writeSubscriber.writeDemand;
       
   196             int rops = readEvent == null ? 0 : readEvent.interestOps();
       
   197             long rd = rdemand == null ? 0 : rdemand.get();
       
   198             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
       
   199             long wd = wdemand == null ? 0 : wdemand.get();
       
   200 
       
   201             state.append(when).append(" Reading: [ops=")
       
   202                     .append(rops).append(", demand=").append(rd)
       
   203                     .append(", stopped=")
       
   204                     .append((sub == null ? false : sub.readScheduler.isStopped()))
       
   205                     .append("], Writing: [ops=").append(wops)
       
   206                     .append(", demand=").append(wd)
       
   207                     .append("]");
       
   208             debug.log(Level.DEBUG, state.toString());
       
   209         }
       
   210     }
       
   211 
       
   212     /**
       
   213      * A repeatable event that can be paused or resumed by changing
       
   214      * its interestOps.
       
   215      * When the event is fired, it is first paused before being signaled.
       
   216      * It is the responsibility of the code triggered by {@code signalEvent}
       
   217      * to resume the event if required.
       
   218      */
       
   219     private static abstract class SocketFlowEvent extends AsyncEvent {
       
   220         final SocketChannel channel;
       
   221         final int defaultInterest;
       
   222         volatile int interestOps;
       
   223         volatile boolean registered;
       
   224         SocketFlowEvent(int defaultInterest, SocketChannel channel) {
       
   225             super(AsyncEvent.REPEATING);
       
   226             this.defaultInterest = defaultInterest;
       
   227             this.channel = channel;
       
   228         }
       
   229         final boolean registered() {return registered;}
       
   230         final void resume() {
       
   231             interestOps = defaultInterest;
       
   232             registered = true;
       
   233         }
       
   234         final void pause() {interestOps = 0;}
       
   235         @Override
       
   236         public final SelectableChannel channel() {return channel;}
       
   237         @Override
       
   238         public final int interestOps() {return interestOps;}
       
   239 
       
   240         @Override
       
   241         public final void handle() {
       
   242             pause();       // pause, then signal
       
   243             signalEvent(); // won't be fired again until resumed.
       
   244         }
       
   245         @Override
       
   246         public final void abort(IOException error) {
       
   247             debug().log(Level.DEBUG, () -> "abort: " + error);
       
   248             pause();              // pause, then signal
       
   249             signalError(error);   // should not be resumed after abort (not checked)
       
   250         }
       
   251 
       
   252         protected abstract void signalEvent();
       
   253         protected abstract void signalError(Throwable error);
       
   254         abstract System.Logger debug();
       
   255     }
       
   256 
       
   257     // ===================================================================== //
       
   258     //                              Writing                                  //
       
   259     // ======================================================================//
       
   260 
       
   261     // This class makes the assumption that the publisher will call
       
   262     // onNext sequentially, and that onNext won't be called if the demand
       
   263     // has not been incremented by request(1).
       
   264     // It has a 'queue of 1' meaning that it will call request(1) in
       
   265     // onSubscribe, and then only after its 'current' buffer list has been
       
   266     // fully written and current set to null;
       
   267     private final class InternalWriteSubscriber
       
   268             implements Flow.Subscriber<List<ByteBuffer>> {
       
   269 
       
   270         volatile Flow.Subscription subscription;
       
   271         volatile List<ByteBuffer> current;
       
   272         volatile boolean completed;
       
   273         final WriteEvent writeEvent = new WriteEvent(channel, this);
       
   274         final AtomicLong writeDemand = new AtomicLong();
       
   275 
       
   276         @Override
       
   277         public void onSubscribe(Flow.Subscription subscription) {
       
   278             Flow.Subscription previous = this.subscription;
       
   279             this.subscription = subscription;
       
   280             debug.log(Level.DEBUG, "subscribed for writing");
       
   281             if (current == null) {
       
   282                 if (previous == subscription || previous == null) {
       
   283                     if (writeDemand.compareAndSet(0, 1)) {
       
   284                         subscription.request(1);
       
   285                     }
       
   286                 } else {
       
   287                     writeDemand.set(1);
       
   288                     subscription.request(1);
       
   289                 }
       
   290             }
       
   291         }
       
   292 
       
   293         @Override
       
   294         public void onNext(List<ByteBuffer> bufs) {
       
   295             assert current == null; // this is a queue of 1.
       
   296             assert subscription != null;
       
   297             current = bufs;
       
   298             tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
       
   299             // For instance in HTTP/2, a received SETTINGS frame might trigger
       
   300             // the sending of a SETTINGS frame in turn which might cause
       
   301             // onNext to be called from within the same selector thread that the
       
   302             // original SETTINGS frames arrived on. If rs is the read-subscriber
       
   303             // and ws is the write-subscriber then the following can occur:
       
   304             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
       
   305             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
       
   306             debugState("leaving w.onNext");
       
   307         }
       
   308 
       
   309         // we don't use a SequentialScheduler here: we rely on
       
   310         // onNext() being called sequentially, and not being called
       
   311         // if we haven't call request(1)
       
   312         // onNext is usually called from within a user/executor thread.
       
   313         // we will perform the initial writing in that thread.
       
   314         // if for some reason, not all data can be written, the writeEvent
       
   315         // will be resumed, and the rest of the data will be written from
       
   316         // the selector manager thread when the writeEvent is fired.
       
   317         // If we are in the selector manager thread, then we will use the executor
       
   318         // to call request(1), ensuring that onNext() won't be called from
       
   319         // within the selector thread.
       
   320         // If we are not in the selector manager thread, then we don't care.
       
   321         void tryFlushCurrent(boolean inSelectorThread) {
       
   322             List<ByteBuffer> bufs = current;
       
   323             if (bufs == null) return;
       
   324             try {
       
   325                 assert inSelectorThread == client.isSelectorThread() :
       
   326                        "should " + (inSelectorThread ? "" : "not ")
       
   327                         + " be in the selector thread";
       
   328                 long remaining = Utils.remaining(bufs);
       
   329                 debug.log(Level.DEBUG, "trying to write: %d", remaining);
       
   330                 long written = writeAvailable(bufs);
       
   331                 debug.log(Level.DEBUG, "wrote: %d", remaining);
       
   332                 if (written == -1) {
       
   333                     signalError(new EOFException("EOF reached while writing"));
       
   334                     return;
       
   335                 }
       
   336                 assert written <= remaining;
       
   337                 if (remaining - written == 0) {
       
   338                     current = null;
       
   339                     writeDemand.decrementAndGet();
       
   340                     Runnable requestMore = this::requestMore;
       
   341                     if (inSelectorThread) {
       
   342                         assert client.isSelectorThread();
       
   343                         client.theExecutor().execute(requestMore);
       
   344                     } else {
       
   345                         assert !client.isSelectorThread();
       
   346                         requestMore.run();
       
   347                     }
       
   348                 } else {
       
   349                     resumeWriteEvent(inSelectorThread);
       
   350                 }
       
   351             } catch (Throwable t) {
       
   352                 signalError(t);
       
   353                 subscription.cancel();
       
   354             }
       
   355         }
       
   356 
       
   357         void requestMore() {
       
   358             try {
       
   359                 if (completed) return;
       
   360                 long d =  writeDemand.get();
       
   361                 if (writeDemand.compareAndSet(0,1)) {
       
   362                     debug.log(Level.DEBUG, "write: requesting more...");
       
   363                     subscription.request(1);
       
   364                 } else {
       
   365                     debug.log(Level.DEBUG, "write: no need to request more: %d", d);
       
   366                 }
       
   367             } catch (Throwable t) {
       
   368                 debug.log(Level.DEBUG, () ->
       
   369                         "write: error while requesting more: " + t);
       
   370                 signalError(t);
       
   371                 subscription.cancel();
       
   372             } finally {
       
   373                 debugState("leaving requestMore: ");
       
   374             }
       
   375         }
       
   376 
       
   377         @Override
       
   378         public void onError(Throwable throwable) {
       
   379             signalError(throwable);
       
   380         }
       
   381 
       
   382         @Override
       
   383         public void onComplete() {
       
   384             completed = true;
       
   385             // no need to pause the write event here: the write event will
       
   386             // be paused if there is nothing more to write.
       
   387             List<ByteBuffer> bufs = current;
       
   388             long remaining = bufs == null ? 0 : Utils.remaining(bufs);
       
   389             debug.log(Level.DEBUG,  "write completed, %d yet to send", remaining);
       
   390             debugState("InternalWriteSubscriber::onComplete");
       
   391         }
       
   392 
       
   393         void resumeWriteEvent(boolean inSelectorThread) {
       
   394             debug.log(Level.DEBUG, "scheduling write event");
       
   395             resumeEvent(writeEvent, this::signalError);
       
   396         }
       
   397 
       
   398 //        void pauseWriteEvent() {
       
   399 //            debug.log(Level.DEBUG, "pausing write event");
       
   400 //            pauseEvent(writeEvent, this::signalError);
       
   401 //        }
       
   402 
       
   403         void signalWritable() {
       
   404             debug.log(Level.DEBUG, "channel is writable");
       
   405             tryFlushCurrent(true);
       
   406         }
       
   407 
       
   408         void signalError(Throwable error) {
       
   409             debug.log(Level.DEBUG, () -> "write error: " + error);
       
   410             completed = true;
       
   411             readPublisher.signalError(error);
       
   412         }
       
   413 
       
   414         // A repeatable WriteEvent which is paused after firing and can
       
   415         // be resumed if required - see SocketFlowEvent;
       
   416         final class WriteEvent extends SocketFlowEvent {
       
   417             final InternalWriteSubscriber sub;
       
   418             WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
       
   419                 super(SelectionKey.OP_WRITE, channel);
       
   420                 this.sub = sub;
       
   421             }
       
   422             @Override
       
   423             protected final void signalEvent() {
       
   424                 try {
       
   425                     client.eventUpdated(this);
       
   426                     sub.signalWritable();
       
   427                 } catch(Throwable t) {
       
   428                     sub.signalError(t);
       
   429                 }
       
   430             }
       
   431 
       
   432             @Override
       
   433             protected void signalError(Throwable error) {
       
   434                 sub.signalError(error);
       
   435             }
       
   436 
       
   437             @Override
       
   438             System.Logger debug() {
       
   439                 return debug;
       
   440             }
       
   441 
       
   442         }
       
   443 
       
   444     }
       
   445 
       
   446     // ===================================================================== //
       
   447     //                              Reading                                  //
       
   448     // ===================================================================== //
       
   449 
       
   450     // The InternalReadPublisher uses a SequentialScheduler to ensure that
       
   451     // onNext/onError/onComplete are called sequentially on the caller's
       
   452     // subscriber.
       
   453     // However, it relies on the fact that the only time where
       
   454     // runOrSchedule() is called from a user/executor thread is in signalError,
       
   455     // right after the errorRef has been set.
       
    // Because the sequential scheduler's task always checks for errors first,
    // and always terminates the scheduler on error, it is safe to assume
    // that if it reaches the point where it reads from the channel, then
    // it is running in the SelectorManager thread. This is because all
    // other invocations of runOrSchedule() are triggered from within a
    // ReadEvent.
       
   462     //
       
    // When pausing/resuming the event, some shortcuts can then be taken
    // when we know we're running in the selector manager thread
    // (in that case there's no need to call client.eventUpdated(readEvent)).
       
   466     //
       
   467     private final class InternalReadPublisher
       
   468             implements Flow.Publisher<List<ByteBuffer>> {
       
   469         private final InternalReadSubscription subscriptionImpl
       
   470                 = new InternalReadSubscription();
       
   471         AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
       
   472         private volatile ReadSubscription subscription;
       
   473 
       
   474         @Override
       
   475         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   476             Objects.requireNonNull(s);
       
   477 
       
   478             TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
       
   479             ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
       
   480             ReadSubscription previous = pendingSubscription.getAndSet(target);
       
   481 
       
   482             if (previous != null && previous != target) {
       
   483                 debug.log(Level.DEBUG,
       
   484                         () -> "read publisher: dropping pending subscriber: "
       
   485                         + previous.subscriber);
       
   486                 previous.errorRef.compareAndSet(null, errorRef.get());
       
   487                 previous.signalOnSubscribe();
       
   488                 if (subscriptionImpl.completed) {
       
   489                     previous.signalCompletion();
       
   490                 } else {
       
   491                     previous.subscriber.dropSubscription();
       
   492                 }
       
   493             }
       
   494 
       
   495             debug.log(Level.DEBUG, "read publisher got subscriber");
       
   496             subscriptionImpl.signalSubscribe();
       
   497             debugState("leaving read.subscribe: ");
       
   498         }
       
   499 
       
   500         void signalError(Throwable error) {
       
   501             if (!errorRef.compareAndSet(null, error)) {
       
   502                 return;
       
   503             }
       
   504             subscriptionImpl.handleError();
       
   505         }
       
   506 
       
   507         final class ReadSubscription implements Flow.Subscription {
       
   508             final InternalReadSubscription impl;
       
   509             final TubeSubscriber  subscriber;
       
   510             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   511             volatile boolean subscribed;
       
   512             volatile boolean cancelled;
       
   513             volatile boolean completed;
       
   514 
       
   515             public ReadSubscription(InternalReadSubscription impl,
       
   516                                     TubeSubscriber subscriber) {
       
   517                 this.impl = impl;
       
   518                 this.subscriber = subscriber;
       
   519             }
       
   520 
       
   521             @Override
       
   522             public void cancel() {
       
   523                 cancelled = true;
       
   524             }
       
   525 
       
   526             @Override
       
   527             public void request(long n) {
       
   528                 if (!cancelled) {
       
   529                     impl.request(n);
       
   530                 } else {
       
   531                     debug.log(Level.DEBUG,
       
   532                               "subscription cancelled, ignoring request %d", n);
       
   533                 }
       
   534             }
       
   535 
       
   536             void signalCompletion() {
       
   537                 assert subscribed || cancelled;
       
   538                 if (completed || cancelled) return;
       
   539                 synchronized (this) {
       
   540                     if (completed) return;
       
   541                     completed = true;
       
   542                 }
       
   543                 Throwable error = errorRef.get();
       
   544                 if (error != null) {
       
   545                     debug.log(Level.DEBUG, () ->
       
   546                         "forwarding error to subscriber: "
       
   547                         + error);
       
   548                     subscriber.onError(error);
       
   549                 } else {
       
   550                     debug.log(Level.DEBUG, "completing subscriber");
       
   551                     subscriber.onComplete();
       
   552                 }
       
   553             }
       
   554 
       
   555             void signalOnSubscribe() {
       
   556                 if (subscribed || cancelled) return;
       
   557                 synchronized (this) {
       
   558                     if (subscribed || cancelled) return;
       
   559                     subscribed = true;
       
   560                 }
       
   561                 subscriber.onSubscribe(this);
       
   562                 debug.log(Level.DEBUG, "onSubscribe called");
       
   563                 if (errorRef.get() != null) {
       
   564                     signalCompletion();
       
   565                 }
       
   566             }
       
   567         }
       
   568 
       
   569         final class InternalReadSubscription implements Flow.Subscription {
       
   570 
       
   571             private final Demand demand = new Demand();
       
   572             final SequentialScheduler readScheduler;
       
   573             private volatile boolean completed;
       
   574             private final ReadEvent readEvent;
       
   575             private final AsyncEvent subscribeEvent;
       
   576 
       
   577             InternalReadSubscription() {
       
   578                 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
       
   579                 subscribeEvent = new AsyncTriggerEvent(this::signalError,
       
   580                                                        this::handleSubscribeEvent);
       
   581                 readEvent = new ReadEvent(channel, this);
       
   582             }
       
   583 
       
   584             /*
       
   585              * This method must be invoked before any other method of this class.
       
   586              */
       
   587             final void signalSubscribe() {
       
   588                 if (readScheduler.isStopped() || completed) {
       
   589                     // if already completed or stopped we can handle any
       
   590                     // pending connection directly from here.
       
   591                     debug.log(Level.DEBUG,
       
   592                               "handling pending subscription while completed");
       
   593                     handlePending();
       
   594                 } else {
       
   595                     try {
       
   596                         debug.log(Level.DEBUG,
       
   597                                   "registering subscribe event");
       
   598                         client.registerEvent(subscribeEvent);
       
   599                     } catch (Throwable t) {
       
   600                         signalError(t);
       
   601                         handlePending();
       
   602                     }
       
   603                 }
       
   604             }
       
   605 
       
   606             final void handleSubscribeEvent() {
       
   607                 assert client.isSelectorThread();
       
   608                 debug.log(Level.DEBUG, "subscribe event raised");
       
   609                 readScheduler.runOrSchedule();
       
   610                 if (readScheduler.isStopped() || completed) {
       
   611                     // if already completed or stopped we can handle any
       
   612                     // pending connection directly from here.
       
   613                     debug.log(Level.DEBUG,
       
   614                               "handling pending subscription when completed");
       
   615                     handlePending();
       
   616                 }
       
   617             }
       
   618 
       
   619 
       
   620             /*
       
   621              * Although this method is thread-safe, the Reactive-Streams spec seems
       
   622              * to not require it to be as such. It's a responsibility of the
       
   623              * subscriber to signal demand in a thread-safe manner.
       
   624              *
       
   625              * https://github.com/reactive-streams/reactive-streams-jvm/blob/dd24d2ab164d7de6c316f6d15546f957bec29eaa/README.md
       
   626              * (rules 2.7 and 3.4)
       
   627              */
       
   628             @Override
       
   629             public final void request(long n) {
       
   630                 if (n > 0L) {
       
   631                     boolean wasFulfilled = demand.increase(n);
       
   632                     if (wasFulfilled) {
       
   633                         debug.log(Level.DEBUG, "got some demand for reading");
       
   634                         resumeReadEvent();
       
   635                         // if demand has been changed from fulfilled
       
   636                         // to unfulfilled register read event;
       
   637                     }
       
   638                 } else {
       
   639                     signalError(new IllegalArgumentException("non-positive request"));
       
   640                 }
       
   641                 debugState("leaving request("+n+"): ");
       
   642             }
       
   643 
       
   644             @Override
       
   645             public final void cancel() {
       
   646                 pauseReadEvent();
       
   647                 readScheduler.stop();
       
   648             }
       
   649 
       
   650             private void resumeReadEvent() {
       
   651                 debug.log(Level.DEBUG, "resuming read event");
       
   652                 resumeEvent(readEvent, this::signalError);
       
   653             }
       
   654 
       
   655             private void pauseReadEvent() {
       
   656                 debug.log(Level.DEBUG, "pausing read event");
       
   657                 pauseEvent(readEvent, this::signalError);
       
   658             }
       
   659 
       
   660 
       
   661             final void handleError() {
       
   662                 assert errorRef.get() != null;
       
   663                 readScheduler.runOrSchedule();
       
   664             }
       
   665 
       
   666             final void signalError(Throwable error) {
       
   667                 if (!errorRef.compareAndSet(null, error)) {
       
   668                     return;
       
   669                 }
       
   670                 debug.log(Level.DEBUG, () -> "got read error: " + error);
       
   671                 readScheduler.runOrSchedule();
       
   672             }
       
   673 
       
   674             final void signalReadable() {
       
   675                 readScheduler.runOrSchedule();
       
   676             }
       
   677 
       
   678             /** The body of the task that runs in SequentialScheduler. */
       
   679             final void read() {
       
   680                 // It is important to only call pauseReadEvent() when stopping
       
   681                 // the scheduler. The event is automatically paused before
       
   682                 // firing, and trying to pause it again could cause a race
       
   683                 // condition between this loop, which calls tryDecrementDemand(),
       
   684                 // and the thread that calls request(n), which will try to resume
       
   685                 // reading.
       
   686                 try {
       
   687                     while(!readScheduler.isStopped()) {
       
   688                         if (completed) return;
       
   689 
       
   690                         // make sure we have a subscriber
       
   691                         if (handlePending()) {
       
   692                             debug.log(Level.DEBUG, "pending subscriber subscribed");
       
   693                             return;
       
   694                         }
       
   695 
       
   696                         // If an error was signaled, we might not be in the
       
   697                         // the selector thread, and that is OK, because we
       
   698                         // will just call onError and return.
       
   699                         ReadSubscription current = subscription;
       
   700                         TubeSubscriber subscriber = current.subscriber;
       
   701                         Throwable error = errorRef.get();
       
   702                         if (error != null) {
       
   703                             completed = true;
       
   704                             // safe to pause here because we're finished anyway.
       
   705                             pauseReadEvent();
       
   706                             debug.log(Level.DEBUG, () -> "Sending error " + error
       
   707                                   + " to subscriber " + subscriber);
       
   708                             current.errorRef.compareAndSet(null, error);
       
   709                             current.signalCompletion();
       
   710                             readScheduler.stop();
       
   711                             debugState("leaving read() loop with error: ");
       
   712                             return;
       
   713                         }
       
   714 
       
   715                         // If we reach here then we must be in the selector thread.
       
   716                         assert client.isSelectorThread();
       
   717                         if (demand.tryDecrement()) {
       
   718                             // we have demand.
       
   719                             try {
       
   720                                 List<ByteBuffer> bytes = readAvailable();
       
   721                                 if (bytes == EOF) {
       
   722                                     if (!completed) {
       
   723                                         debug.log(Level.DEBUG, "got read EOF");
       
   724                                         completed = true;
       
   725                                         // safe to pause here because we're finished
       
   726                                         // anyway.
       
   727                                         pauseReadEvent();
       
   728                                         current.signalCompletion();
       
   729                                         readScheduler.stop();
       
   730                                     }
       
   731                                     debugState("leaving read() loop after EOF: ");
       
   732                                     return;
       
   733                                 } else if (Utils.remaining(bytes) > 0) {
       
   734                                     // the subscriber is responsible for offloading
       
   735                                     // to another thread if needed.
       
   736                                     debug.log(Level.DEBUG, () -> "read bytes: "
       
   737                                             + Utils.remaining(bytes));
       
   738                                     assert !current.completed;
       
   739                                     subscriber.onNext(bytes);
       
   740                                     // we could continue looping until the demand
       
   741                                     // reaches 0. However, that would risk starving
       
   742                                     // other connections (bound to other socket
       
   743                                     // channels) - as other selected keys activated
       
   744                                     // by the selector manager thread might be
       
   745                                     // waiting for this event to terminate.
       
   746                                     // So resume the read event and return now...
       
   747                                     resumeReadEvent();
       
   748                                     debugState("leaving read() loop after onNext: ");
       
   749                                     return;
       
   750                                 } else {
       
   751                                     // nothing available!
       
   752                                     debug.log(Level.DEBUG, "no more bytes available");
       
   753                                     // re-increment the demand and resume the read
       
   754                                     // event. This ensures that this loop is
       
   755                                     // executed again when the socket becomes
       
   756                                     // readable again.
       
   757                                     demand.increase(1);
       
   758                                     resumeReadEvent();
       
   759                                     debugState("leaving read() loop with no bytes");
       
   760                                     return;
       
   761                                 }
       
   762                             } catch (Throwable x) {
       
   763                                 signalError(x);
       
   764                                 continue;
       
   765                             }
       
   766                         } else {
       
   767                             debug.log(Level.DEBUG, "no more demand for reading");
       
   768                             // the event is paused just after firing, so it should
       
   769                             // still be paused here, unless the demand was just
       
   770                             // incremented from 0 to n, in which case, the
       
   771                             // event will be resumed, causing this loop to be
       
   772                             // invoked again when the socket becomes readable:
       
   773                             // This is what we want.
       
   774                             // Trying to pause the event here would actually
       
   775                             // introduce a race condition between this loop and
       
   776                             // request(n).
       
   777                             debugState("leaving read() loop with no demand");
       
   778                             break;
       
   779                         }
       
   780                     }
       
   781                 } catch (Throwable t) {
       
   782                     debug.log(Level.DEBUG, "Unexpected exception in read loop", t);
       
   783                     signalError(t);
       
   784                 } finally {
       
   785                     handlePending();
       
   786                 }
       
   787             }
       
   788 
       
   789             boolean handlePending() {
       
   790                 ReadSubscription pending = pendingSubscription.getAndSet(null);
       
   791                 if (pending == null) return false;
       
   792                 debug.log(Level.DEBUG, "handling pending subscription for %s",
       
   793                           pending.subscriber);
       
   794                 ReadSubscription current = subscription;
       
   795                 if (current != null && current != pending && !completed) {
       
   796                     current.subscriber.dropSubscription();
       
   797                 }
       
   798                 debug.log(Level.DEBUG, "read demand reset to 0");
       
   799                 subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
       
   800                 pending.errorRef.compareAndSet(null, errorRef.get());
       
   801                 if (!readScheduler.isStopped()) {
       
   802                     subscription = pending;
       
   803                 } else {
       
   804                     debug.log(Level.DEBUG, "socket tube is already stopped");
       
   805                 }
       
   806                 debug.log(Level.DEBUG, "calling onSubscribe");
       
   807                 pending.signalOnSubscribe();
       
   808                 if (completed) {
       
   809                     pending.errorRef.compareAndSet(null, errorRef.get());
       
   810                     pending.signalCompletion();
       
   811                 }
       
   812                 return true;
       
   813             }
       
   814         }
       
   815 
       
   816 
       
   817         // A repeatable ReadEvent which is paused after firing and can
       
   818         // be resumed if required - see SocketFlowEvent;
       
   819         final class ReadEvent extends SocketFlowEvent {
       
   820             final InternalReadSubscription sub;
       
   821             ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
       
   822                 super(SelectionKey.OP_READ, channel);
       
   823                 this.sub = sub;
       
   824             }
       
   825             @Override
       
   826             protected final void signalEvent() {
       
   827                 try {
       
   828                     client.eventUpdated(this);
       
   829                     sub.signalReadable();
       
   830                 } catch(Throwable t) {
       
   831                     sub.signalError(t);
       
   832                 }
       
   833             }
       
   834 
       
   835             @Override
       
   836             protected final void signalError(Throwable error) {
       
   837                 sub.signalError(error);
       
   838             }
       
   839 
       
   840             @Override
       
   841             System.Logger debug() {
       
   842                 return debug;
       
   843             }
       
   844         }
       
   845 
       
   846     }
       
   847 
       
   848     // ===================================================================== //
       
   849     //                   Socket Channel Read/Write                           //
       
   850     // ===================================================================== //
       
   851     static final int MAX_BUFFERS = 3;
       
   852     static final List<ByteBuffer> EOF = List.of();
       
   853 
       
   854     private List<ByteBuffer> readAvailable() throws IOException {
       
   855         ByteBuffer buf = buffersSource.get();
       
   856         assert buf.hasRemaining();
       
   857 
       
   858         int read;
       
   859         int pos = buf.position();
       
   860         List<ByteBuffer> list = null;
       
   861         while (buf.hasRemaining()) {
       
   862             while ((read = channel.read(buf)) > 0) {
       
   863                if (!buf.hasRemaining()) break;
       
   864             }
       
   865 
       
   866             // nothing read;
       
   867             if (buf.position() == pos) {
       
   868                 // An empty list signal the end of data, and should only be
       
   869                 // returned if read == -1.
       
   870                 // If we already read some data, then we must return what we have
       
   871                 // read, and -1 will be returned next time the caller attempts to
       
   872                 // read something.
       
   873                 if (list == null && read == -1) {  // eof
       
   874                     list = EOF;
       
   875                     break;
       
   876                 }
       
   877             }
       
   878             buf.limit(buf.position());
       
   879             buf.position(pos);
       
   880             if (list == null) {
       
   881                 list = List.of(buf);
       
   882             } else {
       
   883                 if (!(list instanceof ArrayList)) {
       
   884                     list = new ArrayList<>(list);
       
   885                 }
       
   886                 list.add(buf);
       
   887             }
       
   888             if (read <= 0 || list.size() == MAX_BUFFERS) break;
       
   889             buf = buffersSource.get();
       
   890             pos = buf.position();
       
   891             assert buf.hasRemaining();
       
   892         }
       
   893         return list;
       
   894     }
       
   895 
       
   896     private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
       
   897         ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
       
   898         final long remaining = Utils.remaining(srcs);
       
   899         long written = 0;
       
   900         while (remaining > written) {
       
   901             long w = channel.write(srcs);
       
   902             if (w == -1 && written == 0) return -1;
       
   903             if (w == 0) break;
       
   904             written += w;
       
   905         }
       
   906         return written;
       
   907     }
       
   908 
       
   909     private void resumeEvent(SocketFlowEvent event,
       
   910                              Consumer<Throwable> errorSignaler) {
       
   911         boolean registrationRequired;
       
   912         synchronized(lock) {
       
   913             registrationRequired = !event.registered();
       
   914             event.resume();
       
   915         }
       
   916         try {
       
   917             if (registrationRequired) {
       
   918                 client.registerEvent(event);
       
   919              } else {
       
   920                 client.eventUpdated(event);
       
   921             }
       
   922         } catch(Throwable t) {
       
   923             errorSignaler.accept(t);
       
   924         }
       
   925    }
       
   926 
       
   927     private void pauseEvent(SocketFlowEvent event,
       
   928                             Consumer<Throwable> errorSignaler) {
       
   929         synchronized(lock) {
       
   930             event.pause();
       
   931         }
       
   932         try {
       
   933             client.eventUpdated(event);
       
   934         } catch(Throwable t) {
       
   935             errorSignaler.accept(t);
       
   936         }
       
   937     }
       
   938 
       
   939     @Override
       
   940     public void connectFlows(TubePublisher writePublisher,
       
   941                              TubeSubscriber readSubscriber) {
       
   942         debug.log(Level.DEBUG, "connecting flows");
       
   943         this.subscribe(readSubscriber);
       
   944         writePublisher.subscribe(this);
       
   945     }
       
   946 
       
   947 
       
   948     @Override
       
   949     public String toString() {
       
   950         return dbgString();
       
   951     }
       
   952 
       
   953     final String dbgString() {
       
   954         return "SocketTube("+id+")";
       
   955     }
       
   956 }