http-client-branch: Fix race candition triggered by HTTP/1.1 connection reuse in SocketTube::connectFlows http-client-branch
authordfuchs
Tue, 06 Mar 2018 12:09:12 +0000
branchhttp-client-branch
changeset 56252 e4b05854c51f
parent 56235 6218673d7fa0
child 56253 875dbf6234f2
http-client-branch: Fix race candition triggered by HTTP/1.1 connection reuse in SocketTube::connectFlows
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Tue Mar 06 12:09:12 2018 +0000
@@ -641,9 +641,9 @@
                     : null;
             flowTag = tag = flow == null ? null: (String.valueOf(flow));
             if (flowTag != null) {
-                dbgTag = tag = flowTag + " Http1AsyncReceiver";
+                dbgTag = tag = "Http1AsyncReceiver("+ flowTag + ")";
             } else {
-                tag = "Http1AsyncReceiver";
+                tag = "Http1AsyncReceiver(?)";
             }
         }
         return tag;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Tue Mar 06 12:09:12 2018 +0000
@@ -858,7 +858,7 @@
         Log.logFrames(sf, "OUT");
         // send preface bytes and SettingsFrame together
         HttpPublisher publisher = publisher();
-        publisher.enqueue(List.of(buf));
+        publisher.enqueueUnordered(List.of(buf));
         publisher.signalEnqueued();
         // mark preface sent.
         framesController.markPrefaceSent();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java	Tue Mar 06 12:09:12 2018 +0000
@@ -390,6 +390,7 @@
             this.reading = readingLock;
         }
         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
+        final ConcurrentLinkedDeque<List<ByteBuffer>> priority = new ConcurrentLinkedDeque<>();
         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
         volatile HttpWriteSubscription subscription;
         final SequentialScheduler writeScheduler =
@@ -441,9 +442,18 @@
                           + getConnectionFlow());
             }
 
+            private boolean isEmpty() {
+                return queue.isEmpty() && priority.isEmpty();
+            }
+
+            private List<ByteBuffer> poll() {
+                List<ByteBuffer> elem = priority.poll();
+                return elem == null ? queue.poll() : elem;
+            }
+
             void flush() {
-                while (!queue.isEmpty() && demand.tryDecrement()) {
-                    List<ByteBuffer> elem = queue.poll();
+                while (!isEmpty() && demand.tryDecrement()) {
+                    List<ByteBuffer> elem = poll();
                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
                                 + Utils.remaining(elem) + " bytes ("
                                 + elem.size() + " buffers) to "
@@ -464,8 +474,8 @@
         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
             // Unordered frames are sent before existing frames.
             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
-            queue.addFirst(buffers);
-            debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
+            priority.add(buffers);
+            debug.log(Level.DEBUG, "added %d bytes in the priority write queue", bytes);
         }
 
         @Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Tue Mar 06 12:09:12 2018 +0000
@@ -200,7 +200,7 @@
             Demand rdemand = sub == null ? null : sub.demand;
             InternalWriteSubscriber.WriteEvent writeEvent =
                     writeSubscriber.writeEvent;
-            AtomicLong wdemand = writeSubscriber.writeDemand;
+            Demand wdemand = writeSubscriber.writeDemand;
             int rops = readEvent == null ? 0 : readEvent.interestOps();
             long rd = rdemand == null ? 0 : rdemand.get();
             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
@@ -278,23 +278,27 @@
         volatile Flow.Subscription subscription;
         volatile List<ByteBuffer> current;
         volatile boolean completed;
+        final AsyncTriggerEvent startSubscription =
+                new AsyncTriggerEvent(this::signalError, this::startSubscription);
         final WriteEvent writeEvent = new WriteEvent(channel, this);
-        final AtomicLong writeDemand = new AtomicLong();
+        final Demand writeDemand = new Demand();
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             Flow.Subscription previous = this.subscription;
             this.subscription = subscription;
             debug.log(Level.DEBUG, "subscribed for writing");
-            if (current == null) {
-                if (previous == subscription || previous == null) {
-                    if (writeDemand.compareAndSet(0, 1)) {
-                        subscription.request(1);
+            try {
+                if (current == null) {
+                    if (previous != subscription && previous != null) {
+                        debug.log(Level.DEBUG, "write: resetting demand to 0");
+                        writeDemand.reset();
                     }
-                } else {
-                    writeDemand.set(1);
-                    subscription.request(1);
+                    debug.log(Level.DEBUG, "write: registering startSubscription event");
+                    client.registerEvent(startSubscription);
                 }
+            } catch (Throwable t) {
+                signalError(t);
             }
         }
 
@@ -344,14 +348,15 @@
                 assert written <= remaining;
                 if (remaining - written == 0) {
                     current = null;
-                    writeDemand.decrementAndGet();
-                    Runnable requestMore = this::requestMore;
-                    if (inSelectorThread) {
-                        assert client.isSelectorThread();
-                        client.theExecutor().execute(requestMore);
-                    } else {
-                        assert !client.isSelectorThread();
-                        requestMore.run();
+                    if (writeDemand.tryDecrement()) {
+                        Runnable requestMore = this::requestMore;
+                        if (inSelectorThread) {
+                            assert client.isSelectorThread();
+                            client.theExecutor().execute(requestMore);
+                        } else {
+                            assert !client.isSelectorThread();
+                            requestMore.run();
+                        }
                     }
                 } else {
                     resumeWriteEvent(inSelectorThread);
@@ -362,11 +367,28 @@
             }
         }
 
+        // Kick off the initial request:1 that will start
+        // the writing side. Called from the selector manager
+        // thread.
+        void startSubscription() {
+            try {
+                debug.log(Level.DEBUG, "write: starting subscription");
+                assert client.isSelectorThread();
+                // make sure read registrations are handled before;
+                readPublisher.subscriptionImpl.handlePending();
+                debug.log(Level.DEBUG, "write: offloading requestMore");
+                // start writing;
+                client.theExecutor().execute(this::requestMore);
+            } catch(Throwable t) {
+                signalError(t);
+            }
+        }
+
         void requestMore() {
             try {
                 if (completed) return;
                 long d =  writeDemand.get();
-                if (writeDemand.compareAndSet(0,1)) {
+                if (writeDemand.increaseIfFulfilled()) {
                     debug.log(Level.DEBUG, "write: requesting more...");
                     subscription.request(1);
                 } else {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java	Tue Mar 06 12:09:12 2018 +0000
@@ -54,6 +54,14 @@
     }
 
     /**
+     * Increase this demand by 1 but only if it is fulfilled.
+     * @return true if the demand was increased, false otherwise.
+     */
+    public boolean increaseIfFulfilled() {
+        return val.compareAndSet(0, 1);
+    }
+
+    /**
      * Tries to decrease this demand by the specified positive value.
      *
      * <p> The actual value this demand has been decreased by might be less than