# HG changeset patch # User chegar # Date 1522356358 -3600 # Node ID e453d21310bd6dec4644ffc367a5eed9e2b58205 # Parent 98075dcd4ffc7368444f22bc86183e6db6628b17 http-client-branch: review comment - additional small cleanups in SocketTube diff -r 98075dcd4ffc -r e453d21310bd src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Mar 29 20:49:13 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Mar 29 21:45:58 2018 +0100 @@ -25,7 +25,6 @@ package jdk.internal.net.http; -import java.io.EOFException; import java.io.IOException; import java.lang.System.Logger.Level; import java.nio.ByteBuffer; @@ -40,7 +39,6 @@ import java.util.ArrayList; import java.util.function.Consumer; import java.util.function.Supplier; - import jdk.internal.net.http.common.Demand; import jdk.internal.net.http.common.FlowTube; import jdk.internal.net.http.common.SequentialScheduler; @@ -51,7 +49,7 @@ /** * A SocketTube is a terminal tube plugged directly into the socket. * The read subscriber should call {@code subscribe} on the SocketTube before - * the SocketTube can be subscribed to the write publisher. + * the SocketTube is subscribed to the write publisher. */ final class SocketTube implements FlowTube { @@ -148,9 +146,8 @@ // ======================================================================// void signalClosed() { - // Ensure that the subscriber will be terminated - // and that future subscribers will be notified - // when the connection is closed. + // Ensures that the subscriber will be terminated and that future + // subscribers will be notified when the connection is closed. readPublisher.subscriptionImpl.signalError( new IOException("connection closed locally")); } @@ -177,11 +174,10 @@ } } - // This is best effort - there's no guarantee that the printed set - // of values is consistent. It should only be considered as - // weakly accurate - in particular in what concerns the events states, - // especially when displaying a read event state from a write event - // callback and conversely. + // This is best effort - there's no guarantee that the printed set of values + // is consistent. It should only be considered as weakly accurate - in + // particular in what concerns the events states, especially when displaying + // a read event state from a write event callback and conversely. void debugState(String when) { if (debug.isLoggable(Level.DEBUG)) { StringBuilder state = new StringBuilder(); @@ -211,11 +207,10 @@ } /** - * A repeatable event that can be paused or resumed by changing - * its interestOps. - * When the event is fired, it is first paused before being signaled. - * It is the responsibility of the code triggered by {@code signalEvent} - * to resume the event if required. + * A repeatable event that can be paused or resumed by changing its + * interestOps. When the event is fired, it is first paused before being + * signaled. It is the responsibility of the code triggered by + * {@code signalEvent} to resume the event if required. */ private static abstract class SocketFlowEvent extends AsyncEvent { final SocketChannel channel; @@ -259,9 +254,9 @@ // Writing // // ======================================================================// - // This class makes the assumption that the publisher will call - // onNext sequentially, and that onNext won't be called if the demand - // has not been incremented by request(1). + // This class makes the assumption that the publisher will call onNext + // sequentially, and that onNext won't be called if the demand has not been + // incremented by request(1). // It has a 'queue of 1' meaning that it will call request(1) in // onSubscribe, and then only after its 'current' buffer list has been // fully written and current set to null; @@ -312,18 +307,19 @@ debugState("leaving w.onNext"); } - // we don't use a SequentialScheduler here: we rely on - // onNext() being called sequentially, and not being called - // if we haven't call request(1) - // onNext is usually called from within a user/executor thread. - // we will perform the initial writing in that thread. - // if for some reason, not all data can be written, the writeEvent - // will be resumed, and the rest of the data will be written from - // the selector manager thread when the writeEvent is fired. - // If we are in the selector manager thread, then we will use the executor - // to call request(1), ensuring that onNext() won't be called from - // within the selector thread. - // If we are not in the selector manager thread, then we don't care. + // Don't use a SequentialScheduler here: rely on onNext() being invoked + // sequentially, and not being invoked if there is no demand, request(1). + // onNext is usually called from within a user / executor thread. + // Initial writing will be performed in that thread. If for some reason, + // not all the data can be written, a writeEvent will be registered, and + // writing will resume in the the selector manager thread when the + // writeEvent is fired. + // + // If this method is invoked in the selector manager thread (because of + // a writeEvent), then the executor will be used to invoke request(1), + // ensuring that onNext() won't be invoked from within the selector + // thread. If not in the selector manager thread, then request(1) is + // invoked directly. void tryFlushCurrent(boolean inSelectorThread) { List bufs = current; if (bufs == null) return; @@ -335,10 +331,7 @@ debug.log(Level.DEBUG, "trying to write: %d", remaining); long written = writeAvailable(bufs); debug.log(Level.DEBUG, "wrote: %d", written); - if (written == -1) { - signalError(new EOFException("EOF reached while writing")); - return; - } + assert written >= 0 : "negative number of bytes written:" + written; assert written <= remaining; if (remaining - written == 0) { current = null; @@ -361,9 +354,8 @@ } } - // Kick off the initial request:1 that will start - // the writing side. Called from the selector manager - // thread. + // Kick off the initial request:1 that will start the writing side. + // Invoked in the selector manager thread. void startSubscription() { try { debug.log(Level.DEBUG, "write: starting subscription"); @@ -973,15 +965,19 @@ while (remaining > written) { try { long w = channel.write(srcs); - if (w == -1 && written == 0) return -1; - if (w == 0) break; + assert w >= 0 : "negative number of bytes written:" + w; + if (w == 0) { + break; + } written += w; } catch (IOException x) { - // if no bytes were written just throws... - if (written == 0) throw x; - // otherwise return how many bytes were - // written: we will fail next time. - break; + if (written == 0) { + // no bytes were written just throw + throw x; + } else { + // return how many bytes were written, will fail next time + break; + } } } return written;