src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branch: http-client-branch
changeset 56374 e453d21310bd
parent 56372 98075dcd4ffc
child 56377 eef94a3576a4
equal deleted inserted replaced
56372:98075dcd4ffc 56374:e453d21310bd
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.internal.net.http;
    26 package jdk.internal.net.http;
    27 
    27 
    28 import java.io.EOFException;
       
    29 import java.io.IOException;
    28 import java.io.IOException;
    30 import java.lang.System.Logger.Level;
    29 import java.lang.System.Logger.Level;
    31 import java.nio.ByteBuffer;
    30 import java.nio.ByteBuffer;
    32 import java.util.List;
    31 import java.util.List;
    33 import java.util.Objects;
    32 import java.util.Objects;
    38 import java.nio.channels.SelectionKey;
    37 import java.nio.channels.SelectionKey;
    39 import java.nio.channels.SocketChannel;
    38 import java.nio.channels.SocketChannel;
    40 import java.util.ArrayList;
    39 import java.util.ArrayList;
    41 import java.util.function.Consumer;
    40 import java.util.function.Consumer;
    42 import java.util.function.Supplier;
    41 import java.util.function.Supplier;
    43 
       
    44 import jdk.internal.net.http.common.Demand;
    42 import jdk.internal.net.http.common.Demand;
    45 import jdk.internal.net.http.common.FlowTube;
    43 import jdk.internal.net.http.common.FlowTube;
    46 import jdk.internal.net.http.common.SequentialScheduler;
    44 import jdk.internal.net.http.common.SequentialScheduler;
    47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    45 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    46 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    49 import jdk.internal.net.http.common.Utils;
    47 import jdk.internal.net.http.common.Utils;
    50 
    48 
    51 /**
    49 /**
    52  * A SocketTube is a terminal tube plugged directly into the socket.
    50  * A SocketTube is a terminal tube plugged directly into the socket.
    53  * The read subscriber should call {@code subscribe} on the SocketTube before
    51  * The read subscriber should call {@code subscribe} on the SocketTube before
    54  * the SocketTube can be subscribed to the write publisher.
    52  * the SocketTube is subscribed to the write publisher.
    55  */
    53  */
    56 final class SocketTube implements FlowTube {
    54 final class SocketTube implements FlowTube {
    57 
    55 
    58     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag
    56     static final boolean DEBUG = Utils.DEBUG; // revisit: temporary developer's flag
    59     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    57     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
   146     // ===================================================================== //
   144     // ===================================================================== //
   147     //                           Events                                      //
   145     //                           Events                                      //
   148     // ======================================================================//
   146     // ======================================================================//
   149 
   147 
   150     void signalClosed() {
   148     void signalClosed() {
   151         // Ensure that the subscriber will be terminated
   149         // Ensures that the subscriber will be terminated and that future
   152         // and that future subscribers will be notified
   150         // subscribers will be notified when the connection is closed.
   153         // when the connection is closed.
       
   154         readPublisher.subscriptionImpl.signalError(
   151         readPublisher.subscriptionImpl.signalError(
   155                 new IOException("connection closed locally"));
   152                 new IOException("connection closed locally"));
   156     }
   153     }
   157 
   154 
   158     /**
   155     /**
   175                 taskCompleter.complete();
   172                 taskCompleter.complete();
   176             }
   173             }
   177         }
   174         }
   178     }
   175     }
   179 
   176 
   180     // This is best effort - there's no guarantee that the printed set
   177     // This is best effort - there's no guarantee that the printed set of values
   181     // of values is consistent. It should only be considered as
   178     // is consistent. It should only be considered as weakly accurate - in
   182     // weakly accurate - in particular in what concerns the events states,
   179     // particular in what concerns the events states, especially when displaying
   183     // especially when displaying a read event state from a write event
   180     // a read event state from a write event callback and conversely.
   184     // callback and conversely.
       
   185     void debugState(String when) {
   181     void debugState(String when) {
   186         if (debug.isLoggable(Level.DEBUG)) {
   182         if (debug.isLoggable(Level.DEBUG)) {
   187             StringBuilder state = new StringBuilder();
   183             StringBuilder state = new StringBuilder();
   188 
   184 
   189             InternalReadPublisher.InternalReadSubscription sub =
   185             InternalReadPublisher.InternalReadSubscription sub =
   209             debug.log(Level.DEBUG, state.toString());
   205             debug.log(Level.DEBUG, state.toString());
   210         }
   206         }
   211     }
   207     }
   212 
   208 
   213     /**
   209     /**
   214      * A repeatable event that can be paused or resumed by changing
   210      * A repeatable event that can be paused or resumed by changing its
   215      * its interestOps.
   211      * interestOps. When the event is fired, it is first paused before being
   216      * When the event is fired, it is first paused before being signaled.
   212      * signaled. It is the responsibility of the code triggered by
   217      * It is the responsibility of the code triggered by {@code signalEvent}
   213      * {@code signalEvent} to resume the event if required.
   218      * to resume the event if required.
       
   219      */
   214      */
   220     private static abstract class SocketFlowEvent extends AsyncEvent {
   215     private static abstract class SocketFlowEvent extends AsyncEvent {
   221         final SocketChannel channel;
   216         final SocketChannel channel;
   222         final int defaultInterest;
   217         final int defaultInterest;
   223         volatile int interestOps;
   218         volatile int interestOps;
   257 
   252 
   258     // ===================================================================== //
   253     // ===================================================================== //
   259     //                              Writing                                  //
   254     //                              Writing                                  //
   260     // ======================================================================//
   255     // ======================================================================//
   261 
   256 
   262     // This class makes the assumption that the publisher will call
   257     // This class makes the assumption that the publisher will call onNext
   263     // onNext sequentially, and that onNext won't be called if the demand
   258     // sequentially, and that onNext won't be called if the demand has not been
   264     // has not been incremented by request(1).
   259     // incremented by request(1).
   265     // It has a 'queue of 1' meaning that it will call request(1) in
   260     // It has a 'queue of 1' meaning that it will call request(1) in
   266     // onSubscribe, and then only after its 'current' buffer list has been
   261     // onSubscribe, and then only after its 'current' buffer list has been
   267     // fully written and current set to null;
   262     // fully written and current set to null;
   268     private final class InternalWriteSubscriber
   263     private final class InternalWriteSubscriber
   269             implements Flow.Subscriber<List<ByteBuffer>> {
   264             implements Flow.Subscriber<List<ByteBuffer>> {
   310             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
   305             // ReadEvent -> rs.onNext(bytes) -> process server SETTINGS -> write
   311             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
   306             // client SETTINGS -> ws.onNext(bytes) -> tryFlushCurrent
   312             debugState("leaving w.onNext");
   307             debugState("leaving w.onNext");
   313         }
   308         }
   314 
   309 
   315         // we don't use a SequentialScheduler here: we rely on
   310         // Don't use a SequentialScheduler here: rely on onNext() being invoked
   316         // onNext() being called sequentially, and not being called
   311         // sequentially, and not being invoked if there is no demand (no request(1)).
   317         // if we haven't call request(1)
   312         // onNext is usually called from within a user / executor thread.
   318         // onNext is usually called from within a user/executor thread.
   313         // Initial writing will be performed in that thread. If for some reason,
   319         // we will perform the initial writing in that thread.
   314         // not all the data can be written, a writeEvent will be registered, and
   320         // if for some reason, not all data can be written, the writeEvent
   315         // writing will resume in the selector manager thread when the
   321         // will be resumed, and the rest of the data will be written from
   316         // writeEvent is fired.
   322         // the selector manager thread when the writeEvent is fired.
   317         //
   323         // If we are in the selector manager thread, then we will use the executor
   318         // If this method is invoked in the selector manager thread (because of
   324         // to call request(1), ensuring that onNext() won't be called from
   319         // a writeEvent), then the executor will be used to invoke request(1),
   325         // within the selector thread.
   320         // ensuring that onNext() won't be invoked from within the selector
   326         // If we are not in the selector manager thread, then we don't care.
   321         // thread. If not in the selector manager thread, then request(1) is
       
   322         // invoked directly.
   327         void tryFlushCurrent(boolean inSelectorThread) {
   323         void tryFlushCurrent(boolean inSelectorThread) {
   328             List<ByteBuffer> bufs = current;
   324             List<ByteBuffer> bufs = current;
   329             if (bufs == null) return;
   325             if (bufs == null) return;
   330             try {
   326             try {
   331                 assert inSelectorThread == client.isSelectorThread() :
   327                 assert inSelectorThread == client.isSelectorThread() :
   333                         + " be in the selector thread";
   329                         + " be in the selector thread";
   334                 long remaining = Utils.remaining(bufs);
   330                 long remaining = Utils.remaining(bufs);
   335                 debug.log(Level.DEBUG, "trying to write: %d", remaining);
   331                 debug.log(Level.DEBUG, "trying to write: %d", remaining);
   336                 long written = writeAvailable(bufs);
   332                 long written = writeAvailable(bufs);
   337                 debug.log(Level.DEBUG, "wrote: %d", written);
   333                 debug.log(Level.DEBUG, "wrote: %d", written);
   338                 if (written == -1) {
   334                 assert written >= 0 : "negative number of bytes written:" + written;
   339                     signalError(new EOFException("EOF reached while writing"));
       
   340                     return;
       
   341                 }
       
   342                 assert written <= remaining;
   335                 assert written <= remaining;
   343                 if (remaining - written == 0) {
   336                 if (remaining - written == 0) {
   344                     current = null;
   337                     current = null;
   345                     if (writeDemand.tryDecrement()) {
   338                     if (writeDemand.tryDecrement()) {
   346                         Runnable requestMore = this::requestMore;
   339                         Runnable requestMore = this::requestMore;
   359                 signalError(t);
   352                 signalError(t);
   360                 subscription.cancel();
   353                 subscription.cancel();
   361             }
   354             }
   362         }
   355         }
   363 
   356 
   364         // Kick off the initial request:1 that will start
   357         // Kick off the initial request:1 that will start the writing side.
   365         // the writing side. Called from the selector manager
   358         // Invoked in the selector manager thread.
   366         // thread.
       
   367         void startSubscription() {
   359         void startSubscription() {
   368             try {
   360             try {
   369                 debug.log(Level.DEBUG, "write: starting subscription");
   361                 debug.log(Level.DEBUG, "write: starting subscription");
   370                 assert client.isSelectorThread();
   362                 assert client.isSelectorThread();
   371                 // make sure read registrations are handled before;
   363                 // make sure read registrations are handled before;
   971         final long remaining = Utils.remaining(srcs);
   963         final long remaining = Utils.remaining(srcs);
   972         long written = 0;
   964         long written = 0;
   973         while (remaining > written) {
   965         while (remaining > written) {
   974             try {
   966             try {
   975                 long w = channel.write(srcs);
   967                 long w = channel.write(srcs);
   976                 if (w == -1 && written == 0) return -1;
   968                 assert w >= 0 : "negative number of bytes written:" + w;
   977                 if (w == 0) break;
   969                 if (w == 0) {
       
   970                     break;
       
   971                 }
   978                 written += w;
   972                 written += w;
   979             } catch (IOException x) {
   973             } catch (IOException x) {
   980                 // if no bytes were written just throws...
   974                 if (written == 0) {
   981                 if (written == 0) throw x;
   975                     // no bytes were written: just throw
   982                 // otherwise return how many bytes were
   976                     throw x;
   983                 // written: we will fail next time.
   977                 } else {
   984                 break;
   978                     // return how many bytes were written, will fail next time
       
   979                     break;
       
   980                 }
   985             }
   981             }
   986         }
   982         }
   987         return written;
   983         return written;
   988     }
   984     }
   989 
   985