src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
branchhttp-client-branch
changeset 56252 e4b05854c51f
parent 56165 8a6065d830b9
child 56299 903ff8ec239d
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Sat Mar 03 20:21:35 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Tue Mar 06 12:09:12 2018 +0000
@@ -200,7 +200,7 @@
             Demand rdemand = sub == null ? null : sub.demand;
             InternalWriteSubscriber.WriteEvent writeEvent =
                     writeSubscriber.writeEvent;
-            AtomicLong wdemand = writeSubscriber.writeDemand;
+            Demand wdemand = writeSubscriber.writeDemand;
             int rops = readEvent == null ? 0 : readEvent.interestOps();
             long rd = rdemand == null ? 0 : rdemand.get();
             int wops = writeEvent == null ? 0 : writeEvent.interestOps();
@@ -278,23 +278,27 @@
         volatile Flow.Subscription subscription;
         volatile List<ByteBuffer> current;
         volatile boolean completed;
+        final AsyncTriggerEvent startSubscription =
+                new AsyncTriggerEvent(this::signalError, this::startSubscription);
         final WriteEvent writeEvent = new WriteEvent(channel, this);
-        final AtomicLong writeDemand = new AtomicLong();
+        final Demand writeDemand = new Demand();
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             Flow.Subscription previous = this.subscription;
             this.subscription = subscription;
             debug.log(Level.DEBUG, "subscribed for writing");
-            if (current == null) {
-                if (previous == subscription || previous == null) {
-                    if (writeDemand.compareAndSet(0, 1)) {
-                        subscription.request(1);
+            try {
+                if (current == null) {
+                    if (previous != subscription && previous != null) {
+                        debug.log(Level.DEBUG, "write: resetting demand to 0");
+                        writeDemand.reset();
                     }
-                } else {
-                    writeDemand.set(1);
-                    subscription.request(1);
+                    debug.log(Level.DEBUG, "write: registering startSubscription event");
+                    client.registerEvent(startSubscription);
                 }
+            } catch (Throwable t) {
+                signalError(t);
             }
         }
 
@@ -344,14 +348,15 @@
                 assert written <= remaining;
                 if (remaining - written == 0) {
                     current = null;
-                    writeDemand.decrementAndGet();
-                    Runnable requestMore = this::requestMore;
-                    if (inSelectorThread) {
-                        assert client.isSelectorThread();
-                        client.theExecutor().execute(requestMore);
-                    } else {
-                        assert !client.isSelectorThread();
-                        requestMore.run();
+                    if (writeDemand.tryDecrement()) {
+                        Runnable requestMore = this::requestMore;
+                        if (inSelectorThread) {
+                            assert client.isSelectorThread();
+                            client.theExecutor().execute(requestMore);
+                        } else {
+                            assert !client.isSelectorThread();
+                            requestMore.run();
+                        }
                     }
                 } else {
                     resumeWriteEvent(inSelectorThread);
@@ -362,11 +367,28 @@
             }
         }
 
+        // Kick off the initial request:1 that will start
+        // the writing side. Called from the selector manager
+        // thread.
+        void startSubscription() {
+            try {
+                debug.log(Level.DEBUG, "write: starting subscription");
+                assert client.isSelectorThread();
+                // make sure read registrations are handled before;
+                readPublisher.subscriptionImpl.handlePending();
+                debug.log(Level.DEBUG, "write: offloading requestMore");
+                // start writing;
+                client.theExecutor().execute(this::requestMore);
+            } catch(Throwable t) {
+                signalError(t);
+            }
+        }
+
         void requestMore() {
             try {
                 if (completed) return;
                 long d =  writeDemand.get();
-                if (writeDemand.compareAndSet(0,1)) {
+                if (writeDemand.increaseIfFulfilled()) {
                     debug.log(Level.DEBUG, "write: requesting more...");
                     subscription.request(1);
                 } else {