src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
changeset 49765 ee6f7a61f3a5
parent 48083 b1c1b4ef4be2
child 49944 4690a2871b44
child 56451 9585061fdb04
equal deleted inserted replaced
49707:f7fd051519ac 49765:ee6f7a61f3a5
       
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.List;
       
    32 import java.util.Objects;
       
    33 import java.util.concurrent.Flow;
       
    34 import java.util.concurrent.atomic.AtomicLong;
       
    35 import java.util.concurrent.atomic.AtomicReference;
       
    36 import java.nio.channels.SelectableChannel;
       
    37 import java.nio.channels.SelectionKey;
       
    38 import java.nio.channels.SocketChannel;
       
    39 import java.util.ArrayList;
       
    40 import java.util.function.Consumer;
       
    41 import java.util.function.Supplier;
       
    42 import jdk.internal.net.http.common.Demand;
       
    43 import jdk.internal.net.http.common.FlowTube;
       
    44 import jdk.internal.net.http.common.Logger;
       
    45 import jdk.internal.net.http.common.SequentialScheduler;
       
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
       
    47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
       
    48 import jdk.internal.net.http.common.Utils;
       
    49 
       
    50 /**
       
    51  * A SocketTube is a terminal tube plugged directly into the socket.
       
    52  * The read subscriber should call {@code subscribe} on the SocketTube before
       
    53  * the SocketTube is subscribed to the write publisher.
       
    54  */
       
    55 final class SocketTube implements FlowTube {
       
    56 
       
    57     final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
       
    58     static final AtomicLong IDS = new AtomicLong();
       
    59 
       
    60     private final HttpClientImpl client;
       
    61     private final SocketChannel channel;
       
    62     private final Supplier<ByteBuffer> buffersSource;
       
    63     private final Object lock = new Object();
       
    64     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
    65     private final InternalReadPublisher readPublisher;
       
    66     private final InternalWriteSubscriber writeSubscriber;
       
    67     private final long id = IDS.incrementAndGet();
       
    68 
       
    /**
     * Creates a tube plugged into the given socket channel.
     *
     * The plain fields are assigned before the internal read publisher and
     * write subscriber are created, because the write subscriber's field
     * initializers read {@code channel}.
     *
     * @param client        the client used to register selector events and
     *                      to obtain the executor
     * @param channel       the socket channel this tube reads from and
     *                      writes to
     * @param buffersSource a supplier of byte buffers; it is only stored
     *                      here (presumably used by the read side - confirm)
     */
    public SocketTube(HttpClientImpl client, SocketChannel channel,
                      Supplier<ByteBuffer> buffersSource) {
        this.buffersSource = buffersSource;
        this.channel = channel;
        this.client = client;

        // Both halves are inner classes and may read the fields set above.
        this.readPublisher = new InternalReadPublisher();
        this.writeSubscriber = new InternalWriteSubscriber();
    }
       
    77 
       
    78     /**
       
    79      * Returns {@code true} if this flow is finished.
       
    80      * This happens when this flow internal read subscription is completed,
       
    81      * either normally (EOF reading) or exceptionally  (EOF writing, or
       
    82      * underlying socket closed, or some exception occurred while reading or
       
    83      * writing to the socket).
       
    84      *
       
    85      * @return {@code true} if this flow is finished.
       
    86      */
       
    87     public boolean isFinished() {
       
    88         InternalReadPublisher.InternalReadSubscription subscription =
       
    89                 readPublisher.subscriptionImpl;
       
    90         return subscription != null && subscription.completed
       
    91                 || subscription == null && errorRef.get() != null;
       
    92     }
       
    93 
       
    94     // ===================================================================== //
       
    95     //                       Flow.Publisher                                  //
       
    96     // ======================================================================//
       
    97 
       
    98     /**
       
    99      * {@inheritDoc }
       
   100      * @apiNote This method should be called first. In particular, the caller
       
   101      *          must ensure that this method must be called by the read
       
   102      *          subscriber before the write publisher can call {@code onSubscribe}.
       
   103      *          Failure to adhere to this contract may result in assertion errors.
       
   104      */
       
   105     @Override
       
   106     public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   107         Objects.requireNonNull(s);
       
   108         assert s instanceof TubeSubscriber : "Expected TubeSubscriber, got:" + s;
       
   109         readPublisher.subscribe(s);
       
   110     }
       
   111 
       
   112 
       
   113     // ===================================================================== //
       
   114     //                       Flow.Subscriber                                 //
       
   115     // ======================================================================//
       
   116 
       
   117     /**
       
   118      * {@inheritDoc }
       
   119      * @apiNote The caller must ensure that {@code subscribe} is called by
       
   120      *          the read subscriber before {@code onSubscribe} is called by
       
   121      *          the write publisher.
       
   122      *          Failure to adhere to this contract may result in assertion errors.
       
   123      */
       
   124     @Override
       
   125     public void onSubscribe(Flow.Subscription subscription) {
       
   126         writeSubscriber.onSubscribe(subscription);
       
   127     }
       
   128 
       
   129     @Override
       
   130     public void onNext(List<ByteBuffer> item) {
       
   131         writeSubscriber.onNext(item);
       
   132     }
       
   133 
       
   134     @Override
       
   135     public void onError(Throwable throwable) {
       
   136         writeSubscriber.onError(throwable);
       
   137     }
       
   138 
       
   139     @Override
       
   140     public void onComplete() {
       
   141         writeSubscriber.onComplete();
       
   142     }
       
   143 
       
   144     // ===================================================================== //
       
   145     //                           Events                                      //
       
   146     // ======================================================================//
       
   147 
       
   148     void signalClosed() {
       
   149         // Ensures that the subscriber will be terminated and that future
       
   150         // subscribers will be notified when the connection is closed.
       
   151         readPublisher.subscriptionImpl.signalError(
       
   152                 new IOException("connection closed locally"));
       
   153     }
       
   154 
       
   155     /**
       
   156      * A restartable task used to process tasks in sequence.
       
   157      */
       
   158     private static class SocketFlowTask implements RestartableTask {
       
   159         final Runnable task;
       
   160         private final Object monitor = new Object();
       
   161         SocketFlowTask(Runnable task) {
       
   162             this.task = task;
       
   163         }
       
   164         @Override
       
   165         public final void run(DeferredCompleter taskCompleter) {
       
   166             try {
       
   167                 // non contentious synchronized for visibility.
       
   168                 synchronized(monitor) {
       
   169                     task.run();
       
   170                 }
       
   171             } finally {
       
   172                 taskCompleter.complete();
       
   173             }
       
   174         }
       
   175     }
       
   176 
       
   177     // This is best effort - there's no guarantee that the printed set of values
       
   178     // is consistent. It should only be considered as weakly accurate - in
       
   179     // particular in what concerns the events states, especially when displaying
       
   180     // a read event state from a write event callback and conversely.
       
   181     void debugState(String when) {
       
   182         if (debug.on()) {
       
   183             StringBuilder state = new StringBuilder();
       
   184 
       
   185             InternalReadPublisher.InternalReadSubscription sub =
       
   186                     readPublisher.subscriptionImpl;
       
   187             InternalReadPublisher.ReadEvent readEvent =
       
   188                     sub == null ? null : sub.readEvent;
       
   189             Demand rdemand = sub == null ? null : sub.demand;
       
   190             InternalWriteSubscriber.WriteEvent writeEvent =
       
   191                     writeSubscriber.writeEvent;
       
   192             Demand wdemand = writeSubscriber.writeDemand;
       
   193             int rops = readEvent == null ? 0 : readEvent.interestOps();
       
   194             long rd = rdemand == null ? 0 : rdemand.get();
       
   195             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
       
   196             long wd = wdemand == null ? 0 : wdemand.get();
       
   197 
       
   198             state.append(when).append(" Reading: [ops=")
       
   199                     .append(rops).append(", demand=").append(rd)
       
   200                     .append(", stopped=")
       
   201                     .append((sub == null ? false : sub.readScheduler.isStopped()))
       
   202                     .append("], Writing: [ops=").append(wops)
       
   203                     .append(", demand=").append(wd)
       
   204                     .append("]");
       
   205             debug.log(state.toString());
       
   206         }
       
   207     }
       
   208 
       
   209     /**
       
   210      * A repeatable event that can be paused or resumed by changing its
       
   211      * interestOps. When the event is fired, it is first paused before being
       
   212      * signaled. It is the responsibility of the code triggered by
       
   213      * {@code signalEvent} to resume the event if required.
       
   214      */
       
   215     private static abstract class SocketFlowEvent extends AsyncEvent {
       
   216         final SocketChannel channel;
       
   217         final int defaultInterest;
       
   218         volatile int interestOps;
       
   219         volatile boolean registered;
       
   220         SocketFlowEvent(int defaultInterest, SocketChannel channel) {
       
   221             super(AsyncEvent.REPEATING);
       
   222             this.defaultInterest = defaultInterest;
       
   223             this.channel = channel;
       
   224         }
       
   225         final boolean registered() {return registered;}
       
   226         final void resume() {
       
   227             interestOps = defaultInterest;
       
   228             registered = true;
       
   229         }
       
   230         final void pause() {interestOps = 0;}
       
   231         @Override
       
   232         public final SelectableChannel channel() {return channel;}
       
   233         @Override
       
   234         public final int interestOps() {return interestOps;}
       
   235 
       
   236         @Override
       
   237         public final void handle() {
       
   238             pause();       // pause, then signal
       
   239             signalEvent(); // won't be fired again until resumed.
       
   240         }
       
   241         @Override
       
   242         public final void abort(IOException error) {
       
   243             debug().log(() -> "abort: " + error);
       
   244             pause();              // pause, then signal
       
   245             signalError(error);   // should not be resumed after abort (not checked)
       
   246         }
       
   247 
       
   248         protected abstract void signalEvent();
       
   249         protected abstract void signalError(Throwable error);
       
   250         abstract Logger debug();
       
   251     }
       
   252 
       
   253     // ===================================================================== //
       
   254     //                              Writing                                  //
       
   255     // ======================================================================//
       
   256 
       
   257     // This class makes the assumption that the publisher will call onNext
       
   258     // sequentially, and that onNext won't be called if the demand has not been
       
   259     // incremented by request(1).
       
   260     // It has a 'queue of 1' meaning that it will call request(1) in
       
   261     // onSubscribe, and then only after its 'current' buffer list has been
       
   262     // fully written and current set to null;
       
   263     private final class InternalWriteSubscriber
       
   264             implements Flow.Subscriber<List<ByteBuffer>> {
       
   265 
       
   266         volatile WriteSubscription subscription;
       
   267         volatile List<ByteBuffer> current;
       
   268         volatile boolean completed;
       
   269         final AsyncTriggerEvent startSubscription =
       
   270                 new AsyncTriggerEvent(this::signalError, this::startSubscription);
       
   271         final WriteEvent writeEvent = new WriteEvent(channel, this);
       
   272         final Demand writeDemand = new Demand();
       
   273 
       
   274         @Override
       
   275         public void onSubscribe(Flow.Subscription subscription) {
       
   276             WriteSubscription previous = this.subscription;
       
   277             if (debug.on()) debug.log("subscribed for writing");
       
   278             try {
       
   279                 boolean needEvent = current == null;
       
   280                 if (needEvent) {
       
   281                     if (previous != null && previous.upstreamSubscription != subscription) {
       
   282                         previous.dropSubscription();
       
   283                     }
       
   284                 }
       
   285                 this.subscription = new WriteSubscription(subscription);
       
   286                 if (needEvent) {
       
   287                     if (debug.on())
       
   288                         debug.log("write: registering startSubscription event");
       
   289                     client.registerEvent(startSubscription);
       
   290                 }
       
   291             } catch (Throwable t) {
       
   292                 signalError(t);
       
   293             }
       
   294         }
       
   295 
       
   296         @Override
       
   297         public void onNext(List<ByteBuffer> bufs) {
       
   298             assert current == null : dbgString() // this is a queue of 1.
       
   299                     + "w.onNext current: " + current;
       
   300             assert subscription != null : dbgString()
       
   301                     + "w.onNext: subscription is null";
       
   302             current = bufs;
       
   303             tryFlushCurrent(client.isSelectorThread()); // may be in selector thread
       
   304             // For instance in HTTP/2, a received SETTINGS frame might trigger
       
   305             // the sending of a SETTINGS frame in turn which might cause
       
   306             // onNext to be called from within the same selector thread that the
       
   307             // original SETTINGS frames arrived on. If rs is the read-subscriber
       
   308             // and ws is the write-subscriber then the following can occur:
       
   309             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
       
   310             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
       
   311             debugState("leaving w.onNext");
       
   312         }
       
   313 
       
   314         // Don't use a SequentialScheduler here: rely on onNext() being invoked
       
   315         // sequentially, and not being invoked if there is no demand, request(1).
       
   316         // onNext is usually called from within a user / executor thread.
       
   317         // Initial writing will be performed in that thread. If for some reason,
       
   318         // not all the data can be written, a writeEvent will be registered, and
       
   319         // writing will resume in the the selector manager thread when the
       
   320         // writeEvent is fired.
       
   321         //
       
   322         // If this method is invoked in the selector manager thread (because of
       
   323         // a writeEvent), then the executor will be used to invoke request(1),
       
   324         // ensuring that onNext() won't be invoked from within the selector
       
   325         // thread. If not in the selector manager thread, then request(1) is
       
   326         // invoked directly.
       
   327         void tryFlushCurrent(boolean inSelectorThread) {
       
   328             List<ByteBuffer> bufs = current;
       
   329             if (bufs == null) return;
       
   330             try {
       
   331                 assert inSelectorThread == client.isSelectorThread() :
       
   332                        "should " + (inSelectorThread ? "" : "not ")
       
   333                         + " be in the selector thread";
       
   334                 long remaining = Utils.remaining(bufs);
       
   335                 if (debug.on()) debug.log("trying to write: %d", remaining);
       
   336                 long written = writeAvailable(bufs);
       
   337                 if (debug.on()) debug.log("wrote: %d", written);
       
   338                 assert written >= 0 : "negative number of bytes written:" + written;
       
   339                 assert written <= remaining;
       
   340                 if (remaining - written == 0) {
       
   341                     current = null;
       
   342                     if (writeDemand.tryDecrement()) {
       
   343                         Runnable requestMore = this::requestMore;
       
   344                         if (inSelectorThread) {
       
   345                             assert client.isSelectorThread();
       
   346                             client.theExecutor().execute(requestMore);
       
   347                         } else {
       
   348                             assert !client.isSelectorThread();
       
   349                             requestMore.run();
       
   350                         }
       
   351                     }
       
   352                 } else {
       
   353                     resumeWriteEvent(inSelectorThread);
       
   354                 }
       
   355             } catch (Throwable t) {
       
   356                 signalError(t);
       
   357                 subscription.cancel();
       
   358             }
       
   359         }
       
   360 
       
   361         // Kick off the initial request:1 that will start the writing side.
       
   362         // Invoked in the selector manager thread.
       
   363         void startSubscription() {
       
   364             try {
       
   365                 if (debug.on()) debug.log("write: starting subscription");
       
   366                 assert client.isSelectorThread();
       
   367                 // make sure read registrations are handled before;
       
   368                 readPublisher.subscriptionImpl.handlePending();
       
   369                 if (debug.on()) debug.log("write: offloading requestMore");
       
   370                 // start writing;
       
   371                 client.theExecutor().execute(this::requestMore);
       
   372             } catch(Throwable t) {
       
   373                 signalError(t);
       
   374             }
       
   375         }
       
   376 
       
   377         void requestMore() {
       
   378            WriteSubscription subscription = this.subscription;
       
   379            subscription.requestMore();
       
   380         }
       
   381 
       
   382         @Override
       
   383         public void onError(Throwable throwable) {
       
   384             signalError(throwable);
       
   385         }
       
   386 
       
   387         @Override
       
   388         public void onComplete() {
       
   389             completed = true;
       
   390             // no need to pause the write event here: the write event will
       
   391             // be paused if there is nothing more to write.
       
   392             List<ByteBuffer> bufs = current;
       
   393             long remaining = bufs == null ? 0 : Utils.remaining(bufs);
       
   394             if (debug.on())
       
   395                 debug.log( "write completed, %d yet to send", remaining);
       
   396             debugState("InternalWriteSubscriber::onComplete");
       
   397         }
       
   398 
       
   399         void resumeWriteEvent(boolean inSelectorThread) {
       
   400             if (debug.on()) debug.log("scheduling write event");
       
   401             resumeEvent(writeEvent, this::signalError);
       
   402         }
       
   403 
       
   404         void signalWritable() {
       
   405             if (debug.on()) debug.log("channel is writable");
       
   406             tryFlushCurrent(true);
       
   407         }
       
   408 
       
   409         void signalError(Throwable error) {
       
   410             debug.log(() -> "write error: " + error);
       
   411             completed = true;
       
   412             readPublisher.signalError(error);
       
   413         }
       
   414 
       
   415         // A repeatable WriteEvent which is paused after firing and can
       
   416         // be resumed if required - see SocketFlowEvent;
       
   417         final class WriteEvent extends SocketFlowEvent {
       
   418             final InternalWriteSubscriber sub;
       
   419             WriteEvent(SocketChannel channel, InternalWriteSubscriber sub) {
       
   420                 super(SelectionKey.OP_WRITE, channel);
       
   421                 this.sub = sub;
       
   422             }
       
   423             @Override
       
   424             protected final void signalEvent() {
       
   425                 try {
       
   426                     client.eventUpdated(this);
       
   427                     sub.signalWritable();
       
   428                 } catch(Throwable t) {
       
   429                     sub.signalError(t);
       
   430                 }
       
   431             }
       
   432 
       
   433             @Override
       
   434             protected void signalError(Throwable error) {
       
   435                 sub.signalError(error);
       
   436             }
       
   437 
       
   438             @Override
       
   439             Logger debug() { return debug; }
       
   440         }
       
   441 
       
   442         final class WriteSubscription implements Flow.Subscription {
       
   443             final Flow.Subscription upstreamSubscription;
       
   444             volatile boolean cancelled;
       
   445             WriteSubscription(Flow.Subscription subscription) {
       
   446                 this.upstreamSubscription = subscription;
       
   447             }
       
   448 
       
   449             @Override
       
   450             public void request(long n) {
       
   451                 if (cancelled) return;
       
   452                 upstreamSubscription.request(n);
       
   453             }
       
   454 
       
   455             @Override
       
   456             public void cancel() {
       
   457                 dropSubscription();
       
   458                 upstreamSubscription.cancel();
       
   459             }
       
   460 
       
   461             void dropSubscription() {
       
   462                 synchronized (InternalWriteSubscriber.this) {
       
   463                     cancelled = true;
       
   464                     if (debug.on()) debug.log("write: resetting demand to 0");
       
   465                     writeDemand.reset();
       
   466                 }
       
   467             }
       
   468 
       
   469             void requestMore() {
       
   470                 try {
       
   471                     if (completed || cancelled) return;
       
   472                     boolean requestMore;
       
   473                     long d;
       
   474                     // don't fiddle with demand after cancel.
       
   475                     // see dropSubscription.
       
   476                     synchronized (InternalWriteSubscriber.this) {
       
   477                         if (cancelled) return;
       
   478                         d = writeDemand.get();
       
   479                         requestMore = writeDemand.increaseIfFulfilled();
       
   480                     }
       
   481                     if (requestMore) {
       
   482                         if (debug.on()) debug.log("write: requesting more...");
       
   483                         upstreamSubscription.request(1);
       
   484                     } else {
       
   485                         if (debug.on())
       
   486                             debug.log("write: no need to request more: %d", d);
       
   487                     }
       
   488                 } catch (Throwable t) {
       
   489                     if (debug.on())
       
   490                         debug.log("write: error while requesting more: " + t);
       
   491                     cancelled = true;
       
   492                     signalError(t);
       
   493                     subscription.cancel();
       
   494                 } finally {
       
   495                     debugState("leaving requestMore: ");
       
   496                 }
       
   497             }
       
   498         }
       
   499     }
       
   500 
       
   501     // ===================================================================== //
       
   502     //                              Reading                                  //
       
   503     // ===================================================================== //
       
   504 
       
   505     // The InternalReadPublisher uses a SequentialScheduler to ensure that
       
   506     // onNext/onError/onComplete are called sequentially on the caller's
       
   507     // subscriber.
       
   508     // However, it relies on the fact that the only time where
       
   509     // runOrSchedule() is called from a user/executor thread is in signalError,
       
   510     // right after the errorRef has been set.
       
   511     // Because the sequential scheduler's task always checks for errors first,
       
    512     // and always terminates the scheduler on error, it is safe to assume
       
   513     // that if it reaches the point where it reads from the channel, then
       
   514     // it is running in the SelectorManager thread. This is because all
       
   515     // other invocation of runOrSchedule() are triggered from within a
       
   516     // ReadEvent.
       
   517     //
       
   518     // When pausing/resuming the event, some shortcuts can then be taken
       
   519     // when we know we're running in the selector manager thread
       
    520     // (in that case there's no need to call client.eventUpdated(readEvent)).
       
   521     //
       
   522     private final class InternalReadPublisher
       
   523             implements Flow.Publisher<List<ByteBuffer>> {
       
   524         private final InternalReadSubscription subscriptionImpl
       
   525                 = new InternalReadSubscription();
       
   526         AtomicReference<ReadSubscription> pendingSubscription = new AtomicReference<>();
       
   527         private volatile ReadSubscription subscription;
       
   528 
       
   529         @Override
       
   530         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   531             Objects.requireNonNull(s);
       
   532 
       
   533             TubeSubscriber sub = FlowTube.asTubeSubscriber(s);
       
   534             ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
       
   535             ReadSubscription previous = pendingSubscription.getAndSet(target);
       
   536 
       
   537             if (previous != null && previous != target) {
       
   538                 if (debug.on())
       
   539                     debug.log("read publisher: dropping pending subscriber: "
       
   540                               + previous.subscriber);
       
   541                 previous.errorRef.compareAndSet(null, errorRef.get());
       
   542                 previous.signalOnSubscribe();
       
   543                 if (subscriptionImpl.completed) {
       
   544                     previous.signalCompletion();
       
   545                 } else {
       
   546                     previous.subscriber.dropSubscription();
       
   547                 }
       
   548             }
       
   549 
       
   550             if (debug.on()) debug.log("read publisher got subscriber");
       
   551             subscriptionImpl.signalSubscribe();
       
   552             debugState("leaving read.subscribe: ");
       
   553         }
       
   554 
       
   555         void signalError(Throwable error) {
       
   556             if (debug.on()) debug.log("error signalled " + error);
       
   557             if (!errorRef.compareAndSet(null, error)) {
       
   558                 return;
       
   559             }
       
   560             subscriptionImpl.handleError();
       
   561         }
       
   562 
       
   563         final class ReadSubscription implements Flow.Subscription {
       
   564             final InternalReadSubscription impl;
       
   565             final TubeSubscriber  subscriber;
       
   566             final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   567             volatile boolean subscribed;
       
   568             volatile boolean cancelled;
       
   569             volatile boolean completed;
       
   570 
       
   571             public ReadSubscription(InternalReadSubscription impl,
       
   572                                     TubeSubscriber subscriber) {
       
   573                 this.impl = impl;
       
   574                 this.subscriber = subscriber;
       
   575             }
       
   576 
       
   577             @Override
       
   578             public void cancel() {
       
   579                 cancelled = true;
       
   580             }
       
   581 
       
   582             @Override
       
   583             public void request(long n) {
       
   584                 if (!cancelled) {
       
   585                     impl.request(n);
       
   586                 } else {
       
   587                     if (debug.on())
       
   588                         debug.log("subscription cancelled, ignoring request %d", n);
       
   589                 }
       
   590             }
       
   591 
       
   592             void signalCompletion() {
       
   593                 assert subscribed || cancelled;
       
   594                 if (completed || cancelled) return;
       
   595                 synchronized (this) {
       
   596                     if (completed) return;
       
   597                     completed = true;
       
   598                 }
       
   599                 Throwable error = errorRef.get();
       
   600                 if (error != null) {
       
   601                     if (debug.on())
       
   602                         debug.log("forwarding error to subscriber: " + error);
       
   603                     subscriber.onError(error);
       
   604                 } else {
       
   605                     if (debug.on()) debug.log("completing subscriber");
       
   606                     subscriber.onComplete();
       
   607                 }
       
   608             }
       
   609 
       
   610             void signalOnSubscribe() {
       
   611                 if (subscribed || cancelled) return;
       
   612                 synchronized (this) {
       
   613                     if (subscribed || cancelled) return;
       
   614                     subscribed = true;
       
   615                 }
       
   616                 subscriber.onSubscribe(this);
       
   617                 if (debug.on()) debug.log("onSubscribe called");
       
   618                 if (errorRef.get() != null) {
       
   619                     signalCompletion();
       
   620                 }
       
   621             }
       
   622         }
       
   623 
       
   624         final class InternalReadSubscription implements Flow.Subscription {
       
   625 
       
   626             private final Demand demand = new Demand();
       
   627             final SequentialScheduler readScheduler;
       
   628             private volatile boolean completed;
       
   629             private final ReadEvent readEvent;
       
   630             private final AsyncEvent subscribeEvent;
       
   631 
       
   632             InternalReadSubscription() {
       
   633                 readScheduler = new SequentialScheduler(new SocketFlowTask(this::read));
       
   634                 subscribeEvent = new AsyncTriggerEvent(this::signalError,
       
   635                                                        this::handleSubscribeEvent);
       
   636                 readEvent = new ReadEvent(channel, this);
       
   637             }
       
   638 
       
   639             /*
       
   640              * This method must be invoked before any other method of this class.
       
   641              */
       
   642             final void signalSubscribe() {
       
   643                 if (readScheduler.isStopped() || completed) {
       
   644                     // if already completed or stopped we can handle any
       
   645                     // pending connection directly from here.
       
   646                     if (debug.on())
       
   647                         debug.log("handling pending subscription while completed");
       
   648                     handlePending();
       
   649                 } else {
       
   650                     try {
       
   651                         if (debug.on()) debug.log("registering subscribe event");
       
   652                         client.registerEvent(subscribeEvent);
       
   653                     } catch (Throwable t) {
       
   654                         signalError(t);
       
   655                         handlePending();
       
   656                     }
       
   657                 }
       
   658             }
       
   659 
       
   660             final void handleSubscribeEvent() {
       
   661                 assert client.isSelectorThread();
       
   662                 debug.log("subscribe event raised");
       
   663                 readScheduler.runOrSchedule();
       
   664                 if (readScheduler.isStopped() || completed) {
       
   665                     // if already completed or stopped we can handle any
       
   666                     // pending connection directly from here.
       
   667                     if (debug.on())
       
   668                         debug.log("handling pending subscription when completed");
       
   669                     handlePending();
       
   670                 }
       
   671             }
       
   672 
       
   673 
       
   674             /*
       
   675              * Although this method is thread-safe, the Reactive-Streams spec seems
       
   676              * to not require it to be as such. It's a responsibility of the
       
   677              * subscriber to signal demand in a thread-safe manner.
       
   678              *
       
   679              * See Reactive Streams specification, rules 2.7 and 3.4.
       
   680              */
       
   681             @Override
       
   682             public final void request(long n) {
       
   683                 if (n > 0L) {
       
   684                     boolean wasFulfilled = demand.increase(n);
       
   685                     if (wasFulfilled) {
       
   686                         if (debug.on()) debug.log("got some demand for reading");
       
   687                         resumeReadEvent();
       
   688                         // if demand has been changed from fulfilled
       
   689                         // to unfulfilled register read event;
       
   690                     }
       
   691                 } else {
       
   692                     signalError(new IllegalArgumentException("non-positive request"));
       
   693                 }
       
   694                 debugState("leaving request("+n+"): ");
       
   695             }
       
   696 
       
   697             @Override
       
   698             public final void cancel() {
       
   699                 pauseReadEvent();
       
   700                 readScheduler.stop();
       
   701             }
       
   702 
       
   703             private void resumeReadEvent() {
       
   704                 if (debug.on()) debug.log("resuming read event");
       
   705                 resumeEvent(readEvent, this::signalError);
       
   706             }
       
   707 
       
   708             private void pauseReadEvent() {
       
   709                 if (debug.on()) debug.log("pausing read event");
       
   710                 pauseEvent(readEvent, this::signalError);
       
   711             }
       
   712 
       
   713 
       
   714             final void handleError() {
       
   715                 assert errorRef.get() != null;
       
   716                 readScheduler.runOrSchedule();
       
   717             }
       
   718 
       
   719             final void signalError(Throwable error) {
       
   720                 if (!errorRef.compareAndSet(null, error)) {
       
   721                     return;
       
   722                 }
       
   723                 if (debug.on()) debug.log("got read error: " + error);
       
   724                 readScheduler.runOrSchedule();
       
   725             }
       
   726 
       
   727             final void signalReadable() {
       
   728                 readScheduler.runOrSchedule();
       
   729             }
       
   730 
       
            /**
             * The body of the task that runs in SequentialScheduler.
             *
             * One pass does, in order: return if already completed; hand over
             * to a pending subscriber if there is one (and return); forward a
             * recorded error to the current subscriber and stop; otherwise, if
             * there is demand, read from the channel and deliver either EOF
             * (completion), some bytes ({@code onNext}), or nothing (demand is
             * restored and the read event resumed). Whatever the outcome,
             * {@code handlePending()} is called again in the {@code finally}
             * clause so that a subscriber that arrived meanwhile is not missed.
             */
            final void read() {
                // It is important to only call pauseReadEvent() when stopping
                // the scheduler. The event is automatically paused before
                // firing, and trying to pause it again could cause a race
                // condition between this loop, which calls tryDecrementDemand(),
                // and the thread that calls request(n), which will try to resume
                // reading.
                try {
                    while(!readScheduler.isStopped()) {
                        if (completed) return;

                        // make sure we have a subscriber
                        if (handlePending()) {
                            if (debug.on())
                                debug.log("pending subscriber subscribed");
                            return;
                        }

                        // If an error was signaled, we might not be in
                        // the selector thread, and that is OK, because we
                        // will just call onError and return.
                        ReadSubscription current = subscription;
                        Throwable error = errorRef.get();
                        if (current == null)  {
                            // no subscriber yet: the only expected reason for
                            // running is that an error was recorded; it will be
                            // delivered once a subscriber is installed.
                            assert error != null;
                            if (debug.on())
                                debug.log("error raised before subscriber subscribed: %s",
                                          (Object)error);
                            return;
                        }
                        TubeSubscriber subscriber = current.subscriber;
                        if (error != null) {
                            completed = true;
                            // safe to pause here because we're finished anyway.
                            pauseReadEvent();
                            if (debug.on())
                                debug.log("Sending error " + error
                                          + " to subscriber " + subscriber);
                            // make sure the subscription carries the error
                            // before the terminal signal is delivered.
                            current.errorRef.compareAndSet(null, error);
                            current.signalCompletion();
                            readScheduler.stop();
                            debugState("leaving read() loop with error: ");
                            return;
                        }

                        // If we reach here then we must be in the selector thread.
                        assert client.isSelectorThread();
                        if (demand.tryDecrement()) {
                            // we have demand.
                            try {
                                List<ByteBuffer> bytes = readAvailable();
                                // EOF is compared by identity: it is the
                                // sentinel list returned by readAvailable().
                                if (bytes == EOF) {
                                    if (!completed) {
                                        if (debug.on()) debug.log("got read EOF");
                                        completed = true;
                                        // safe to pause here because we're finished
                                        // anyway.
                                        pauseReadEvent();
                                        current.signalCompletion();
                                        readScheduler.stop();
                                    }
                                    debugState("leaving read() loop after EOF: ");
                                    return;
                                } else if (Utils.remaining(bytes) > 0) {
                                    // the subscriber is responsible for offloading
                                    // to another thread if needed.
                                    if (debug.on())
                                        debug.log("read bytes: " + Utils.remaining(bytes));
                                    assert !current.completed;
                                    subscriber.onNext(bytes);
                                    // we could continue looping until the demand
                                    // reaches 0. However, that would risk starving
                                    // other connections (bound to other socket
                                    // channels) - as other selected keys activated
                                    // by the selector manager thread might be
                                    // waiting for this event to terminate.
                                    // So resume the read event and return now...
                                    resumeReadEvent();
                                    debugState("leaving read() loop after onNext: ");
                                    return;
                                } else {
                                    // nothing available!
                                    if (debug.on()) debug.log("no more bytes available");
                                    // re-increment the demand and resume the read
                                    // event. This ensures that this loop is
                                    // executed again when the socket becomes
                                    // readable again.
                                    demand.increase(1);
                                    resumeReadEvent();
                                    debugState("leaving read() loop with no bytes");
                                    return;
                                }
                            } catch (Throwable x) {
                                // record the failure and loop again: the next
                                // iteration takes the error branch above.
                                signalError(x);
                                continue;
                            }
                        } else {
                            if (debug.on()) debug.log("no more demand for reading");
                            // the event is paused just after firing, so it should
                            // still be paused here, unless the demand was just
                            // incremented from 0 to n, in which case, the
                            // event will be resumed, causing this loop to be
                            // invoked again when the socket becomes readable:
                            // This is what we want.
                            // Trying to pause the event here would actually
                            // introduce a race condition between this loop and
                            // request(n).
                            debugState("leaving read() loop with no demand");
                            break;
                        }
                    }
                } catch (Throwable t) {
                    if (debug.on()) debug.log("Unexpected exception in read loop", t);
                    signalError(t);
                } finally {
                    // pick up any subscriber that arrived while we were running.
                    handlePending();
                }
            }
       
   850 
       
            /**
             * Promotes the pending subscription, if any, to be the current one.
             *
             * The pending slot is cleared atomically. If a different
             * subscription is currently installed and reading has not
             * completed, its subscriber is told to drop its subscription. The
             * read demand is reset, any recorded error is copied to the new
             * subscription, and the new subscription becomes current unless
             * the read scheduler has been stopped. The subscriber then
             * receives {@code onSubscribe}, followed by the terminal signal if
             * reading has already completed.
             *
             * @return {@code true} if a pending subscription was found and
             *         handled, {@code false} if there was none
             */
            boolean handlePending() {
                ReadSubscription pending = pendingSubscription.getAndSet(null);
                if (pending == null) return false;
                if (debug.on())
                    debug.log("handling pending subscription for %s",
                            pending.subscriber);
                ReadSubscription current = subscription;
                if (current != null && current != pending && !completed) {
                    // the previous subscriber is being replaced
                    current.subscriber.dropSubscription();
                }
                if (debug.on()) debug.log("read demand reset to 0");
                subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to.
                pending.errorRef.compareAndSet(null, errorRef.get());
                if (!readScheduler.isStopped()) {
                    subscription = pending;
                } else {
                    // the pending subscriber is still notified below, but is
                    // not installed as the current subscription.
                    if (debug.on()) debug.log("socket tube is already stopped");
                }
                if (debug.on()) debug.log("calling onSubscribe");
                pending.signalOnSubscribe();
                if (completed) {
                    // re-read the error: it may have been recorded after the
                    // first copy above.
                    pending.errorRef.compareAndSet(null, errorRef.get());
                    pending.signalCompletion();
                }
                return true;
            }
       
   877         }
       
   878 
       
   879 
       
   880         // A repeatable ReadEvent which is paused after firing and can
       
   881         // be resumed if required - see SocketFlowEvent;
       
   882         final class ReadEvent extends SocketFlowEvent {
       
   883             final InternalReadSubscription sub;
       
   884             ReadEvent(SocketChannel channel, InternalReadSubscription sub) {
       
   885                 super(SelectionKey.OP_READ, channel);
       
   886                 this.sub = sub;
       
   887             }
       
   888             @Override
       
   889             protected final void signalEvent() {
       
   890                 try {
       
   891                     client.eventUpdated(this);
       
   892                     sub.signalReadable();
       
   893                 } catch(Throwable t) {
       
   894                     sub.signalError(t);
       
   895                 }
       
   896             }
       
   897 
       
   898             @Override
       
   899             protected final void signalError(Throwable error) {
       
   900                 sub.signalError(error);
       
   901             }
       
   902 
       
   903             @Override
       
   904             Logger debug() { return debug; }
       
   905         }
       
   906     }
       
   907 
       
   908     // ===================================================================== //
       
   909     //                   Socket Channel Read/Write                           //
       
   910     // ===================================================================== //
       
   911     static final int MAX_BUFFERS = 3;
       
   912     static final List<ByteBuffer> EOF = List.of();
       
   913     static final List<ByteBuffer> NOTHING = List.of(Utils.EMPTY_BYTEBUFFER);
       
   914 
       
   915     // readAvailable() will read bytes into the 'current' ByteBuffer until
       
   916     // the ByteBuffer is full, or 0 or -1 (EOF) is returned by read().
       
   917     // When that happens, a slice of the data that has been read so far
       
   918     // is inserted into the returned buffer list, and if the current buffer
       
   919     // has remaining space, that space will be used to read more data when
       
   920     // the channel becomes readable again.
       
   921     private volatile ByteBuffer current;
       
    /**
     * Reads whatever is currently available from the channel.
     *
     * @return {@code EOF} if nothing was read and the channel reported
     *         end-of-stream (-1); {@code NOTHING} if nothing was read for any
     *         other reason; otherwise a list of at most {@code MAX_BUFFERS}
     *         read-only slices containing the bytes read, in order
     * @throws IOException if the channel read fails before any byte was read
     *         by this call; a failure that occurs after some bytes were read
     *         is stored in {@code errorRef} instead and the bytes are returned
     */
    private List<ByteBuffer> readAvailable() throws IOException {
        // reuse the left-over space of the previous buffer if there is any,
        // otherwise obtain a fresh buffer from the source.
        ByteBuffer buf = current;
        buf = (buf == null || !buf.hasRemaining())
                ? (current = buffersSource.get()) : buf;
        assert buf.hasRemaining();

        int read;
        // position at which the bytes read in this round start
        int pos = buf.position();
        List<ByteBuffer> list = null;
        while (buf.hasRemaining()) {
            try {
                // read until the buffer is full or read() returns 0 or -1
                while ((read = channel.read(buf)) > 0) {
                    if (!buf.hasRemaining())
                        break;
                }
            } catch (IOException x) {
                if (buf.position() == pos && list == null) {
                    // no bytes have been read, just throw...
                    throw x;
                } else {
                    // some bytes have been read, return them and fail next time
                    errorRef.compareAndSet(null, x);
                    read = 0; // ensures outer loop will exit
                }
            }

            // nothing read;
            if (buf.position() == pos) {
                // An empty list signals the end of data, and should only be
                // returned if read == -1. If some data has already been read,
                // then it must be returned. -1 will be returned next time
                // the caller attempts to read something.
                if (list == null) {
                    // nothing read - list was null - return EOF or NOTHING
                    list = read == -1 ? EOF : NOTHING;
                }
                break;
            }

            // check whether this buffer has still some free space available.
            // if so, we will keep it for the next round.
            final boolean hasRemaining = buf.hasRemaining();

            // creates a slice to add to the list
            // (the slice covers exactly [pos, position) of the buffer)
            int limit = buf.limit();
            buf.limit(buf.position());
            buf.position(pos);
            ByteBuffer slice = buf.slice();

            // restore buffer state to what it was before creating the slice
            buf.position(buf.limit());
            buf.limit(limit);

            // add the buffer to the list
            list = addToList(list, slice.asReadOnlyBuffer());
            // stop when the channel has no more data for now (0), reached
            // end-of-stream (-1), or the list is full.
            if (read <= 0 || list.size() == MAX_BUFFERS) {
                break;
            }

            // the buffer was filled completely and more data may be
            // available: continue with the same buffer if it still has room,
            // or with a new one otherwise.
            buf = hasRemaining ? buf : (current = buffersSource.get());
            pos = buf.position();
            assert buf.hasRemaining();
        }
        return list;
    }
       
   987 
       
   988     private <T> List<T> addToList(List<T> list, T item) {
       
   989         int size = list == null ? 0 : list.size();
       
   990         switch (size) {
       
   991             case 0: return List.of(item);
       
   992             case 1: return List.of(list.get(0), item);
       
   993             case 2: return List.of(list.get(0), list.get(1), item);
       
   994             default: // slow path if MAX_BUFFERS > 3
       
   995                 ArrayList<T> res = new ArrayList<>(list);
       
   996                 res.add(item);
       
   997                 return res;
       
   998         }
       
   999     }
       
  1000 
       
  1001     private long writeAvailable(List<ByteBuffer> bytes) throws IOException {
       
  1002         ByteBuffer[] srcs = bytes.toArray(Utils.EMPTY_BB_ARRAY);
       
  1003         final long remaining = Utils.remaining(srcs);
       
  1004         long written = 0;
       
  1005         while (remaining > written) {
       
  1006             try {
       
  1007                 long w = channel.write(srcs);
       
  1008                 assert w >= 0 : "negative number of bytes written:" + w;
       
  1009                 if (w == 0) {
       
  1010                     break;
       
  1011                 }
       
  1012                 written += w;
       
  1013             } catch (IOException x) {
       
  1014                 if (written == 0) {
       
  1015                     // no bytes were written just throw
       
  1016                     throw x;
       
  1017                 } else {
       
  1018                     // return how many bytes were written, will fail next time
       
  1019                     break;
       
  1020                 }
       
  1021             }
       
  1022         }
       
  1023         return written;
       
  1024     }
       
  1025 
       
  1026     private void resumeEvent(SocketFlowEvent event,
       
  1027                              Consumer<Throwable> errorSignaler) {
       
  1028         boolean registrationRequired;
       
  1029         synchronized(lock) {
       
  1030             registrationRequired = !event.registered();
       
  1031             event.resume();
       
  1032         }
       
  1033         try {
       
  1034             if (registrationRequired) {
       
  1035                 client.registerEvent(event);
       
  1036              } else {
       
  1037                 client.eventUpdated(event);
       
  1038             }
       
  1039         } catch(Throwable t) {
       
  1040             errorSignaler.accept(t);
       
  1041         }
       
  1042    }
       
  1043 
       
  1044     private void pauseEvent(SocketFlowEvent event,
       
  1045                             Consumer<Throwable> errorSignaler) {
       
  1046         synchronized(lock) {
       
  1047             event.pause();
       
  1048         }
       
  1049         try {
       
  1050             client.eventUpdated(event);
       
  1051         } catch(Throwable t) {
       
  1052             errorSignaler.accept(t);
       
  1053         }
       
  1054     }
       
  1055 
       
  1056     @Override
       
  1057     public void connectFlows(TubePublisher writePublisher,
       
  1058                              TubeSubscriber readSubscriber) {
       
  1059         if (debug.on()) debug.log("connecting flows");
       
  1060         this.subscribe(readSubscriber);
       
  1061         writePublisher.subscribe(this);
       
  1062     }
       
  1063 
       
  1064 
       
  1065     @Override
       
  1066     public String toString() {
       
  1067         return dbgString();
       
  1068     }
       
  1069 
       
  1070     final String dbgString() {
       
  1071         return "SocketTube("+id+")";
       
  1072     }
       
  1073 }