http-client-branch: review comment - additional small cleanups in SocketTube http-client-branch
authorchegar
Thu, 29 Mar 2018 21:45:58 +0100
branchhttp-client-branch
changeset 56374 e453d21310bd
parent 56372 98075dcd4ffc
child 56377 eef94a3576a4
http-client-branch: review comment - additional small cleanups in SocketTube
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Mar 29 20:49:13 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Mar 29 21:45:58 2018 +0100
@@ -25,7 +25,6 @@
 
 package jdk.internal.net.http;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
@@ -40,7 +39,6 @@
 import java.util.ArrayList;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-
 import jdk.internal.net.http.common.Demand;
 import jdk.internal.net.http.common.FlowTube;
 import jdk.internal.net.http.common.SequentialScheduler;
@@ -51,7 +49,7 @@
 /**
  * A SocketTube is a terminal tube plugged directly into the socket.
  * The read subscriber should call {@code subscribe} on the SocketTube before
- * the SocketTube can be subscribed to the write publisher.
+ * the SocketTube is subscribed to the write publisher.
  */
 final class SocketTube implements FlowTube {
 
@@ -148,9 +146,8 @@
     // ======================================================================//
 
     void signalClosed() {
-        // Ensure that the subscriber will be terminated
-        // and that future subscribers will be notified
-        // when the connection is closed.
+        // Ensures that the subscriber will be terminated and that future
+        // subscribers will be notified when the connection is closed.
         readPublisher.subscriptionImpl.signalError(
                 new IOException("connection closed locally"));
     }
@@ -177,11 +174,10 @@
         }
     }
 
-    // This is best effort - there's no guarantee that the printed set
-    // of values is consistent. It should only be considered as
-    // weakly accurate - in particular in what concerns the events states,
-    // especially when displaying a read event state from a write event
-    // callback and conversely.
+    // This is best effort - there's no guarantee that the printed set of values
+    // is consistent. It should only be considered as weakly accurate - in
+    // particular in what concerns the events states, especially when displaying
+    // a read event state from a write event callback and conversely.
     void debugState(String when) {
         if (debug.isLoggable(Level.DEBUG)) {
             StringBuilder state = new StringBuilder();
@@ -211,11 +207,10 @@
     }
 
     /**
-     * A repeatable event that can be paused or resumed by changing
-     * its interestOps.
-     * When the event is fired, it is first paused before being signaled.
-     * It is the responsibility of the code triggered by {@code signalEvent}
-     * to resume the event if required.
+     * A repeatable event that can be paused or resumed by changing its
+     * interestOps. When the event is fired, it is first paused before being
+     * signaled. It is the responsibility of the code triggered by
+     * {@code signalEvent} to resume the event if required.
      */
     private static abstract class SocketFlowEvent extends AsyncEvent {
         final SocketChannel channel;
@@ -259,9 +254,9 @@
     //                              Writing                                  //
     // ======================================================================//
 
-    // This class makes the assumption that the publisher will call
-    // onNext sequentially, and that onNext won't be called if the demand
-    // has not been incremented by request(1).
+    // This class makes the assumption that the publisher will call onNext
+    // sequentially, and that onNext won't be called if the demand has not been
+    // incremented by request(1).
     // It has a 'queue of 1' meaning that it will call request(1) in
     // onSubscribe, and then only after its 'current' buffer list has been
     // fully written and current set to null;
@@ -312,18 +307,19 @@
             debugState("leaving w.onNext");
         }
 
-        // we don't use a SequentialScheduler here: we rely on
-        // onNext() being called sequentially, and not being called
-        // if we haven't call request(1)
-        // onNext is usually called from within a user/executor thread.
-        // we will perform the initial writing in that thread.
-        // if for some reason, not all data can be written, the writeEvent
-        // will be resumed, and the rest of the data will be written from
-        // the selector manager thread when the writeEvent is fired.
-        // If we are in the selector manager thread, then we will use the executor
-        // to call request(1), ensuring that onNext() won't be called from
-        // within the selector thread.
-        // If we are not in the selector manager thread, then we don't care.
+        // Don't use a SequentialScheduler here: rely on onNext() being invoked
+        // sequentially, and not being invoked if there is no demand, request(1).
+        // onNext is usually called from within a user / executor thread.
+        // Initial writing will be performed in that thread. If for some reason,
+        // not all the data can be written, a writeEvent will be registered, and
+        // writing will resume in the the selector manager thread when the
+        // writeEvent is fired.
+        //
+        // If this method is invoked in the selector manager thread (because of
+        // a writeEvent), then the executor will be used to invoke request(1),
+        // ensuring that onNext() won't be invoked from within the selector
+        // thread. If not in the selector manager thread, then request(1) is
+        // invoked directly.
         void tryFlushCurrent(boolean inSelectorThread) {
             List<ByteBuffer> bufs = current;
             if (bufs == null) return;
@@ -335,10 +331,7 @@
                 debug.log(Level.DEBUG, "trying to write: %d", remaining);
                 long written = writeAvailable(bufs);
                 debug.log(Level.DEBUG, "wrote: %d", written);
-                if (written == -1) {
-                    signalError(new EOFException("EOF reached while writing"));
-                    return;
-                }
+                assert written >= 0 : "negative number of bytes written:" + written;
                 assert written <= remaining;
                 if (remaining - written == 0) {
                     current = null;
@@ -361,9 +354,8 @@
             }
         }
 
-        // Kick off the initial request:1 that will start
-        // the writing side. Called from the selector manager
-        // thread.
+        // Kick off the initial request:1 that will start the writing side.
+        // Invoked in the selector manager thread.
         void startSubscription() {
             try {
                 debug.log(Level.DEBUG, "write: starting subscription");
@@ -973,15 +965,19 @@
         while (remaining > written) {
             try {
                 long w = channel.write(srcs);
-                if (w == -1 && written == 0) return -1;
-                if (w == 0) break;
+                assert w >= 0 : "negative number of bytes written:" + w;
+                if (w == 0) {
+                    break;
+                }
                 written += w;
             } catch (IOException x) {
-                // if no bytes were written just throws...
-                if (written == 0) throw x;
-                // otherwise return how many bytes were
-                // written: we will fail next time.
-                break;
+                if (written == 0) {
+                    // no bytes were written just throw
+                    throw x;
+                } else {
+                    // return how many bytes were written, will fail next time
+                    break;
+                }
             }
         }
         return written;