--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Tue Mar 06 12:09:12 2018 +0000
@@ -200,7 +200,7 @@
Demand rdemand = sub == null ? null : sub.demand;
InternalWriteSubscriber.WriteEvent writeEvent =
writeSubscriber.writeEvent;
- AtomicLong wdemand = writeSubscriber.writeDemand;
+ Demand wdemand = writeSubscriber.writeDemand;
int rops = readEvent == null ? 0 : readEvent.interestOps();
long rd = rdemand == null ? 0 : rdemand.get();
int wops = writeEvent == null ? 0 : writeEvent.interestOps();
@@ -278,23 +278,27 @@
volatile Flow.Subscription subscription;
volatile List<ByteBuffer> current;
volatile boolean completed;
+ final AsyncTriggerEvent startSubscription =
+ new AsyncTriggerEvent(this::signalError, this::startSubscription);
final WriteEvent writeEvent = new WriteEvent(channel, this);
- final AtomicLong writeDemand = new AtomicLong();
+ final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
Flow.Subscription previous = this.subscription;
this.subscription = subscription;
debug.log(Level.DEBUG, "subscribed for writing");
- if (current == null) {
- if (previous == subscription || previous == null) {
- if (writeDemand.compareAndSet(0, 1)) {
- subscription.request(1);
+ try {
+ if (current == null) {
+ if (previous != subscription && previous != null) {
+ debug.log(Level.DEBUG, "write: resetting demand to 0");
+ writeDemand.reset();
}
- } else {
- writeDemand.set(1);
- subscription.request(1);
+ debug.log(Level.DEBUG, "write: registering startSubscription event");
+ client.registerEvent(startSubscription);
}
+ } catch (Throwable t) {
+ signalError(t);
}
}
@@ -344,14 +348,15 @@
assert written <= remaining;
if (remaining - written == 0) {
current = null;
- writeDemand.decrementAndGet();
- Runnable requestMore = this::requestMore;
- if (inSelectorThread) {
- assert client.isSelectorThread();
- client.theExecutor().execute(requestMore);
- } else {
- assert !client.isSelectorThread();
- requestMore.run();
+ if (writeDemand.tryDecrement()) {
+ Runnable requestMore = this::requestMore;
+ if (inSelectorThread) {
+ assert client.isSelectorThread();
+ client.theExecutor().execute(requestMore);
+ } else {
+ assert !client.isSelectorThread();
+ requestMore.run();
+ }
}
} else {
resumeWriteEvent(inSelectorThread);
@@ -362,11 +367,28 @@
}
}
+ // Kick off the initial request:1 that will start
+ // the writing side. Called from the selector manager
+ // thread.
+ void startSubscription() {
+ try {
+ debug.log(Level.DEBUG, "write: starting subscription");
+ assert client.isSelectorThread();
+ // make sure read registrations are handled before;
+ readPublisher.subscriptionImpl.handlePending();
+ debug.log(Level.DEBUG, "write: offloading requestMore");
+ // start writing;
+ client.theExecutor().execute(this::requestMore);
+ } catch(Throwable t) {
+ signalError(t);
+ }
+ }
+
void requestMore() {
try {
if (completed) return;
long d = writeDemand.get();
- if (writeDemand.compareAndSet(0,1)) {
+ if (writeDemand.increaseIfFulfilled()) {
debug.log(Level.DEBUG, "write: requesting more...");
subscription.request(1);
} else {