src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 50985 cd41f34e548c
child 56795 03ece2518428
equal deleted inserted replaced
50678:818a23db260c 50681:4254bed3c09d
    39 import java.util.function.Consumer;
    39 import java.util.function.Consumer;
    40 import java.util.function.Supplier;
    40 import java.util.function.Supplier;
    41 import jdk.internal.net.http.common.BufferSupplier;
    41 import jdk.internal.net.http.common.BufferSupplier;
    42 import jdk.internal.net.http.common.Demand;
    42 import jdk.internal.net.http.common.Demand;
    43 import jdk.internal.net.http.common.FlowTube;
    43 import jdk.internal.net.http.common.FlowTube;
       
    44 import jdk.internal.net.http.common.Log;
    44 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.Logger;
    45 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler;
    46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
    47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask;
    48 import jdk.internal.net.http.common.Utils;
    49 import jdk.internal.net.http.common.Utils;
   147     // ======================================================================//
   148     // ======================================================================//
   148 
   149 
   149     void signalClosed() {
   150     void signalClosed() {
   150         // Ensures that the subscriber will be terminated and that future
   151         // Ensures that the subscriber will be terminated and that future
   151         // subscribers will be notified when the connection is closed.
   152         // subscribers will be notified when the connection is closed.
       
   153         if (Log.channel()) {
       
   154             Log.logChannel("Connection close signalled: connection closed locally ({0})",
       
   155                     channelDescr());
       
   156         }
   152         readPublisher.subscriptionImpl.signalError(
   157         readPublisher.subscriptionImpl.signalError(
   153                 new IOException("connection closed locally"));
   158                 new IOException("connection closed locally"));
   154     }
   159     }
   155 
   160 
   156     /**
   161     /**
   362         // Kick off the initial request:1 that will start the writing side.
   367         // Kick off the initial request:1 that will start the writing side.
   363         // Invoked in the selector manager thread.
   368         // Invoked in the selector manager thread.
   364         void startSubscription() {
   369         void startSubscription() {
   365             try {
   370             try {
   366                 if (debug.on()) debug.log("write: starting subscription");
   371                 if (debug.on()) debug.log("write: starting subscription");
       
   372                 if (Log.channel()) {
       
   373                     Log.logChannel("Start requesting bytes for writing to channel: {0}",
       
   374                             channelDescr());
       
   375                 }
   367                 assert client.isSelectorThread();
   376                 assert client.isSelectorThread();
   368                 // make sure read registrations are handled before;
   377                 // make sure read registrations are handled before;
   369                 readPublisher.subscriptionImpl.handlePending();
   378                 readPublisher.subscriptionImpl.handlePending();
   370                 if (debug.on()) debug.log("write: offloading requestMore");
   379                 if (debug.on()) debug.log("write: offloading requestMore");
   371                 // start writing;
   380                 // start writing;
   407             tryFlushCurrent(true);
   416             tryFlushCurrent(true);
   408         }
   417         }
   409 
   418 
   410         void signalError(Throwable error) {
   419         void signalError(Throwable error) {
   411             debug.log(() -> "write error: " + error);
   420             debug.log(() -> "write error: " + error);
       
   421             if (Log.channel()) {
       
   422                 Log.logChannel("Failed to write to channel ({0}: {1})",
       
   423                         channelDescr(), error);
       
   424             }
   412             completed = true;
   425             completed = true;
   413             readPublisher.signalError(error);
   426             readPublisher.signalError(error);
   414         }
   427         }
   415 
   428 
   416         // A repeatable WriteEvent which is paused after firing and can
   429         // A repeatable WriteEvent which is paused after firing and can
   453                 upstreamSubscription.request(n);
   466                 upstreamSubscription.request(n);
   454             }
   467             }
   455 
   468 
   456             @Override
   469             @Override
   457             public void cancel() {
   470             public void cancel() {
       
   471                 if (debug.on()) debug.log("write: cancel");
   458                 dropSubscription();
   472                 dropSubscription();
   459                 upstreamSubscription.cancel();
   473                 upstreamSubscription.cancel();
   460             }
   474             }
   461 
   475 
   462             void dropSubscription() {
   476             void dropSubscription() {
   556         void signalError(Throwable error) {
   570         void signalError(Throwable error) {
   557             if (debug.on()) debug.log("error signalled " + error);
   571             if (debug.on()) debug.log("error signalled " + error);
   558             if (!errorRef.compareAndSet(null, error)) {
   572             if (!errorRef.compareAndSet(null, error)) {
   559                 return;
   573                 return;
   560             }
   574             }
       
   575             if (Log.channel()) {
       
   576                 Log.logChannel("Error signalled on channel {0}: {1}",
       
   577                         channelDescr(), error);
       
   578             }
   561             subscriptionImpl.handleError();
   579             subscriptionImpl.handleError();
   562         }
   580         }
   563 
   581 
   564         final class ReadSubscription implements Flow.Subscription {
   582         final class ReadSubscription implements Flow.Subscription {
   565             final InternalReadSubscription impl;
   583             final InternalReadSubscription impl;
   663             }
   681             }
   664 
   682 
   665             final void handleSubscribeEvent() {
   683             final void handleSubscribeEvent() {
   666                 assert client.isSelectorThread();
   684                 assert client.isSelectorThread();
   667                 debug.log("subscribe event raised");
   685                 debug.log("subscribe event raised");
       
   686                 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
   668                 readScheduler.runOrSchedule();
   687                 readScheduler.runOrSchedule();
   669                 if (readScheduler.isStopped() || completed) {
   688                 if (readScheduler.isStopped() || completed) {
   670                     // if already completed or stopped we can handle any
   689                     // if already completed or stopped we can handle any
   671                     // pending connection directly from here.
   690                     // pending connection directly from here.
   672                     if (debug.on())
   691                     if (debug.on())
   700             }
   719             }
   701 
   720 
   702             @Override
   721             @Override
   703             public final void cancel() {
   722             public final void cancel() {
   704                 pauseReadEvent();
   723                 pauseReadEvent();
       
   724                 if (Log.channel()) {
       
   725                     Log.logChannel("Read subscription cancelled for channel {0}",
       
   726                             channelDescr());
       
   727                 }
   705                 readScheduler.stop();
   728                 readScheduler.stop();
   706             }
   729             }
   707 
   730 
   708             private void resumeReadEvent() {
   731             private void resumeReadEvent() {
   709                 if (debug.on()) debug.log("resuming read event");
   732                 if (debug.on()) debug.log("resuming read event");
   724             final void signalError(Throwable error) {
   747             final void signalError(Throwable error) {
   725                 if (!errorRef.compareAndSet(null, error)) {
   748                 if (!errorRef.compareAndSet(null, error)) {
   726                     return;
   749                     return;
   727                 }
   750                 }
   728                 if (debug.on()) debug.log("got read error: " + error);
   751                 if (debug.on()) debug.log("got read error: " + error);
       
   752                 if (Log.channel()) {
       
   753                     Log.logChannel("Read error signalled on channel {0}: {1}",
       
   754                             channelDescr(), error);
       
   755                 }
   729                 readScheduler.runOrSchedule();
   756                 readScheduler.runOrSchedule();
   730             }
   757             }
   731 
   758 
   732             final void signalReadable() {
   759             final void signalReadable() {
   733                 readScheduler.runOrSchedule();
   760                 readScheduler.runOrSchedule();
   770                             // safe to pause here because we're finished anyway.
   797                             // safe to pause here because we're finished anyway.
   771                             pauseReadEvent();
   798                             pauseReadEvent();
   772                             if (debug.on())
   799                             if (debug.on())
   773                                 debug.log("Sending error " + error
   800                                 debug.log("Sending error " + error
   774                                           + " to subscriber " + subscriber);
   801                                           + " to subscriber " + subscriber);
       
   802                             if (Log.channel()) {
       
   803                                 Log.logChannel("Raising error with subscriber for {0}: {1}",
       
   804                                         channelDescr(), error);
       
   805                             }
   775                             current.errorRef.compareAndSet(null, error);
   806                             current.errorRef.compareAndSet(null, error);
   776                             current.signalCompletion();
   807                             current.signalCompletion();
   777                             readScheduler.stop();
   808                             readScheduler.stop();
   778                             debugState("leaving read() loop with error: ");
   809                             debugState("leaving read() loop with error: ");
   779                             return;
   810                             return;
   786                             try {
   817                             try {
   787                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   818                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   788                                 if (bytes == EOF) {
   819                                 if (bytes == EOF) {
   789                                     if (!completed) {
   820                                     if (!completed) {
   790                                         if (debug.on()) debug.log("got read EOF");
   821                                         if (debug.on()) debug.log("got read EOF");
       
   822                                         if (Log.channel()) {
       
   823                                             Log.logChannel("EOF read from channel: {0}",
       
   824                                                         channelDescr());
       
   825                                         }
   791                                         completed = true;
   826                                         completed = true;
   792                                         // safe to pause here because we're finished
   827                                         // safe to pause here because we're finished
   793                                         // anyway.
   828                                         // anyway.
   794                                         pauseReadEvent();
   829                                         pauseReadEvent();
   795                                         current.signalCompletion();
   830                                         current.signalCompletion();
   847                     }
   882                     }
   848                 } catch (Throwable t) {
   883                 } catch (Throwable t) {
   849                     if (debug.on()) debug.log("Unexpected exception in read loop", t);
   884                     if (debug.on()) debug.log("Unexpected exception in read loop", t);
   850                     signalError(t);
   885                     signalError(t);
   851                 } finally {
   886                 } finally {
       
   887                     if (readScheduler.isStopped()) {
       
   888                         if (debug.on()) debug.log("Read scheduler stopped");
       
   889                         if (Log.channel()) {
       
   890                             Log.logChannel("Stopped reading from channel {0}", channelDescr());
       
   891                         }
       
   892                     }
   852                     handlePending();
   893                     handlePending();
   853                 }
   894                 }
   854             }
   895             }
   855 
   896 
   856             boolean handlePending() {
   897             boolean handlePending() {
  1236     }
  1277     }
  1237 
  1278 
  1238     final String dbgString() {
  1279     final String dbgString() {
  1239         return "SocketTube("+id+")";
  1280         return "SocketTube("+id+")";
  1240     }
  1281     }
       
  1282 
       
  1283     final String channelDescr() {
       
  1284         return String.valueOf(channel);
       
  1285     }
  1241 }
  1286 }