http-client-branch: Fix race candition triggered by HTTP/1.1 connection reuse in SocketTube::connectFlows
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Tue Mar 06 12:09:12 2018 +0000
@@ -641,9 +641,9 @@
: null;
flowTag = tag = flow == null ? null: (String.valueOf(flow));
if (flowTag != null) {
- dbgTag = tag = flowTag + " Http1AsyncReceiver";
+ dbgTag = tag = "Http1AsyncReceiver("+ flowTag + ")";
} else {
- tag = "Http1AsyncReceiver";
+ tag = "Http1AsyncReceiver(?)";
}
}
return tag;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Tue Mar 06 12:09:12 2018 +0000
@@ -858,7 +858,7 @@
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
HttpPublisher publisher = publisher();
- publisher.enqueue(List.of(buf));
+ publisher.enqueueUnordered(List.of(buf));
publisher.signalEnqueued();
// mark preface sent.
framesController.markPrefaceSent();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Tue Mar 06 12:09:12 2018 +0000
@@ -390,6 +390,7 @@
this.reading = readingLock;
}
final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
+ final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
volatile HttpWriteSubscription subscription;
final SequentialScheduler writeScheduler =
@@ -441,9 +442,18 @@
+ getConnectionFlow());
}
+ private boolean isEmpty() {
+ return queue.isEmpty() && priority.isEmpty();
+ }
+
+ private List<ByteBuffer> poll() {
+ List<ByteBuffer> elem = priority.poll();
+ return elem == null ? queue.poll() : elem;
+ }
+
void flush() {
- while (!queue.isEmpty() && demand.tryDecrement()) {
- List<ByteBuffer> elem = queue.poll();
+ while (!isEmpty() && demand.tryDecrement()) {
+ List<ByteBuffer> elem = poll();
debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
+ Utils.remaining(elem) + " bytes ("
+ elem.size() + " buffers) to "
@@ -464,8 +474,8 @@
public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
// Unordered frames are sent before existing frames.
int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
- queue.addFirst(buffers);
- debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
+ priority.add(buffers);
+ debug.log(Level.DEBUG, "added %d bytes in the priority write queue", bytes);
}
@Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Tue Mar 06 12:09:12 2018 +0000
@@ -200,7 +200,7 @@
Demand rdemand = sub == null ? null : sub.demand;
InternalWriteSubscriber.WriteEvent writeEvent =
writeSubscriber.writeEvent;
- AtomicLong wdemand = writeSubscriber.writeDemand;
+ Demand wdemand = writeSubscriber.writeDemand;
int rops = readEvent == null ? 0 : readEvent.interestOps();
long rd = rdemand == null ? 0 : rdemand.get();
int wops = writeEvent == null ? 0 : writeEvent.interestOps();
@@ -278,23 +278,27 @@
volatile Flow.Subscription subscription;
volatile List<ByteBuffer> current;
volatile boolean completed;
+ final AsyncTriggerEvent startSubscription =
+ new AsyncTriggerEvent(this::signalError, this::startSubscription);
final WriteEvent writeEvent = new WriteEvent(channel, this);
- final AtomicLong writeDemand = new AtomicLong();
+ final Demand writeDemand = new Demand();
@Override
public void onSubscribe(Flow.Subscription subscription) {
Flow.Subscription previous = this.subscription;
this.subscription = subscription;
debug.log(Level.DEBUG, "subscribed for writing");
- if (current == null) {
- if (previous == subscription || previous == null) {
- if (writeDemand.compareAndSet(0, 1)) {
- subscription.request(1);
+ try {
+ if (current == null) {
+ if (previous != subscription && previous != null) {
+ debug.log(Level.DEBUG, "write: resetting demand to 0");
+ writeDemand.reset();
}
- } else {
- writeDemand.set(1);
- subscription.request(1);
+ debug.log(Level.DEBUG, "write: registering startSubscription event");
+ client.registerEvent(startSubscription);
}
+ } catch (Throwable t) {
+ signalError(t);
}
}
@@ -344,14 +348,15 @@
assert written <= remaining;
if (remaining - written == 0) {
current = null;
- writeDemand.decrementAndGet();
- Runnable requestMore = this::requestMore;
- if (inSelectorThread) {
- assert client.isSelectorThread();
- client.theExecutor().execute(requestMore);
- } else {
- assert !client.isSelectorThread();
- requestMore.run();
+ if (writeDemand.tryDecrement()) {
+ Runnable requestMore = this::requestMore;
+ if (inSelectorThread) {
+ assert client.isSelectorThread();
+ client.theExecutor().execute(requestMore);
+ } else {
+ assert !client.isSelectorThread();
+ requestMore.run();
+ }
}
} else {
resumeWriteEvent(inSelectorThread);
@@ -362,11 +367,28 @@
}
}
+ // Kick off the initial request:1 that will start
+ // the writing side. Called from the selector manager
+ // thread.
+ void startSubscription() {
+ try {
+ debug.log(Level.DEBUG, "write: starting subscription");
+ assert client.isSelectorThread();
+ // make sure read registrations are handled before;
+ readPublisher.subscriptionImpl.handlePending();
+ debug.log(Level.DEBUG, "write: offloading requestMore");
+ // start writing;
+ client.theExecutor().execute(this::requestMore);
+ } catch(Throwable t) {
+ signalError(t);
+ }
+ }
+
void requestMore() {
try {
if (completed) return;
long d = writeDemand.get();
- if (writeDemand.compareAndSet(0,1)) {
+ if (writeDemand.increaseIfFulfilled()) {
debug.log(Level.DEBUG, "write: requesting more...");
subscription.request(1);
} else {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java Tue Mar 06 12:09:12 2018 +0000
@@ -54,6 +54,14 @@
}
/**
+ * Increase this demand by 1 but only if it is fulfilled.
+ * @return true if the demand was increased, false otherwise.
+ */
+ public boolean increaseIfFulfilled() {
+ return val.compareAndSet(0, 1);
+ }
+
+ /**
* Tries to decrease this demand by the specified positive value.
*
* <p> The actual value this demand has been decreased by might be less than