# HG changeset patch # User chegar # Date 1510133696 0 # Node ID e62cbcc08caef80385fa540e91e3151a64b41d45 # Parent 9950bc2ee8748496ff11ec9dab40cbe022a5fba3 http-client-branch: Publisher/Subscriber/Subscription conformance fixes diff -r 9950bc2ee874 -r e62cbcc08cae src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java Tue Nov 07 19:46:59 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java Wed Nov 08 09:34:56 2017 +0000 @@ -36,15 +36,37 @@ */ class PullPublisher implements Flow.Publisher { + // Only one of `iterable` and `throwable` can be non-null. throwable is + // non-null when an error has been encountered, by the creator of + // PullPublisher, while subscribing the subscriber, but before subscribe has + // completed. private final Iterable iterable; + private final Throwable throwable; + + PullPublisher(Iterable iterable, Throwable throwable) { + this.iterable = iterable; + this.throwable = throwable; + } PullPublisher(Iterable iterable) { - this.iterable = iterable; + this(iterable, null); } @Override public void subscribe(Flow.Subscriber subscriber) { - subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator())); + Subscription sub; + if (throwable != null) { + assert iterable == null : "non-null iterable: " + iterable; + sub = new Subscription(subscriber, null, throwable); + } else { + assert throwable == null : "non-null exception: " + throwable; + sub = new Subscription(subscriber, iterable.iterator(), null); + } + subscriber.onSubscribe(sub); + + if (throwable != null) { + sub.pullScheduler.runOrSchedule(); + } } private class Subscription implements Flow.Subscription { @@ -57,39 +79,51 @@ final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask()); private final Demand demand = new Demand(); - Subscription(Flow.Subscriber subscriber, Iterator iter) { + Subscription(Flow.Subscriber subscriber, + Iterator iter, + Throwable throwable) { this.subscriber = subscriber; this.iter = iter; + this.error = throwable; } final class PullTask extends SequentialScheduler.CompleteRestartableTask { @Override protected void run() { - if (completed) { - pullScheduler.stop(); + if (completed || cancelled) { return; } + Throwable t = error; if (t != null) { completed = true; pullScheduler.stop(); subscriber.onError(t); + return; } - if (demand.tryDecrement()) { - boolean done = completed = !iter.hasNext(); - if (done) - subscriber.onComplete(); - else + + while (demand.tryDecrement() && !cancelled) { + if (!iter.hasNext()) { + break; + } else { subscriber.onNext(iter.next()); + } + } + if (!iter.hasNext() && !cancelled) { + completed = true; + pullScheduler.stop(); + subscriber.onComplete(); } } } @Override public void request(long n) { - if (cancelled) { - error = new IllegalArgumentException("request(" - + n + "): cancelled"); + if (cancelled) + return; // no-op + + if (n <= 0) { + error = new IllegalArgumentException("illegal non-positive request:" + n); } else { demand.increase(n); } diff -r 9950bc2ee874 -r e62cbcc08cae src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Tue Nov 07 19:46:59 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Wed Nov 08 09:34:56 2017 +0000 @@ -329,13 +329,14 @@ @Override public void subscribe(Flow.Subscriber subscriber) { - + PullPublisher publisher; InputStream is = streamSupplier.get(); if (is == null) { - throw new UncheckedIOException(new IOException("no inputstream supplied")); + Throwable t = new IOException("streamSupplier returned null"); + publisher = new PullPublisher<>(null, t); + } else { + publisher = new PullPublisher<>(iterableOf(is), null); } - PullPublisher publisher = - new PullPublisher<>(iterableOf(is)); publisher.subscribe(subscriber); } diff -r 9950bc2ee874 -r e62cbcc08cae src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Nov 07 19:46:59 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 08 09:34:56 2017 +0000 @@ -35,6 +35,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Flow; @@ -602,7 +603,12 @@ private final long contentLength; private volatile long remainingContentLength; private volatile Subscription subscription; - private volatile ByteBuffer current; + + // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers. + // 1) The data that was published by the request body Publisher, and + // 2) the COMPLETED sentinel, since onComplete can be invoked without demand. + final ConcurrentLinkedDeque outgoing = new ConcurrentLinkedDeque<>(); + private final AtomicReference errorRef = new AtomicReference<>(); // A scheduler used to honor window updates. Writing must be paused // when the window is exhausted, and resumed when the window acquires @@ -630,8 +636,13 @@ @Override public void onNext(ByteBuffer item) { - debug.log(Level.DEBUG, - "RequestSubscriber: onNext(%d)", item.remaining()); + debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining()); + int size = outgoing.size(); + assert size == 0 : "non-zero size: " + size; + onNextImpl(item); + } + + private void onNextImpl(ByteBuffer item) { // Got some more request body bytes to send. if (requestBodyCF.isDone()) { // stream already cancelled, probably in timeout @@ -639,16 +650,13 @@ subscription.cancel(); return; } - ByteBuffer prev = current; - assert prev == null; - current = item; + outgoing.add(item); sendScheduler.runOrSchedule(); } @Override public void onError(Throwable throwable) { - debug.log(Level.DEBUG, - () -> "RequestSubscriber: onError: " + throwable); + debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable); // ensure that errors are handled within the flow. if (errorRef.compareAndSet(null, throwable)) { sendScheduler.runOrSchedule(); @@ -658,9 +666,11 @@ @Override public void onComplete() { debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); + int size = outgoing.size(); + assert size == 0 || size == 1 : "non-zero or one size: " + size; // last byte of request body has been obtained. // ensure that everything is completed within the flow. - onNext(COMPLETED); + onNextImpl(COMPLETED); } // Attempts to send the data, if any. @@ -679,47 +689,51 @@ return; } - // handle COMPLETED; - ByteBuffer item = current; - if (item == null) return; - else if (item == COMPLETED) { - sendScheduler.stop(); - complete(); - return; - } - - // handle bytes to send downstream - while (item.hasRemaining()) { - debug.log(Level.DEBUG, "trySend: %d", item.remaining()); - assert !endStreamSent : "internal error, send data after END_STREAM flag"; - DataFrame df = getDataFrame(item); - if (df == null) { - debug.log(Level.DEBUG, "trySend: can't send yet: %d", - item.remaining()); - return; // the send window is exhausted: come back later + do { + // handle COMPLETED; + ByteBuffer item = outgoing.peekFirst(); + if (item == null) return; + else if (item == COMPLETED) { + sendScheduler.stop(); + complete(); + return; } - if (contentLength > 0) { - remainingContentLength -= df.getDataLength(); - if (remainingContentLength < 0) { - String msg = connection().getConnectionFlow() - + " stream=" + streamid + " " - + "[" + Thread.currentThread().getName() +"] " - + "Too many bytes in request body. Expected: " - + contentLength + ", got: " - + (contentLength - remainingContentLength); - connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); - throw new IOException(msg); - } else if (remainingContentLength == 0) { - df.setFlag(DataFrame.END_STREAM); - endStreamSent = true; + // handle bytes to send downstream + while (item.hasRemaining()) { + debug.log(Level.DEBUG, "trySend: %d", item.remaining()); + assert !endStreamSent : "internal error, send data after END_STREAM flag"; + DataFrame df = getDataFrame(item); + if (df == null) { + debug.log(Level.DEBUG, "trySend: can't send yet: %d", + item.remaining()); + return; // the send window is exhausted: come back later } + + if (contentLength > 0) { + remainingContentLength -= df.getDataLength(); + if (remainingContentLength < 0) { + String msg = connection().getConnectionFlow() + + " stream=" + streamid + " " + + "[" + Thread.currentThread().getName() + "] " + + "Too many bytes in request body. Expected: " + + contentLength + ", got: " + + (contentLength - remainingContentLength); + connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); + throw new IOException(msg); + } else if (remainingContentLength == 0) { + df.setFlag(DataFrame.END_STREAM); + endStreamSent = true; + } + } + debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); + connection.sendDataFrame(df); } - debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); - connection.sendDataFrame(df); - } - assert !item.hasRemaining(); - current = null; + assert !item.hasRemaining(); + ByteBuffer b = outgoing.removeFirst(); + assert b == item; + } while (outgoing.peekFirst() != null); + debug.log(Level.DEBUG, "trySend: request 1"); subscription.request(1); } catch (Throwable ex) {