http-client-branch: review comment - invalid Subscription::request arguments should be relayed to Subscriber::onError (part I: HTTP/1.1)
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Fri Apr 06 16:05:55 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Fri Apr 06 17:04:25 2018 +0100
@@ -39,7 +39,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
import jdk.internal.net.http.common.SequentialScheduler;
@@ -124,21 +124,34 @@
extends AbstractSubscription
{
private final Runnable onCancel;
+ private final Consumer<Throwable> onError;
private final SequentialScheduler scheduler;
+ private volatile boolean cancelled;
Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
- Runnable onCancel) {
+ Runnable onCancel,
+ Consumer<Throwable> onError) {
this.scheduler = scheduler;
this.onCancel = onCancel;
+ this.onError = onError;
}
@Override
public void request(long n) {
- final Demand demand = demand();
- if (demand.increase(n)) {
- scheduler.runOrSchedule();
+ if (cancelled) return;
+ try {
+ final Demand demand = demand();
+ if (demand.increase(n)) {
+ scheduler.runOrSchedule();
+ }
+ } catch (IllegalArgumentException x) {
+ cancelled = true;
+ onError.accept(x);
}
}
@Override
- public void cancel() { onCancel.run();}
+ public void cancel() {
+ cancelled = true;
+ onCancel.run();
+ }
}
private final ConcurrentLinkedDeque<ByteBuffer> queue
@@ -158,6 +171,7 @@
// Only used for checking whether we run on the selector manager thread.
private final HttpClientImpl client;
private boolean retry;
+ private volatile boolean stopRequested;
public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
this.pendingDelegateRef = new AtomicReference<>();
@@ -184,7 +198,7 @@
handlePendingDelegate();
// Then start emptying the queue, if possible.
- while ((buf = queue.peek()) != null) {
+ while ((buf = queue.peek()) != null && !stopRequested) {
Http1AsyncDelegate delegate = this.delegate;
debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
buf.remaining(), delegate);
@@ -219,7 +233,7 @@
// removed parsed buffer from queue, and continue with next
// if available
ByteBuffer parsed = queue.remove();
- canRequestMore.set(queue.isEmpty());
+ canRequestMore.set(queue.isEmpty() && !stopRequested);
assert parsed == buf;
}
@@ -252,7 +266,7 @@
Http1AsyncDelegate delegate = pendingDelegateRef.get();
if (delegate == null) delegate = this.delegate;
Throwable x = error;
- if (delegate != null && x != null && queue.isEmpty()) {
+ if (delegate != null && x != null && (stopRequested || queue.isEmpty())) {
// forward error only after emptying the queue.
final Object captured = delegate;
debug.log(Level.DEBUG, () -> "flushing " + x
@@ -260,6 +274,16 @@
+ "\t\t queue.isEmpty: " + queue.isEmpty());
scheduler.stop();
delegate.onReadError(x);
+ if (stopRequested) {
+ // This is the special case where the subscriber
+ // has requested an illegal number of items.
+ // In this case, the error doesn't come from
+ // upstream, but from downstream, and we need to
+ // close the upstream connection.
+ Http1Exchange<?> exchg = owner;
+ stop();
+ if (exchg != null) exchg.connection().close();
+ }
}
}
@@ -308,6 +332,11 @@
if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
Http1AsyncDelegate delegate = this.delegate;
if (delegate != null) unsubscribe(delegate);
+ Consumer<Throwable> onIllegalArg = (x) -> {
+ setRetryOnError(false);
+ stopRequested = true;
+ onReadError(x);
+ };
Runnable cancel = () -> {
debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
// The connection should be closed, as some data may
@@ -327,7 +356,7 @@
// the header/body parser work with a flow of ByteBuffer, whereas
// we have a flow List<ByteBuffer> upstream.
Http1AsyncDelegateSubscription subscription =
- new Http1AsyncDelegateSubscription(scheduler, cancel);
+ new Http1AsyncDelegateSubscription(scheduler, cancel, onIllegalArg);
pending.onSubscribe(subscription);
this.delegate = delegate = pending;
final Object captured = delegate;
@@ -430,7 +459,7 @@
+ "\n\t delegate: " + delegate
+ "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
}
- if (queue.isEmpty() || pendingDelegateRef.get() != null) {
+ if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) {
// This callback is called from within the selector thread.
// Use an executor here to avoid doing the heavy lifting in the
// selector.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Fri Apr 06 16:05:55 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Fri Apr 06 17:04:25 2018 +0100
@@ -27,10 +27,16 @@
import java.io.EOFException;
import java.lang.System.Logger.Level;
+import java.net.http.HttpClient;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.net.http.HttpHeaders;
@@ -39,6 +45,8 @@
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
+import java.lang.ref.Reference;
+
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpResponse.BodySubscribers.discarding;
@@ -215,12 +223,113 @@
}
}
+ static final Flow.Subscription NOP = new Flow.Subscription() {
+ @Override
+ public void request(long n) { }
+ public void cancel() { }
+ };
+
+ /**
+ * The Http1AsyncReceiver ensures that all calls to
+ * the subscriber, including onSubscribe, occur sequentially.
+ * There could however be some race conditions that could happen
+ * in case of unexpected errors thrown at unexpected places, which
+ * may cause onError to be called multiple times.
+ * The Http1BodySubscriber will ensure that the user subscriber
+ * is actually completed only once - and only after it is
+ * subscribed.
+ * @param <U> The type of response.
+ */
+ final static class Http1BodySubscriber<U> implements HttpResponse.BodySubscriber<U> {
+ final HttpResponse.BodySubscriber<U> userSubscriber;
+ final AtomicBoolean completed = new AtomicBoolean();
+ volatile Throwable withError;
+ volatile boolean subscribed;
+ Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
+ this.userSubscriber = userSubscriber;
+ }
+
+ // propagate the error to the user subscriber, even if not
+ // subscribed yet.
+ private void propagateError(Throwable t) {
+ assert t != null;
+ try {
+ // if unsubscribed at this point, it will not
+ // get subscribed later - so do it now and
+ // propagate the error
+ if (subscribed == false) {
+ subscribed = true;
+ userSubscriber.onSubscribe(NOP);
+ }
+ } finally {
+ // if onError throws then there is nothing to do
+ // here: let the caller deal with it by logging
+ // and closing the connection.
+ userSubscriber.onError(t);
+ }
+ }
+
+ // complete the subscriber, either normally or exceptionally
+ // ensure that the subscriber is completed only once.
+ private void complete(Throwable t) {
+ if (completed.compareAndSet(false, true)) {
+ t = withError = Utils.getCompletionCause(t);
+ if (t == null) {
+ assert subscribed;
+ try {
+ userSubscriber.onComplete();
+ } catch (Throwable x) {
+ propagateError(t = withError = Utils.getCompletionCause(x));
+ // rethrow and let the caller deal with it.
+ // (i.e: log and close the connection)
+ // arguably we could decide to not throw and let the
+ // connection be reused since we should have received and
+ // parsed all the bytes when we reach here.
+ throw x;
+ }
+ } else {
+ propagateError(t);
+ }
+ }
+ }
+
+ @Override
+ public CompletionStage<U> getBody() {
+ return userSubscriber.getBody();
+ }
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ if (!subscribed) {
+ subscribed = true;
+ userSubscriber.onSubscribe(subscription);
+ } else {
+ // could be already subscribed and completed
+ // if an unexpected error occurred before the actual
+ // subscription - though that's not supposed
+ // happen.
+ assert completed.get();
+ }
+ }
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ assert !completed.get();
+ userSubscriber.onNext(item);
+ }
+ @Override
+ public void onError(Throwable throwable) {
+ complete(throwable);
+ }
+ @Override
+ public void onComplete() {
+ complete(null);
+ }
+ }
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
boolean return2Cache,
Executor executor) {
this.return2Cache = return2Cache;
- final HttpResponse.BodySubscriber<U> pusher = p;
+ final Http1BodySubscriber<U> subscriber = new Http1BodySubscriber<>(p);
final CompletableFuture<U> cf = new MinimalFuture<>();
@@ -234,10 +343,13 @@
headersReader.reset();
ClientRefCountTracker refCountTracker = new ClientRefCountTracker();
+ // We need to keep hold on the client facade until the
+ // tracker has been incremented.
+ connection.client().reference();
executor.execute(() -> {
try {
content = new ResponseContent(
- connection, clen, headers, pusher,
+ connection, clen, headers, subscriber,
this::onFinished
);
if (cf.isCompletedExceptionally()) {
@@ -253,10 +365,9 @@
(t) -> {
try {
if (t != null) {
- pusher.onError(t);
+ subscriber.onError(t);
connection.close();
- if (!cf.isDone())
- cf.completeExceptionally(t);
+ cf.completeExceptionally(t);
}
} finally {
bodyReader.onComplete(t);
@@ -275,8 +386,8 @@
"Finished reading body: " + s);
assert s == State.READING_BODY;
}
- if (t != null && !cf.isDone()) {
- pusher.onError(t);
+ if (t != null) {
+ subscriber.onError(t);
cf.completeExceptionally(t);
}
} catch (Throwable x) {
@@ -292,13 +403,13 @@
} catch (Throwable t) {
debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
try {
- if (!cf.isDone()) {
- pusher.onError(t);
- cf.completeExceptionally(t);
- }
+ subscriber.onError(t);
+ cf.completeExceptionally(t);
} finally {
asyncReceiver.onReadError(t);
}
+ } finally {
+ connection.client().unreference();
}
});
try {
@@ -367,7 +478,7 @@
+ (cf == null ? "null"
: (cf.isDone() ? "already completed"
: "not yet completed")));
- if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
+ if (cf != null) cf.completeExceptionally(t);
else { debug.log(Level.DEBUG, "onReadError", t); }
debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
connection.close();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/InvalidInputStreamSubscriptionRequest.java Fri Apr 06 17:04:25 2018 +0100
@@ -0,0 +1,549 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Tests an asynchronous BodySubscriber that completes
+ * immediately with an InputStream which issues bad
+ * requests
+ * @library /lib/testlibrary http2/server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ * java.net.http/jdk.internal.net.http.common
+ * java.net.http/jdk.internal.net.http.frame
+ * java.net.http/jdk.internal.net.http.hpack
+ * @run testng/othervm InvalidInputStreamSubscriptionRequest
+ */
+
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandler;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.net.http.HttpResponse.BodySubscriber;
+import java.net.http.HttpResponse.BodySubscribers;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+public class InvalidInputStreamSubscriptionRequest implements HttpServerAdapters {
+
+ SSLContext sslContext;
+ HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
+ HttpTestServer httpsTestServer; // HTTPS/1.1
+ HttpTestServer http2TestServer; // HTTP/2 ( h2c )
+ HttpTestServer https2TestServer; // HTTP/2 ( h2 )
+ String httpURI_fixed;
+ String httpURI_chunk;
+ String httpsURI_fixed;
+ String httpsURI_chunk;
+ String http2URI_fixed;
+ String http2URI_chunk;
+ String https2URI_fixed;
+ String https2URI_chunk;
+
+ static final int ITERATION_COUNT = 3;
+ // a shared executor helps reduce the amount of threads created by the test
+ static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
+
+ static final long start = System.nanoTime();
+ public static String now() {
+ long now = System.nanoTime() - start;
+ long secs = now / 1000_000_000;
+ long mill = (now % 1000_000_000) / 1000_000;
+ long nan = now % 1000_000;
+ return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
+ }
+ static volatile boolean tasksFailed;
+ static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
+
+ static class TestExecutor implements Executor {
+ final AtomicLong tasks = new AtomicLong();
+ Executor executor;
+ TestExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ long id = tasks.incrementAndGet();
+ executor.execute(() -> {
+ try {
+ command.run();
+ } catch (Throwable t) {
+ tasksFailed = true;
+ System.out.printf(now() + "Task %s failed: %s%n", id, t);
+ System.err.printf(now() + "Task %s failed: %s%n", id, t);
+ FAILURES.putIfAbsent("Task " + id, t);
+ throw t;
+ }
+ });
+ }
+ }
+
+ @AfterClass
+ static final void printFailedTests() {
+ out.println("\n=========================");
+ try {
+ out.println("Failed tasks: ");
+ FAILURES.entrySet().forEach((e) -> {
+ out.printf("\t%s: %s%n", e.getKey(), e.getValue());
+ e.getValue().printStackTrace(out);
+ });
+ if (tasksFailed) {
+ System.out.println("WARNING: Some tasks failed");
+ }
+ } finally {
+ out.println("\n=========================\n");
+ }
+ }
+
+ interface BHS extends Supplier<BodyHandler<InputStream>> {
+ static BHS of(BHS impl, String name) {
+ return new BHSImpl(impl, name);
+ }
+ }
+
+ static final class BHSImpl implements BHS {
+ final BHS supplier;
+ final String name;
+ BHSImpl(BHS impl, String name) {
+ this.supplier = impl;
+ this.name = name;
+ }
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public BodyHandler<InputStream> get() {
+ return supplier.get();
+ }
+ }
+
+ static final Supplier<BodyHandler<InputStream>> OF_INPUTSTREAM =
+ BHS.of(BodyHandlers::ofInputStream, "BodyHandlers::ofInputStream");
+
+ @DataProvider(name = "variants")
+ public Object[][] variants() {
+ return new Object[][]{
+ { httpURI_fixed, false, OF_INPUTSTREAM },
+ { httpURI_chunk, false, OF_INPUTSTREAM },
+ { httpsURI_fixed, false, OF_INPUTSTREAM },
+ { httpsURI_chunk, false, OF_INPUTSTREAM },
+// { http2URI_fixed, false, OF_INPUTSTREAM },
+// { http2URI_chunk, false, OF_INPUTSTREAM },
+// { https2URI_fixed, false, OF_INPUTSTREAM },
+// { https2URI_chunk, false, OF_INPUTSTREAM },
+
+ { httpURI_fixed, true, OF_INPUTSTREAM },
+ { httpURI_chunk, true, OF_INPUTSTREAM },
+ { httpsURI_fixed, true, OF_INPUTSTREAM },
+ { httpsURI_chunk, true, OF_INPUTSTREAM },
+// { http2URI_fixed, true, OF_INPUTSTREAM },
+// { http2URI_chunk, true, OF_INPUTSTREAM },
+// { https2URI_fixed, true, OF_INPUTSTREAM },
+// { https2URI_chunk, true, OF_INPUTSTREAM },
+ };
+ }
+
+ HttpClient newHttpClient() {
+ return HttpClient.newBuilder()
+ .executor(executor)
+ .sslContext(sslContext)
+ .build();
+ }
+
+ @Test(dataProvider = "variants")
+ public void testNoBody(String uri, boolean sameClient, BHS handlers)
+ throws Exception
+ {
+ HttpClient client = null;
+ for (int i=0; i< ITERATION_COUNT; i++) {
+ if (!sameClient || client == null)
+ client = newHttpClient();
+
+ HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+ .build();
+ BodyHandler<InputStream> handler = handlers.get();
+ BodyHandler<InputStream> badHandler = (c,h) ->
+ new BadBodySubscriber<>(handler.apply(c,h));
+ try {
+ HttpResponse<InputStream> response = client.send(req, badHandler);
+ try (InputStream is = response.body()) {
+ String body = new String(is.readAllBytes(), UTF_8);
+ assertEquals(body, "");
+ if (uri.endsWith("/chunk")) {
+ // with /fixed and 0 length
+ // there's no need for any call to request()
+ throw new RuntimeException("Expected IAE not thrown");
+ }
+ }
+ } catch (Exception x) {
+ Throwable cause = x;
+ if (x instanceof CompletionException || x instanceof ExecutionException) {
+ cause = x.getCause();
+ }
+ if (cause instanceof IOException && cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof IllegalArgumentException) {
+ System.out.println("Got expected exception: " + cause);
+ } else throw x;
+ }
+ }
+ }
+
+ @Test(dataProvider = "variants")
+ public void testNoBodyAsync(String uri, boolean sameClient, BHS handlers)
+ throws Exception
+ {
+ HttpClient client = null;
+ for (int i=0; i< ITERATION_COUNT; i++) {
+ if (!sameClient || client == null)
+ client = newHttpClient();
+
+ HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+ .build();
+ BodyHandler<InputStream> handler = handlers.get();
+ BodyHandler<InputStream> badHandler = (c,h) ->
+ new BadBodySubscriber<>(handler.apply(c,h));
+ CompletableFuture<String> result =
+ client.sendAsync(req, badHandler).thenCompose(
+ (responsePublisher) -> {
+ try (InputStream is = responsePublisher.body()) {
+ return CompletableFuture.completedFuture(
+ new String(is.readAllBytes(), UTF_8));
+ } catch (Exception x) {
+ return CompletableFuture.failedFuture(x);
+ }
+ });
+ try {
+ // Get the final result and compare it with the expected body
+ assertEquals(result.get(), "");
+ if (uri.endsWith("/chunk")) {
+ // with /fixed and 0 length
+ // there's no need for any call to request()
+ throw new RuntimeException("Expected IAE not thrown");
+ }
+ } catch (Exception x) {
+ Throwable cause = x;
+ if (x instanceof CompletionException || x instanceof ExecutionException) {
+ cause = x.getCause();
+ }
+ if (cause instanceof IOException && cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof IllegalArgumentException) {
+ System.out.println("Got expected exception: " + cause);
+ } else throw x;
+ }
+ }
+ }
+
+ @Test(dataProvider = "variants")
+ public void testAsString(String uri, boolean sameClient, BHS handlers)
+ throws Exception
+ {
+ HttpClient client = null;
+ for (int i=0; i< ITERATION_COUNT; i++) {
+ if (!sameClient || client == null)
+ client = newHttpClient();
+
+ HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
+ .build();
+ BodyHandler<InputStream> handler = handlers.get();
+ BodyHandler<InputStream> badHandler = (c,h) ->
+ new BadBodySubscriber<>(handler.apply(c,h));
+ try {
+ HttpResponse<InputStream> response = client.send(req, badHandler);
+ try (InputStream is = response.body()) {
+ String body = new String(is.readAllBytes(), UTF_8);
+ assertEquals(body, WITH_BODY);
+ throw new RuntimeException("Expected IAE not thrown");
+ }
+ } catch (Exception x) {
+ Throwable cause = x;
+ if (x instanceof CompletionException || x instanceof ExecutionException) {
+ cause = x.getCause();
+ }
+ if (cause instanceof IOException && cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof IllegalArgumentException) {
+ System.out.println("Got expected exception: " + cause);
+ } else throw x;
+ }
+ }
+ }
+
+ @Test(dataProvider = "variants")
+ public void testAsStringAsync(String uri, boolean sameClient, BHS handlers)
+ throws Exception
+ {
+ HttpClient client = null;
+ for (int i=0; i< ITERATION_COUNT; i++) {
+ if (!sameClient || client == null)
+ client = newHttpClient();
+
+ HttpRequest req = HttpRequest.newBuilder(URI.create(uri+"/withBody"))
+ .build();
+ BodyHandler<InputStream> handler = handlers.get();
+ BodyHandler<InputStream> badHandler = (c,h) ->
+ new BadBodySubscriber<>(handler.apply(c,h));
+ CompletableFuture<String> result = client.sendAsync(req, badHandler)
+ .thenCompose((responsePublisher) -> {
+ try (InputStream is = responsePublisher.body()) {
+ return CompletableFuture.completedFuture(
+ new String(is.readAllBytes(), UTF_8));
+ } catch (Exception x) {
+ return CompletableFuture.failedFuture(x);
+ }
+ });
+ // Get the final result and compare it with the expected body
+ try {
+ String body = result.get();
+ assertEquals(body, WITH_BODY);
+ throw new RuntimeException("Expected IAE not thrown");
+ } catch (Exception x) {
+ Throwable cause = x;
+ if (x instanceof CompletionException || x instanceof ExecutionException) {
+ cause = x.getCause();
+ }
+ if (cause instanceof IOException && cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof IllegalArgumentException) {
+ System.out.println("Got expected exception: " + cause);
+ } else throw x;
+ }
+ }
+ }
+
+ static final class BadSubscription implements Flow.Subscription {
+ Flow.Subscription subscription;
+ Executor executor;
+ BadSubscription(Flow.Subscription subscription) {
+ this.subscription = subscription;
+ }
+
+ @Override
+ public void request(long n) {
+ if (executor == null) {
+ subscription.request(-n);
+ } else {
+ executor.execute(() -> subscription.request(-n));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ subscription.cancel();
+ }
+ }
+
+ static final class BadBodySubscriber<T> implements BodySubscriber<T> {
+ final BodySubscriber<T> subscriber;
+ BadBodySubscriber(BodySubscriber<T> subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public CompletionStage<T> getBody() {
+ return subscriber.getBody();
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ System.out.println("Subscription is: " + subscription);
+ subscriber.onSubscribe(new BadSubscription(subscription));
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ subscriber.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ subscriber.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ subscriber.onComplete();
+ }
+ }
+
+ static String serverAuthority(HttpServer server) {
+ return InetAddress.getLoopbackAddress().getHostName() + ":"
+ + server.getAddress().getPort();
+ }
+
+ @BeforeTest
+ public void setup() throws Exception {
+ sslContext = new SimpleSSLContext().get();
+ if (sslContext == null)
+ throw new AssertionError("Unexpected null sslContext");
+
+ // HTTP/1.1
+ HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
+ HttpTestHandler h1_chunkHandler = new HTTP_VariableLengthHandler();
+ InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+ httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
+ httpTestServer.addHandler( h1_fixedLengthHandler, "/http1/fixed");
+ httpTestServer.addHandler(h1_chunkHandler,"/http1/chunk");
+ httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed";
+ httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk";
+
+ HttpsServer httpsServer = HttpsServer.create(sa, 0);
+ httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+ httpsTestServer = HttpTestServer.of(httpsServer);
+ httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
+ httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
+ httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed";
+ httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk";
+
+ // HTTP/2
+ HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
+ HttpTestHandler h2_chunkedHandler = new HTTP_VariableLengthHandler();
+
+ http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
+ http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
+ http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
+ http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed";
+ http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk";
+
+ https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
+ https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
+ https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
+ https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed";
+ https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk";
+
+ httpTestServer.start();
+ httpsTestServer.start();
+ http2TestServer.start();
+ https2TestServer.start();
+ }
+
+ @AfterTest
+ public void teardown() throws Exception {
+ httpTestServer.stop();
+ httpsTestServer.stop();
+ http2TestServer.stop();
+ https2TestServer.stop();
+ }
+
+ static final String WITH_BODY = "Lorem ipsum dolor sit amet, consectetur" +
+ " adipiscing elit, sed do eiusmod tempor incididunt ut labore et" +
+ " dolore magna aliqua. Ut enim ad minim veniam, quis nostrud" +
+ " exercitation ullamco laboris nisi ut aliquip ex ea" +
+ " commodo consequat. Duis aute irure dolor in reprehenderit in " +
+ "voluptate velit esse cillum dolore eu fugiat nulla pariatur." +
+ " Excepteur sint occaecat cupidatat non proident, sunt in culpa qui" +
+ " officia deserunt mollit anim id est laborum.";
+
+ static class HTTP_FixedLengthHandler implements HttpTestHandler {
+ @Override
+ public void handle(HttpTestExchange t) throws IOException {
+ out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ if (t.getRequestURI().getPath().endsWith("/withBody")) {
+ byte[] bytes = WITH_BODY.getBytes(UTF_8);
+ t.sendResponseHeaders(200, bytes.length); // body
+ try (OutputStream os = t.getResponseBody()) {
+ os.write(bytes);
+ }
+ } else {
+ t.sendResponseHeaders(200, 0); //no body
+ }
+ }
+ }
+
+ static class HTTP_VariableLengthHandler implements HttpTestHandler {
+ @Override
+ public void handle(HttpTestExchange t) throws IOException {
+ out.println("HTTP_VariableLengthHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ t.sendResponseHeaders(200, -1); //chunked or variable
+ if (t.getRequestURI().getPath().endsWith("/withBody")) {
+ byte[] bytes = WITH_BODY.getBytes(UTF_8);
+ try (OutputStream os = t.getResponseBody()) {
+ int chunkLen = bytes.length/10;
+ if (chunkLen == 0) {
+ os.write(bytes);
+ } else {
+ int count = 0;
+ for (int i=0; i<10; i++) {
+ os.write(bytes, count, chunkLen);
+ os.flush();
+ count += chunkLen;
+ }
+ os.write(bytes, count, bytes.length % chunkLen);
+ count += bytes.length % chunkLen;
+ assert count == bytes.length;
+ }
+ }
+ } else {
+ t.getResponseBody().close(); // no body
+ }
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/StreamingBody.java Fri Apr 06 17:04:25 2018 +0100
@@ -0,0 +1,168 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Exercise a streaming subscriber ( InputStream ) without holding a
+ * strong (or any ) reference to the client.
+ * @modules java.base/sun.net.www.http
+ * java.net.http/jdk.internal.net.http.common
+ * java.net.http/jdk.internal.net.http.frame
+ * java.net.http/jdk.internal.net.http.hpack
+ * java.logging
+ * jdk.httpserver
+ * @library /lib/testlibrary /test/lib http2/server
+ * @build Http2TestServer
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @run testng/othervm
+ * -Djdk.httpclient.HttpClient.log=trace,headers,requests
+ * StreamingBody
+ */
+
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import javax.net.ssl.SSLContext;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+public class StreamingBody implements HttpServerAdapters {
+
+ SSLContext sslContext;
+ HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ]
+ HttpTestServer httpsTestServer; // HTTPS/1.1
+ HttpTestServer http2TestServer; // HTTP/2 ( h2c )
+ HttpTestServer https2TestServer; // HTTP/2 ( h2 )
+ String httpURI;
+ String httpsURI;
+ String http2URI;
+ String https2URI;
+
+ static final String MESSAGE = "StreamingBody message body";
+ static final int ITERATIONS = 100;
+
+ @DataProvider(name = "positive")
+ public Object[][] positive() {
+ return new Object[][] {
+ { httpURI, },
+ { httpsURI, },
+ { http2URI, },
+ { https2URI, },
+ };
+ }
+
+ @Test(dataProvider = "positive")
+ void test(String uriString) throws Exception {
+ out.printf("%n---- starting (%s) ----%n", uriString);
+ URI uri = URI.create(uriString);
+ HttpRequest request = HttpRequest.newBuilder(uri).build();
+
+ for (int i=0; i< ITERATIONS; i++) {
+ out.println("iteration: " + i);
+ HttpResponse<InputStream> response = HttpClient.newBuilder()
+ .sslContext(sslContext)
+ .build()
+ .sendAsync(request, BodyHandlers.ofInputStream())
+ .join();
+
+ String body = new String(response.body().readAllBytes(), UTF_8);
+ out.println("Got response: " + response);
+ out.println("Got body Path: " + body);
+
+ assertEquals(response.statusCode(), 200);
+ assertEquals(body, MESSAGE);
+ }
+ }
+
+
+ // -- Infrastructure
+
+ @BeforeTest
+ public void setup() throws Exception {
+ sslContext = new SimpleSSLContext().get();
+ if (sslContext == null)
+ throw new AssertionError("Unexpected null sslContext");
+
+ InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+
+ httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
+ httpTestServer.addHandler(new MessageHandler(), "/http1/streamingbody/");
+ httpURI = "http://" + httpTestServer.serverAuthority() + "/http1/streamingbody/w";
+
+ HttpsServer httpsServer = HttpsServer.create(sa, 0);
+ httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+ httpsTestServer = HttpTestServer.of(httpsServer);
+ httpsTestServer.addHandler(new MessageHandler(),"/https1/streamingbody/");
+ httpsURI = "https://" + httpsTestServer.serverAuthority() + "/https1/streamingbody/x";
+
+ http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
+ http2TestServer.addHandler(new MessageHandler(), "/http2/streamingbody/");
+ http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/streamingbody/y";
+ https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
+ https2TestServer.addHandler(new MessageHandler(), "/https2/streamingbody/");
+ https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/streamingbody/z";
+
+ httpTestServer.start();
+ httpsTestServer.start();
+ http2TestServer.start();
+ https2TestServer.start();
+ }
+
+ @AfterTest
+ public void teardown() throws Exception {
+ httpTestServer.stop();
+ httpsTestServer.stop();
+ http2TestServer.stop();
+ https2TestServer.stop();
+ }
+
+ static class MessageHandler implements HttpTestHandler {
+ @Override
+ public void handle(HttpTestExchange t) throws IOException {
+ System.out.println("MessageHandler for: " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody();
+ OutputStream os = t.getResponseBody()) {
+ is.readAllBytes();
+ byte[] bytes = MESSAGE.getBytes(UTF_8);
+ t.sendResponseHeaders(200, bytes.length);
+ os.write(bytes);
+ }
+ }
+ }
+}