src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branchhttp-client-branch
changeset 56768 d6b08065edf5
parent 56756 ba60eaef37d7
child 56770 f1626bc0a010
equal deleted inserted replaced
56767:014ef88f5066 56768:d6b08065edf5
    39 import java.util.function.Consumer;
    39 import java.util.function.Consumer;
    40 import java.util.function.Supplier;
    40 import java.util.function.Supplier;
    41 import jdk.internal.net.http.common.BufferSupplier;
    41 import jdk.internal.net.http.common.BufferSupplier;
    42 import jdk.internal.net.http.common.Demand;
    42 import jdk.internal.net.http.common.Demand;
    43 import jdk.internal.net.http.common.FlowTube;
    43 import jdk.internal.net.http.common.FlowTube;
       
    44 import jdk.internal.net.http.common.Log;
    44 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    48 import jdk.internal.net.http.common.Utils;
    49 import jdk.internal.net.http.common.Utils;
   147     // ======================================================================//
   148     // ======================================================================//
   148 
   149 
   149     void signalClosed() {
   150     void signalClosed() {
   150         // Ensures that the subscriber will be terminated and that future
   151         // Ensures that the subscriber will be terminated and that future
   151         // subscribers will be notified when the connection is closed.
   152         // subscribers will be notified when the connection is closed.
       
   153         if (Log.channel()) {
       
   154             Log.logChannel("Connection close signalled: connection closed locally ({0})",
       
   155                     channelDescr());
       
   156         }
   152         readPublisher.subscriptionImpl.signalError(
   157         readPublisher.subscriptionImpl.signalError(
   153                 new IOException("connection closed locally"));
   158                 new IOException("connection closed locally"));
   154     }
   159     }
   155 
   160 
   156     /**
   161     /**
   362         // Kick off the initial request:1 that will start the writing side.
   367         // Kick off the initial request:1 that will start the writing side.
   363         // Invoked in the selector manager thread.
   368         // Invoked in the selector manager thread.
   364         void startSubscription() {
   369         void startSubscription() {
   365             try {
   370             try {
   366                 if (debug.on()) debug.log("write: starting subscription");
   371                 if (debug.on()) debug.log("write: starting subscription");
       
   372                 if (Log.channel()) {
       
   373                     Log.logChannel("Start requesting bytes for writing to channel: {0}",
       
   374                             channelDescr());
       
   375                 }
   367                 assert client.isSelectorThread();
   376                 assert client.isSelectorThread();
   368                 // make sure read registrations are handled before;
   377                 // make sure read registrations are handled before;
   369                 readPublisher.subscriptionImpl.handlePending();
   378                 readPublisher.subscriptionImpl.handlePending();
   370                 if (debug.on()) debug.log("write: offloading requestMore");
   379                 if (debug.on()) debug.log("write: offloading requestMore");
   371                 // start writing;
   380                 // start writing;
   407             tryFlushCurrent(true);
   416             tryFlushCurrent(true);
   408         }
   417         }
   409 
   418 
   410         void signalError(Throwable error) {
   419         void signalError(Throwable error) {
   411             debug.log(() -> "write error: " + error);
   420             debug.log(() -> "write error: " + error);
       
   421             if (Log.channel()) {
       
   422                 Log.logChannel("Failed to write on channel ({0}: {1})",
       
   423                         channelDescr(), error);
       
   424             }
   412             completed = true;
   425             completed = true;
   413             readPublisher.signalError(error);
   426             readPublisher.signalError(error);
   414         }
   427         }
   415 
   428 
   416         // A repeatable WriteEvent which is paused after firing and can
   429         // A repeatable WriteEvent which is paused after firing and can
   557         void signalError(Throwable error) {
   570         void signalError(Throwable error) {
   558             if (debug.on()) debug.log("error signalled " + error);
   571             if (debug.on()) debug.log("error signalled " + error);
   559             if (!errorRef.compareAndSet(null, error)) {
   572             if (!errorRef.compareAndSet(null, error)) {
   560                 return;
   573                 return;
   561             }
   574             }
       
   575             if (Log.channel()) {
       
   576                 Log.logChannel("Error signalled on channel {0}: {1}",
       
   577                         channelDescr(), error);
       
   578             }
   562             subscriptionImpl.handleError();
   579             subscriptionImpl.handleError();
   563         }
   580         }
   564 
   581 
   565         final class ReadSubscription implements Flow.Subscription {
   582         final class ReadSubscription implements Flow.Subscription {
   566             final InternalReadSubscription impl;
   583             final InternalReadSubscription impl;
   664             }
   681             }
   665 
   682 
   666             final void handleSubscribeEvent() {
   683             final void handleSubscribeEvent() {
   667                 assert client.isSelectorThread();
   684                 assert client.isSelectorThread();
   668                 debug.log("subscribe event raised");
   685                 debug.log("subscribe event raised");
       
   686                 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
   669                 readScheduler.runOrSchedule();
   687                 readScheduler.runOrSchedule();
   670                 if (readScheduler.isStopped() || completed) {
   688                 if (readScheduler.isStopped() || completed) {
   671                     // if already completed or stopped we can handle any
   689                     // if already completed or stopped we can handle any
   672                     // pending connection directly from here.
   690                     // pending connection directly from here.
   673                     if (debug.on())
   691                     if (debug.on())
   787                             try {
   805                             try {
   788                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   806                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   789                                 if (bytes == EOF) {
   807                                 if (bytes == EOF) {
   790                                     if (!completed) {
   808                                     if (!completed) {
   791                                         if (debug.on()) debug.log("got read EOF");
   809                                         if (debug.on()) debug.log("got read EOF");
       
   810                                         if (Log.channel()) {
       
   811                                             Log.logChannel("EOF reached from channel: {0}",
       
   812                                                         channelDescr());
       
   813                                         }
   792                                         completed = true;
   814                                         completed = true;
   793                                         // safe to pause here because we're finished
   815                                         // safe to pause here because we're finished
   794                                         // anyway.
   816                                         // anyway.
   795                                         pauseReadEvent();
   817                                         pauseReadEvent();
   796                                         current.signalCompletion();
   818                                         current.signalCompletion();
  1237     }
  1259     }
  1238 
  1260 
  1239     final String dbgString() {
  1261     final String dbgString() {
  1240         return "SocketTube("+id+")";
  1262         return "SocketTube("+id+")";
  1241     }
  1263     }
       
  1264 
       
  1265     final String channelDescr() {
       
  1266         return String.valueOf(channel);
       
  1267     }
  1242 }
  1268 }