--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Mar 29 20:49:13 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Mar 29 21:45:58 2018 +0100
@@ -25,7 +25,6 @@
package jdk.internal.net.http;
-import java.io.EOFException;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
@@ -40,7 +39,6 @@
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
-
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.SequentialScheduler;
@@ -51,7 +49,7 @@
/**
* A SocketTube is a terminal tube plugged directly into the socket.
* The read subscriber should call {@code subscribe} on the SocketTube before
- * the SocketTube can be subscribed to the write publisher.
+ * the SocketTube is subscribed to the write publisher.
*/
final class SocketTube implements FlowTube {
@@ -148,9 +146,8 @@
// ======================================================================//
void signalClosed() {
- // Ensure that the subscriber will be terminated
- // and that future subscribers will be notified
- // when the connection is closed.
+ // Ensures that the subscriber will be terminated and that future
+ // subscribers will be notified when the connection is closed.
readPublisher.subscriptionImpl.signalError(
new IOException("connection closed locally"));
}
@@ -177,11 +174,10 @@
}
}
- // This is best effort - there's no guarantee that the printed set
- // of values is consistent. It should only be considered as
- // weakly accurate - in particular in what concerns the events states,
- // especially when displaying a read event state from a write event
- // callback and conversely.
+ // This is best effort - there's no guarantee that the printed set of values
+ // is consistent. It should only be considered as weakly accurate - in
+ // particular in what concerns the events states, especially when displaying
+ // a read event state from a write event callback and conversely.
void debugState(String when) {
if (debug.isLoggable(Level.DEBUG)) {
StringBuilder state = new StringBuilder();
@@ -211,11 +207,10 @@
}
/**
- * A repeatable event that can be paused or resumed by changing
- * its interestOps.
- * When the event is fired, it is first paused before being signaled.
- * It is the responsibility of the code triggered by {@code signalEvent}
- * to resume the event if required.
+ * A repeatable event that can be paused or resumed by changing its
+ * interestOps. When the event is fired, it is first paused before being
+ * signaled. It is the responsibility of the code triggered by
+ * {@code signalEvent} to resume the event if required.
*/
private static abstract class SocketFlowEvent extends AsyncEvent {
final SocketChannel channel;
@@ -259,9 +254,9 @@
// Writing //
// ======================================================================//
- // This class makes the assumption that the publisher will call
- // onNext sequentially, and that onNext won't be called if the demand
- // has not been incremented by request(1).
+ // This class makes the assumption that the publisher will call onNext
+ // sequentially, and that onNext won't be called if the demand has not been
+ // incremented by request(1).
// It has a 'queue of 1' meaning that it will call request(1) in
// onSubscribe, and then only after its 'current' buffer list has been
// fully written and current set to null;
@@ -312,18 +307,19 @@
debugState("leaving w.onNext");
}
- // we don't use a SequentialScheduler here: we rely on
- // onNext() being called sequentially, and not being called
- // if we haven't call request(1)
- // onNext is usually called from within a user/executor thread.
- // we will perform the initial writing in that thread.
- // if for some reason, not all data can be written, the writeEvent
- // will be resumed, and the rest of the data will be written from
- // the selector manager thread when the writeEvent is fired.
- // If we are in the selector manager thread, then we will use the executor
- // to call request(1), ensuring that onNext() won't be called from
- // within the selector thread.
- // If we are not in the selector manager thread, then we don't care.
+ // Don't use a SequentialScheduler here: rely on onNext() being invoked
+ // sequentially, and not being invoked if there is no demand, request(1).
+ // onNext is usually called from within a user / executor thread.
+ // Initial writing will be performed in that thread. If for some reason,
+ // not all the data can be written, a writeEvent will be registered, and
+ // writing will resume in the the selector manager thread when the
+ // writeEvent is fired.
+ //
+ // If this method is invoked in the selector manager thread (because of
+ // a writeEvent), then the executor will be used to invoke request(1),
+ // ensuring that onNext() won't be invoked from within the selector
+ // thread. If not in the selector manager thread, then request(1) is
+ // invoked directly.
void tryFlushCurrent(boolean inSelectorThread) {
List<ByteBuffer> bufs = current;
if (bufs == null) return;
@@ -335,10 +331,7 @@
debug.log(Level.DEBUG, "trying to write: %d", remaining);
long written = writeAvailable(bufs);
debug.log(Level.DEBUG, "wrote: %d", written);
- if (written == -1) {
- signalError(new EOFException("EOF reached while writing"));
- return;
- }
+ assert written >= 0 : "negative number of bytes written:" + written;
assert written <= remaining;
if (remaining - written == 0) {
current = null;
@@ -361,9 +354,8 @@
}
}
- // Kick off the initial request:1 that will start
- // the writing side. Called from the selector manager
- // thread.
+ // Kick off the initial request:1 that will start the writing side.
+ // Invoked in the selector manager thread.
void startSubscription() {
try {
debug.log(Level.DEBUG, "write: starting subscription");
@@ -973,15 +965,19 @@
while (remaining > written) {
try {
long w = channel.write(srcs);
- if (w == -1 && written == 0) return -1;
- if (w == 0) break;
+ assert w >= 0 : "negative number of bytes written:" + w;
+ if (w == 0) {
+ break;
+ }
written += w;
} catch (IOException x) {
- // if no bytes were written just throws...
- if (written == 0) throw x;
- // otherwise return how many bytes were
- // written: we will fail next time.
- break;
+ if (written == 0) {
+ // no bytes were written just throw
+ throw x;
+ } else {
+ // return how many bytes were written, will fail next time
+ break;
+ }
}
}
return written;