src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branchhttp-client-branch
changeset 56770 f1626bc0a010
parent 56768 d6b08065edf5
child 56795 03ece2518428
equal deleted inserted replaced
56768:d6b08065edf5 56770:f1626bc0a010
   417         }
   417         }
   418 
   418 
   419         void signalError(Throwable error) {
   419         void signalError(Throwable error) {
   420             debug.log(() -> "write error: " + error);
   420             debug.log(() -> "write error: " + error);
   421             if (Log.channel()) {
   421             if (Log.channel()) {
   422                 Log.logChannel("Failed to write on channel ({0}: {1})",
   422                 Log.logChannel("Failed to write to channel ({0}: {1})",
   423                         channelDescr(), error);
   423                         channelDescr(), error);
   424             }
   424             }
   425             completed = true;
   425             completed = true;
   426             readPublisher.signalError(error);
   426             readPublisher.signalError(error);
   427         }
   427         }
   719             }
   719             }
   720 
   720 
   721             @Override
   721             @Override
   722             public final void cancel() {
   722             public final void cancel() {
   723                 pauseReadEvent();
   723                 pauseReadEvent();
       
   724                 if (Log.channel()) {
       
   725                     Log.logChannel("Read subscription cancelled for channel {0}",
       
   726                             channelDescr());
       
   727                 }
   724                 readScheduler.stop();
   728                 readScheduler.stop();
   725             }
   729             }
   726 
   730 
   727             private void resumeReadEvent() {
   731             private void resumeReadEvent() {
   728                 if (debug.on()) debug.log("resuming read event");
   732                 if (debug.on()) debug.log("resuming read event");
   743             final void signalError(Throwable error) {
   747             final void signalError(Throwable error) {
   744                 if (!errorRef.compareAndSet(null, error)) {
   748                 if (!errorRef.compareAndSet(null, error)) {
   745                     return;
   749                     return;
   746                 }
   750                 }
   747                 if (debug.on()) debug.log("got read error: " + error);
   751                 if (debug.on()) debug.log("got read error: " + error);
       
   752                 if (Log.channel()) {
       
   753                     Log.logChannel("Read error signalled on channel {0}: {1}",
       
   754                             channelDescr(), error);
       
   755                 }
   748                 readScheduler.runOrSchedule();
   756                 readScheduler.runOrSchedule();
   749             }
   757             }
   750 
   758 
   751             final void signalReadable() {
   759             final void signalReadable() {
   752                 readScheduler.runOrSchedule();
   760                 readScheduler.runOrSchedule();
   789                             // safe to pause here because we're finished anyway.
   797                             // safe to pause here because we're finished anyway.
   790                             pauseReadEvent();
   798                             pauseReadEvent();
   791                             if (debug.on())
   799                             if (debug.on())
   792                                 debug.log("Sending error " + error
   800                                 debug.log("Sending error " + error
   793                                           + " to subscriber " + subscriber);
   801                                           + " to subscriber " + subscriber);
       
   802                             if (Log.channel()) {
       
   803                                 Log.logChannel("Raising error with subscriber for {0}: {1}",
       
   804                                         channelDescr(), error);
       
   805                             }
   794                             current.errorRef.compareAndSet(null, error);
   806                             current.errorRef.compareAndSet(null, error);
   795                             current.signalCompletion();
   807                             current.signalCompletion();
   796                             readScheduler.stop();
   808                             readScheduler.stop();
   797                             debugState("leaving read() loop with error: ");
   809                             debugState("leaving read() loop with error: ");
   798                             return;
   810                             return;
   806                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   818                                 List<ByteBuffer> bytes = readAvailable(current.bufferSource);
   807                                 if (bytes == EOF) {
   819                                 if (bytes == EOF) {
   808                                     if (!completed) {
   820                                     if (!completed) {
   809                                         if (debug.on()) debug.log("got read EOF");
   821                                         if (debug.on()) debug.log("got read EOF");
   810                                         if (Log.channel()) {
   822                                         if (Log.channel()) {
   811                                             Log.logChannel("EOF reached from channel: {0}",
   823                                             Log.logChannel("EOF read from channel: {0}",
   812                                                         channelDescr());
   824                                                         channelDescr());
   813                                         }
   825                                         }
   814                                         completed = true;
   826                                         completed = true;
   815                                         // safe to pause here because we're finished
   827                                         // safe to pause here because we're finished
   816                                         // anyway.
   828                                         // anyway.
   870                     }
   882                     }
   871                 } catch (Throwable t) {
   883                 } catch (Throwable t) {
   872                     if (debug.on()) debug.log("Unexpected exception in read loop", t);
   884                     if (debug.on()) debug.log("Unexpected exception in read loop", t);
   873                     signalError(t);
   885                     signalError(t);
   874                 } finally {
   886                 } finally {
       
   887                     if (readScheduler.isStopped()) {
       
   888                         if (debug.on()) debug.log("Read scheduler stopped");
       
   889                         if (Log.channel()) {
       
   890                             Log.logChannel("Stopped reading from channel {0}", channelDescr());
       
   891                         }
       
   892                     }
   875                     handlePending();
   893                     handlePending();
   876                 }
   894                 }
   877             }
   895             }
   878 
   896 
   879             boolean handlePending() {
   897             boolean handlePending() {