http-client-branch: fixed race condition when resubscribing with SocketTube http-client-branch
authordfuchs
Mon, 26 Mar 2018 19:54:18 +0100
branchhttp-client-branch
changeset 56355 bf89fba643d9
parent 56347 bac3a660249a
child 56365 7b2e4c363335
http-client-branch: fixed race condition when resubscribing with SocketTube
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Sun Mar 25 09:02:36 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Mon Mar 26 19:54:18 2018 +0100
@@ -32,7 +32,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Flow;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.nio.channels.SelectableChannel;
@@ -78,13 +77,6 @@
         this.writeSubscriber = new InternalWriteSubscriber();
     }
 
-//    private static Flow.Subscription nopSubscription() {
-//        return new Flow.Subscription() {
-//            @Override public void request(long n) { }
-//            @Override public void cancel() { }
-//        };
-//    }
-
     /**
      * Returns {@code true} if this flow is finished.
      * This happens when this flow internal read subscription is completed,
@@ -276,7 +268,7 @@
     private final class InternalWriteSubscriber
             implements Flow.Subscriber<List<ByteBuffer>> {
 
-        volatile Flow.Subscription subscription;
+        volatile WriteSubscription subscription;
         volatile List<ByteBuffer> current;
         volatile boolean completed;
         final AsyncTriggerEvent startSubscription =
@@ -286,14 +278,13 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            Flow.Subscription previous = this.subscription;
-            this.subscription = subscription;
+            WriteSubscription previous = this.subscription;
+            this.subscription = new WriteSubscription(subscription);
             debug.log(Level.DEBUG, "subscribed for writing");
             try {
                 if (current == null) {
-                    if (previous != subscription && previous != null) {
-                        debug.log(Level.DEBUG, "write: resetting demand to 0");
-                        writeDemand.reset();
+                    if (previous != null && previous.upstreamSubscription != subscription) {
+                        previous.dropSubscription();
                     }
                     debug.log(Level.DEBUG, "write: registering startSubscription event");
                     client.registerEvent(startSubscription);
@@ -388,23 +379,8 @@
         }
 
         void requestMore() {
-            try {
-                if (completed) return;
-                long d =  writeDemand.get();
-                if (writeDemand.increaseIfFulfilled()) {
-                    debug.log(Level.DEBUG, "write: requesting more...");
-                    subscription.request(1);
-                } else {
-                    debug.log(Level.DEBUG, "write: no need to request more: %d", d);
-                }
-            } catch (Throwable t) {
-                debug.log(Level.DEBUG, () ->
-                        "write: error while requesting more: " + t);
-                signalError(t);
-                subscription.cancel();
-            } finally {
-                debugState("leaving requestMore: ");
-            }
+           WriteSubscription subscription = this.subscription;
+           subscription.requestMore();
         }
 
         @Override
@@ -469,6 +445,60 @@
 
         }
 
+        final class WriteSubscription implements Flow.Subscription {
+            final Flow.Subscription upstreamSubscription;
+            volatile boolean cancelled;
+            WriteSubscription(Flow.Subscription subscription) {
+                this.upstreamSubscription = subscription;
+            }
+
+            @Override
+            public void request(long n) {
+                upstreamSubscription.request(n);
+            }
+
+            @Override
+            public void cancel() {
+                dropSubscription();
+                upstreamSubscription.cancel();
+            }
+
+            synchronized void dropSubscription() {
+                cancelled = true;
+                debug.log(Level.DEBUG, "write: resetting demand to 0");
+                writeDemand.reset();
+            }
+
+            void requestMore() {
+                try {
+                    if (completed || cancelled) return;
+                    boolean requestMore;
+                    long d;
+                    // don't fiddle with demand after cancel.
+                    // see dropSubscription.
+                    synchronized (this) {
+                        if (cancelled) return;
+                        d = writeDemand.get();
+                        requestMore = writeDemand.increaseIfFulfilled();
+                    }
+                    if (requestMore) {
+                        debug.log(Level.DEBUG, "write: requesting more...");
+                        upstreamSubscription.request(1);
+                    } else {
+                        debug.log(Level.DEBUG, "write: no need to request more: %d", d);
+                    }
+                } catch (Throwable t) {
+                    debug.log(Level.DEBUG, () ->
+                            "write: error while requesting more: " + t);
+                    cancelled = true;
+                    signalError(t);
+                    subscription.cancel();
+                } finally {
+                    debugState("leaving requestMore: ");
+                }
+            }
+        }
+
     }
 
     // ===================================================================== //
@@ -893,10 +923,14 @@
                     if (!buf.hasRemaining()) break;
                 }
             } catch (IOException x) {
-                if (buf.position() == pos) {
+                if (buf.position() == pos && (list == null || list.isEmpty())) {
+                    // if we have read no bytes, just throw...
                     throw x;
                 } else {
+                    // we have some bytes. return them and fail
+                    // next time.
                     errorRef.compareAndSet(null, x);
+                    read = 0; // make sure we will exit the outer loop
                 }
             }
 
@@ -935,10 +969,18 @@
         final long remaining = Utils.remaining(srcs);
         long written = 0;
         while (remaining > written) {
-            long w = channel.write(srcs);
-            if (w == -1 && written == 0) return -1;
-            if (w == 0) break;
-            written += w;
+            try {
+                long w = channel.write(srcs);
+                if (w == -1 && written == 0) return -1;
+                if (w == 0) break;
+                written += w;
+            } catch (IOException x) {
+                // if no bytes were written just throws...
+                if (written == 0) throw x;
+                // otherwise return how many bytes were
+                // written: we will fail next time.
+                break;
+            }
         }
         return written;
     }