--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java Wed Nov 08 09:34:56 2017 +0000
@@ -36,15 +36,37 @@
*/
class PullPublisher<T> implements Flow.Publisher<T> {
+ // Only one of `iterable` and `throwable` can be non-null. throwable is
+ // non-null when an error has been encountered, by the creator of
+ // PullPublisher, while subscribing the subscriber, but before subscribe has
+ // completed.
private final Iterable<T> iterable;
+ private final Throwable throwable;
+
+ PullPublisher(Iterable<T> iterable, Throwable throwable) {
+ this.iterable = iterable;
+ this.throwable = throwable;
+ }
PullPublisher(Iterable<T> iterable) {
- this.iterable = iterable;
+ this(iterable, null);
}
@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
- subscriber.onSubscribe(new Subscription(subscriber, iterable.iterator()));
+ Subscription sub;
+ if (throwable != null) {
+ assert iterable == null : "non-null iterable: " + iterable;
+ sub = new Subscription(subscriber, null, throwable);
+ } else {
+ assert throwable == null : "non-null exception: " + throwable;
+ sub = new Subscription(subscriber, iterable.iterator(), null);
+ }
+ subscriber.onSubscribe(sub);
+
+ if (throwable != null) {
+ sub.pullScheduler.runOrSchedule();
+ }
}
private class Subscription implements Flow.Subscription {
@@ -57,39 +79,51 @@
final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
private final Demand demand = new Demand();
- Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
+ Subscription(Flow.Subscriber<? super T> subscriber,
+ Iterator<T> iter,
+ Throwable throwable) {
this.subscriber = subscriber;
this.iter = iter;
+ this.error = throwable;
}
final class PullTask extends SequentialScheduler.CompleteRestartableTask {
@Override
protected void run() {
- if (completed) {
- pullScheduler.stop();
+ if (completed || cancelled) {
return;
}
+
Throwable t = error;
if (t != null) {
completed = true;
pullScheduler.stop();
subscriber.onError(t);
+ return;
}
- if (demand.tryDecrement()) {
- boolean done = completed = !iter.hasNext();
- if (done)
- subscriber.onComplete();
- else
+
+ while (demand.tryDecrement() && !cancelled) {
+ if (!iter.hasNext()) {
+ break;
+ } else {
subscriber.onNext(iter.next());
+ }
+ }
+ if (!iter.hasNext() && !cancelled) {
+ completed = true;
+ pullScheduler.stop();
+ subscriber.onComplete();
}
}
}
@Override
public void request(long n) {
- if (cancelled) {
- error = new IllegalArgumentException("request("
- + n + "): cancelled");
+ if (cancelled)
+ return; // no-op
+
+ if (n <= 0) {
+ error = new IllegalArgumentException("illegal non-positive request:" + n);
} else {
demand.increase(n);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Wed Nov 08 09:34:56 2017 +0000
@@ -329,13 +329,14 @@
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
-
+ PullPublisher<ByteBuffer> publisher;
InputStream is = streamSupplier.get();
if (is == null) {
- throw new UncheckedIOException(new IOException("no inputstream supplied"));
+ Throwable t = new IOException("streamSupplier returned null");
+ publisher = new PullPublisher<>(null, t);
+ } else {
+ publisher = new PullPublisher<>(iterableOf(is), null);
}
- PullPublisher<ByteBuffer> publisher =
- new PullPublisher<>(iterableOf(is));
publisher.subscribe(subscriber);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Tue Nov 07 19:46:59 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 08 09:34:56 2017 +0000
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
@@ -602,7 +603,12 @@
private final long contentLength;
private volatile long remainingContentLength;
private volatile Subscription subscription;
- private volatile ByteBuffer current;
+
+ // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
+ // 1) The data that was published by the request body Publisher, and
+ // 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
+ final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
+
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
// A scheduler used to honor window updates. Writing must be paused
// when the window is exhausted, and resumed when the window acquires
@@ -630,8 +636,13 @@
@Override
public void onNext(ByteBuffer item) {
- debug.log(Level.DEBUG,
- "RequestSubscriber: onNext(%d)", item.remaining());
+ debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
+ int size = outgoing.size();
+ assert size == 0 : "non-zero size: " + size;
+ onNextImpl(item);
+ }
+
+ private void onNextImpl(ByteBuffer item) {
// Got some more request body bytes to send.
if (requestBodyCF.isDone()) {
// stream already cancelled, probably in timeout
@@ -639,16 +650,13 @@
subscription.cancel();
return;
}
- ByteBuffer prev = current;
- assert prev == null;
- current = item;
+ outgoing.add(item);
sendScheduler.runOrSchedule();
}
@Override
public void onError(Throwable throwable) {
- debug.log(Level.DEBUG,
- () -> "RequestSubscriber: onError: " + throwable);
+ debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
// ensure that errors are handled within the flow.
if (errorRef.compareAndSet(null, throwable)) {
sendScheduler.runOrSchedule();
@@ -658,9 +666,11 @@
@Override
public void onComplete() {
debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
+ int size = outgoing.size();
+ assert size == 0 || size == 1 : "non-zero or one size: " + size;
// last byte of request body has been obtained.
// ensure that everything is completed within the flow.
- onNext(COMPLETED);
+ onNextImpl(COMPLETED);
}
// Attempts to send the data, if any.
@@ -679,47 +689,51 @@
return;
}
- // handle COMPLETED;
- ByteBuffer item = current;
- if (item == null) return;
- else if (item == COMPLETED) {
- sendScheduler.stop();
- complete();
- return;
- }
-
- // handle bytes to send downstream
- while (item.hasRemaining()) {
- debug.log(Level.DEBUG, "trySend: %d", item.remaining());
- assert !endStreamSent : "internal error, send data after END_STREAM flag";
- DataFrame df = getDataFrame(item);
- if (df == null) {
- debug.log(Level.DEBUG, "trySend: can't send yet: %d",
- item.remaining());
- return; // the send window is exhausted: come back later
+ do {
+ // handle COMPLETED;
+ ByteBuffer item = outgoing.peekFirst();
+ if (item == null) return;
+ else if (item == COMPLETED) {
+ sendScheduler.stop();
+ complete();
+ return;
}
- if (contentLength > 0) {
- remainingContentLength -= df.getDataLength();
- if (remainingContentLength < 0) {
- String msg = connection().getConnectionFlow()
- + " stream=" + streamid + " "
- + "[" + Thread.currentThread().getName() +"] "
- + "Too many bytes in request body. Expected: "
- + contentLength + ", got: "
- + (contentLength - remainingContentLength);
- connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
- throw new IOException(msg);
- } else if (remainingContentLength == 0) {
- df.setFlag(DataFrame.END_STREAM);
- endStreamSent = true;
+ // handle bytes to send downstream
+ while (item.hasRemaining()) {
+ debug.log(Level.DEBUG, "trySend: %d", item.remaining());
+ assert !endStreamSent : "internal error, send data after END_STREAM flag";
+ DataFrame df = getDataFrame(item);
+ if (df == null) {
+ debug.log(Level.DEBUG, "trySend: can't send yet: %d",
+ item.remaining());
+ return; // the send window is exhausted: come back later
}
+
+ if (contentLength > 0) {
+ remainingContentLength -= df.getDataLength();
+ if (remainingContentLength < 0) {
+ String msg = connection().getConnectionFlow()
+ + " stream=" + streamid + " "
+ + "[" + Thread.currentThread().getName() + "] "
+ + "Too many bytes in request body. Expected: "
+ + contentLength + ", got: "
+ + (contentLength - remainingContentLength);
+ connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ throw new IOException(msg);
+ } else if (remainingContentLength == 0) {
+ df.setFlag(DataFrame.END_STREAM);
+ endStreamSent = true;
+ }
+ }
+ debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
+ connection.sendDataFrame(df);
}
- debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
- connection.sendDataFrame(df);
- }
- assert !item.hasRemaining();
- current = null;
+ assert !item.hasRemaining();
+ ByteBuffer b = outgoing.removeFirst();
+ assert b == item;
+ } while (outgoing.peekFirst() != null);
+
debug.log(Level.DEBUG, "trySend: request 1");
subscription.request(1);
} catch (Throwable ex) {