--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Sun Mar 25 09:02:36 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Mar 26 19:54:18 2018 +0100
@@ -32,7 +32,6 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.nio.channels.SelectableChannel;
@@ -78,13 +77,6 @@
this.writeSubscriber = new InternalWriteSubscriber();
}
-// private static Flow.Subscription nopSubscription() {
-// return new Flow.Subscription() {
-// @Override public void request(long n) { }
-// @Override public void cancel() { }
-// };
-// }
-
/**
* Returns {@code true} if this flow is finished.
* This happens when this flow internal read subscription is completed,
@@ -276,7 +268,7 @@
private final class InternalWriteSubscriber
implements Flow.Subscriber<List<ByteBuffer>> {
- volatile Flow.Subscription subscription;
+ volatile WriteSubscription subscription;
volatile List<ByteBuffer> current;
volatile boolean completed;
final AsyncTriggerEvent startSubscription =
@@ -286,14 +278,13 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
- Flow.Subscription previous = this.subscription;
- this.subscription = subscription;
+ WriteSubscription previous = this.subscription;
+ this.subscription = new WriteSubscription(subscription);
debug.log(Level.DEBUG, "subscribed for writing");
try {
if (current == null) {
- if (previous != subscription && previous != null) {
- debug.log(Level.DEBUG, "write: resetting demand to 0");
- writeDemand.reset();
+ if (previous != null && previous.upstreamSubscription != subscription) {
+ previous.dropSubscription();
}
debug.log(Level.DEBUG, "write: registering startSubscription event");
client.registerEvent(startSubscription);
@@ -388,23 +379,8 @@
}
void requestMore() {
- try {
- if (completed) return;
- long d = writeDemand.get();
- if (writeDemand.increaseIfFulfilled()) {
- debug.log(Level.DEBUG, "write: requesting more...");
- subscription.request(1);
- } else {
- debug.log(Level.DEBUG, "write: no need to request more: %d", d);
- }
- } catch (Throwable t) {
- debug.log(Level.DEBUG, () ->
- "write: error while requesting more: " + t);
- signalError(t);
- subscription.cancel();
- } finally {
- debugState("leaving requestMore: ");
- }
+ WriteSubscription subscription = this.subscription;
+ subscription.requestMore();
}
@Override
@@ -469,6 +445,60 @@
}
+ final class WriteSubscription implements Flow.Subscription {
+ final Flow.Subscription upstreamSubscription;
+ volatile boolean cancelled;
+ WriteSubscription(Flow.Subscription subscription) {
+ this.upstreamSubscription = subscription;
+ }
+
+ @Override
+ public void request(long n) {
+ upstreamSubscription.request(n);
+ }
+
+ @Override
+ public void cancel() {
+ dropSubscription();
+ upstreamSubscription.cancel();
+ }
+
+ synchronized void dropSubscription() {
+ cancelled = true;
+ debug.log(Level.DEBUG, "write: resetting demand to 0");
+ writeDemand.reset();
+ }
+
+ void requestMore() {
+ try {
+ if (completed || cancelled) return;
+ boolean requestMore;
+ long d;
+ // don't fiddle with demand after cancel.
+ // see dropSubscription.
+ synchronized (this) {
+ if (cancelled) return;
+ d = writeDemand.get();
+ requestMore = writeDemand.increaseIfFulfilled();
+ }
+ if (requestMore) {
+ debug.log(Level.DEBUG, "write: requesting more...");
+ upstreamSubscription.request(1);
+ } else {
+ debug.log(Level.DEBUG, "write: no need to request more: %d", d);
+ }
+ } catch (Throwable t) {
+ debug.log(Level.DEBUG, () ->
+ "write: error while requesting more: " + t);
+ cancelled = true;
+ signalError(t);
+ subscription.cancel();
+ } finally {
+ debugState("leaving requestMore: ");
+ }
+ }
+ }
+
}
// ===================================================================== //
@@ -893,10 +923,14 @@
if (!buf.hasRemaining()) break;
}
} catch (IOException x) {
- if (buf.position() == pos) {
+ if (buf.position() == pos && (list == null || list.isEmpty())) {
+ // if we have read no bytes, just throw...
throw x;
} else {
+ // we have some bytes. return them and fail
+ // next time.
errorRef.compareAndSet(null, x);
+ read = 0; // make sure we will exit the outer loop
}
}
@@ -935,10 +969,18 @@
final long remaining = Utils.remaining(srcs);
long written = 0;
while (remaining > written) {
- long w = channel.write(srcs);
- if (w == -1 && written == 0) return -1;
- if (w == 0) break;
- written += w;
+ try {
+ long w = channel.write(srcs);
+ if (w == -1 && written == 0) return -1;
+ if (w == 0) break;
+ written += w;
+ } catch (IOException x) {
+ // if no bytes were written just throws...
+ if (written == 0) throw x;
+ // otherwise return how many bytes were
+ // written: we will fail next time.
+ break;
+ }
}
return written;
}