http-client-branch: Publisher/Subscriber/Subscription conformance fixes http-client-branch
authorchegar
Wed, 08 Nov 2017 09:34:56 +0000
branchhttp-client-branch
changeset 55777 e62cbcc08cae
parent 55776 9950bc2ee874
child 55780 d4b5b95da972
http-client-branch: Publisher/Subscriber/Subscription conformance fixes
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java	Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java	Wed Nov 08 09:34:56 2017 +0000
@@ -36,15 +36,37 @@
  */
 class PullPublisher<T> implements Flow.Publisher<T> {
 
+    // Only one of `iterable` and `throwable` can be non-null. throwable is
+    // non-null when an error has been encountered, by the creator of
+    // PullPublisher, while subscribing the subscriber, but before subscribe has
+    // completed.
     private final Iterable<T> iterable;
+    private final Throwable throwable;
+
+    PullPublisher(Iterable<T> iterable, Throwable throwable) {
+        this.iterable = iterable;
+        this.throwable = throwable;
+    }
 
     PullPublisher(Iterable<T> iterable) {
-        this.iterable = iterable;
+        this(iterable, null);
     }
 
     @Override
     public void subscribe(Flow.Subscriber<? super T> subscriber) {
-        subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator()));
+        Subscription sub;
+        if (throwable != null) {
+            assert iterable == null : "non-null iterable: " + iterable;
+            sub = new Subscription(subscriber, null, throwable);
+        } else {
+            assert throwable == null : "non-null exception: " + throwable;
+            sub = new Subscription(subscriber, iterable.iterator(), null);
+        }
+        subscriber.onSubscribe(sub);
+
+        if (throwable != null) {
+            sub.pullScheduler.runOrSchedule();
+        }
     }
 
     private class Subscription implements Flow.Subscription {
@@ -57,39 +79,51 @@
         final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
         private final Demand demand = new Demand();
 
-        Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
+        Subscription(Flow.Subscriber<? super T> subscriber,
+                     Iterator<T> iter,
+                     Throwable throwable) {
             this.subscriber = subscriber;
             this.iter = iter;
+            this.error = throwable;
         }
 
         final class PullTask extends SequentialScheduler.CompleteRestartableTask {
             @Override
             protected void run() {
-                if (completed) {
-                    pullScheduler.stop();
+                if (completed || cancelled) {
                     return;
                 }
+
                 Throwable t = error;
                 if (t != null) {
                     completed = true;
                     pullScheduler.stop();
                     subscriber.onError(t);
+                    return;
                 }
-                if (demand.tryDecrement()) {
-                    boolean done = completed = !iter.hasNext();
-                    if (done)
-                        subscriber.onComplete();
-                    else
+
+                while (demand.tryDecrement() && !cancelled) {
+                    if (!iter.hasNext()) {
+                        break;
+                    } else {
                         subscriber.onNext(iter.next());
+                    }
+                }
+                if (!iter.hasNext() && !cancelled) {
+                    completed = true;
+                    pullScheduler.stop();
+                    subscriber.onComplete();
                 }
             }
         }
 
         @Override
         public void request(long n) {
-            if (cancelled) {
-                error = new IllegalArgumentException("request("
-                                                      + n + "): cancelled");
+            if (cancelled)
+                return;  // no-op
+
+            if (n <= 0) {
+                error = new IllegalArgumentException("illegal non-positive request:" + n);
             } else {
                 demand.increase(n);
             }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Wed Nov 08 09:34:56 2017 +0000
@@ -329,13 +329,14 @@
 
         @Override
         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
-
+            PullPublisher<ByteBuffer> publisher;
             InputStream is = streamSupplier.get();
             if (is == null) {
-                throw new UncheckedIOException(new IOException("no inputstream supplied"));
+                Throwable t = new IOException("streamSupplier returned null");
+                publisher = new PullPublisher<>(null, t);
+            } else  {
+                publisher = new PullPublisher<>(iterableOf(is), null);
             }
-            PullPublisher<ByteBuffer> publisher =
-                    new PullPublisher<>(iterableOf(is));
             publisher.subscribe(subscriber);
         }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Wed Nov 08 09:34:56 2017 +0000
@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
@@ -602,7 +603,12 @@
         private final long contentLength;
         private volatile long remainingContentLength;
         private volatile Subscription subscription;
-        private volatile ByteBuffer current;
+
+        // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
+        //  1) The data that was published by the request body Publisher, and
+        //  2) the COMPLETED sentinel, since onComplete can be invoked without demand.
+        final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
+
         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
         // A scheduler used to honor window updates. Writing must be paused
         // when the window is exhausted, and resumed when the window acquires
@@ -630,8 +636,13 @@
 
         @Override
         public void onNext(ByteBuffer item) {
-            debug.log(Level.DEBUG,
-                    "RequestSubscriber: onNext(%d)", item.remaining());
+            debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
+            int size = outgoing.size();
+            assert size == 0 : "non-zero size: " + size;
+            onNextImpl(item);
+        }
+
+        private void onNextImpl(ByteBuffer item) {
             // Got some more request body bytes to send.
             if (requestBodyCF.isDone()) {
                 // stream already cancelled, probably in timeout
@@ -639,16 +650,13 @@
                 subscription.cancel();
                 return;
             }
-            ByteBuffer prev = current;
-            assert prev == null;
-            current = item;
+            outgoing.add(item);
             sendScheduler.runOrSchedule();
         }
 
         @Override
         public void onError(Throwable throwable) {
-            debug.log(Level.DEBUG,
-                      () -> "RequestSubscriber: onError: " + throwable);
+            debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
             // ensure that errors are handled within the flow.
             if (errorRef.compareAndSet(null, throwable)) {
                 sendScheduler.runOrSchedule();
@@ -658,9 +666,11 @@
         @Override
         public void onComplete() {
             debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
+            int size = outgoing.size();
+            assert size == 0 || size == 1 : "non-zero or one size: " + size;
             // last byte of request body has been obtained.
             // ensure that everything is completed within the flow.
-            onNext(COMPLETED);
+            onNextImpl(COMPLETED);
         }
 
         // Attempts to send the data, if any.
@@ -679,47 +689,51 @@
                     return;
                 }
 
-                // handle COMPLETED;
-                ByteBuffer item = current;
-                if (item == null) return;
-                else if (item == COMPLETED) {
-                    sendScheduler.stop();
-                    complete();
-                    return;
-                }
-
-                // handle bytes to send downstream
-                while (item.hasRemaining()) {
-                    debug.log(Level.DEBUG, "trySend: %d", item.remaining());
-                    assert !endStreamSent : "internal error, send data after END_STREAM flag";
-                    DataFrame df = getDataFrame(item);
-                    if (df == null) {
-                        debug.log(Level.DEBUG, "trySend: can't send yet: %d",
-                                  item.remaining());
-                        return; // the send window is exhausted: come back later
+                do {
+                    // handle COMPLETED;
+                    ByteBuffer item = outgoing.peekFirst();
+                    if (item == null) return;
+                    else if (item == COMPLETED) {
+                        sendScheduler.stop();
+                        complete();
+                        return;
                     }
 
-                    if (contentLength > 0) {
-                        remainingContentLength -= df.getDataLength();
-                        if (remainingContentLength < 0) {
-                            String msg = connection().getConnectionFlow()
-                                         + " stream=" + streamid + " "
-                                         + "[" + Thread.currentThread().getName() +"] "
-                                         + "Too many bytes in request body. Expected: "
-                                         + contentLength + ", got: "
-                                         + (contentLength - remainingContentLength);
-                            connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
-                            throw new IOException(msg);
-                        } else if (remainingContentLength == 0) {
-                            df.setFlag(DataFrame.END_STREAM);
-                            endStreamSent = true;
+                    // handle bytes to send downstream
+                    while (item.hasRemaining()) {
+                        debug.log(Level.DEBUG, "trySend: %d", item.remaining());
+                        assert !endStreamSent : "internal error, send data after END_STREAM flag";
+                        DataFrame df = getDataFrame(item);
+                        if (df == null) {
+                            debug.log(Level.DEBUG, "trySend: can't send yet: %d",
+                                    item.remaining());
+                            return; // the send window is exhausted: come back later
                         }
+
+                        if (contentLength > 0) {
+                            remainingContentLength -= df.getDataLength();
+                            if (remainingContentLength < 0) {
+                                String msg = connection().getConnectionFlow()
+                                        + " stream=" + streamid + " "
+                                        + "[" + Thread.currentThread().getName() + "] "
+                                        + "Too many bytes in request body. Expected: "
+                                        + contentLength + ", got: "
+                                        + (contentLength - remainingContentLength);
+                                connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+                                throw new IOException(msg);
+                            } else if (remainingContentLength == 0) {
+                                df.setFlag(DataFrame.END_STREAM);
+                                endStreamSent = true;
+                            }
+                        }
+                        debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
+                        connection.sendDataFrame(df);
                     }
-                    debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
-                    connection.sendDataFrame(df);
-                }
-                assert !item.hasRemaining();
-                current = null;
+                    assert !item.hasRemaining();
+                    ByteBuffer b = outgoing.removeFirst();
+                    assert b == item;
+                } while (outgoing.peekFirst() != null);
+
                 debug.log(Level.DEBUG, "trySend: request 1");
                 subscription.request(1);
             } catch (Throwable ex) {