http-client-branch: added mapping subscriber, miscellaneous bug fixes and change to discard()/replace() subscribers
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Tue Feb 06 16:07:43 2018 +0000
@@ -78,7 +78,7 @@
* read (or discard) the body and convert it into some useful Java object type.
* The handler can return one of the pre-defined subscriber types, or a custom
* subscriber, or if the body is to be discarded it can call {@link
- * BodySubscriber#discard(Object) discard} and return a subscriber which
+ * BodySubscriber#discard() discard} and return a subscriber which
* discards the response body. Static implementations of both handlers and
* subscribers are provided in {@linkplain BodyHandler BodyHandler} and
* {@linkplain BodySubscriber BodySubscriber} respectively. In all cases, the
@@ -198,7 +198,8 @@
* <li>{@link #asFileDownload(java.nio.file.Path,OpenOption...)
* asFileDownload(Path,OpenOption...)}</li>
* <li>{@link #asInputStream() asInputStream()}</li>
- * <li>{@link #discard(Object) }</li>
+ * <li>{@link #discard() }</li>
+ * <li>{@link #replace(Object) }</li>
* <li>{@link #buffering(BodyHandler, int)
* buffering(BodyHandler,int)}</li>
* </ul>
@@ -234,7 +235,7 @@
* .response(
* (status, headers) -> status == 200
* ? BodySubscriber.asFile(Paths.get("/tmp/f"))
- * : BodySubscriber.discard(Paths.get("/NULL")));
+ * : BodySubscriber.replace(Paths.get("/NULL")));
* }
* </pre>
*
@@ -247,7 +248,7 @@
* response status code and headers. This method is always called before
* the body is read and its implementation can decide to keep the body
* and store it somewhere, or else discard it by returning the {@code
- * BodySubscriber} returned from {@link BodySubscriber#discard(Object)
+ * BodySubscriber} returned from {@link BodySubscriber#discard()
* discard}.
*
* @param statusCode the HTTP status code received
@@ -412,6 +413,15 @@
}
/**
+ * Returns a response body handler which discards the response body.
+ *
+ * @return a response body handler
+ */
+ public static BodyHandler<Void> discard() {
+ return (status, headers) -> BodySubscriber.discard();
+ }
+
+ /**
* Returns a response body handler which discards the response body and
* uses the given value as a replacement for it.
*
@@ -419,8 +429,8 @@
* @param value the value of U to return as the body, may be {@code null}
* @return a response body handler
*/
- public static <U> BodyHandler<U> discard(U value) {
- return (status, headers) -> BodySubscriber.discard(value);
+ public static <U> BodyHandler<U> replace(U value) {
+ return (status, headers) -> BodySubscriber.replace(value);
}
/**
@@ -557,7 +567,7 @@
/**
* Returns a {@code BodyHandler<Stream<String>>} that returns a
* {@link BodySubscriber BodySubscriber}{@code <Stream<String>>} obtained from
- * {@link BodySubscriber#asLines(Charset)}
+ * {@link BodySubscriber#asLines(Charset)
* BodySubscriber.asLines(charset)}.
* The {@link Charset charset} used to decode the response body bytes is
* obtained from the HTTP response headers as specified by {@link #asString()},
@@ -581,6 +591,10 @@
*
* <p> When the {@code HttpResponse} object is returned, the body has
* been completely written to the consumer.
+ * @apiNote
+ * The subscriber returned by this handler is not flow controlled.
+ * Therefore, the supplied consumer must be able to process whatever
+ * amount of data is delivered in a timely fashion.
*
* @param consumer a Consumer to accept the response body
* @return a response body handler
@@ -777,7 +791,9 @@
/**
* Returns a {@code CompletionStage} which when completed will return
- * the response body object.
+ * the response body object. This method can be called at any time
+ * relative to the other {@link Flow.Subscriber} methods and is invoked
+ * using the client's {@link Executor}.
*
* @return a CompletionStage for the response body
*/
@@ -785,7 +801,7 @@
/**
* Returns a body subscriber that forwards all response body to the
- * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+ * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
@@ -804,7 +820,7 @@
/**
* Returns a body subscriber that forwards all response body to the
- * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion
+ * given {@code Flow.Subscriber}. The {@linkplain #getBody() completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
@@ -833,7 +849,7 @@
/**
* Returns a body subscriber that forwards all response body to the
* given {@code Flow.Subscriber}, lines by lines.
- * The {@linkplain #getBody()} completion
+ * The {@linkplain #getBody() completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
@@ -861,7 +877,7 @@
/**
* Returns a body subscriber that forwards all response body to the
* given {@code Flow.Subscriber}, lines by lines.
- * The {@linkplain #getBody()} completion
+ * The {@linkplain #getBody() completion
* stage} of the returned body subscriber completes after one of the
* given subscribers {@code onComplete} or {@code onError} has been
* invoked.
@@ -999,6 +1015,11 @@
* <p> The {@link HttpResponse} using this subscriber is available after
* the entire response has been read.
*
+ * @apiNote
+ * This subscriber is not flow controlled.
+ * Therefore, the supplied consumer must be able to process whatever
+ * amount of data is delivered in a timely fashion.
+ *
* @param consumer a Consumer of byte arrays
* @return a BodySubscriber
*/
@@ -1048,13 +1069,14 @@
* the underlying HTTP connection to be closed and prevent it
* from being reused for subsequent operations.
*
+ * @param charset the character set to use when converting bytes to characters
* @return a body subscriber that streams the response body as a
* {@link Stream Stream<String>}.
*
* @see BufferedReader#lines()
*/
public static BodySubscriber<Stream<String>> asLines(Charset charset) {
- return ResponseSubscribers.HttpLineStream.create(charset);
+ return ResponseSubscribers.createLineStream(charset);
}
/**
@@ -1066,11 +1088,20 @@
* @param value the value to return from HttpResponse.body(), may be {@code null}
* @return a {@code BodySubscriber}
*/
- public static <U> BodySubscriber<U> discard(U value) {
+ public static <U> BodySubscriber<U> replace(U value) {
return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(value));
}
/**
+ * Returns a response subscriber which discards the response body.
+ *
+ * @return a response body subscriber
+ */
+ public static BodySubscriber<Void> discard() {
+ return new ResponseSubscribers.NullSubscriber<>(Optional.ofNullable(null));
+ }
+
+ /**
* Returns a {@code BodySubscriber} which buffers data before delivering
* it to the given downstream subscriber. The subscriber guarantees to
* deliver {@code buffersize} bytes of data to each invocation of the
@@ -1093,5 +1124,48 @@
throw new IllegalArgumentException("must be greater than 0");
return new BufferingSubscriber<T>(downstream, bufferSize);
}
+
+ /**
+ * Returns a {@code BodySubscriber} whose response body is mapped
+ * using the supplied mapping function from one type {@code <T>} to
+ * another type {@code <U>}. The mapping function is executed
+ * using the {@link Executor} of the sending client and can
+ * therefore be used to map any response body type, including
+ * blocking {@link java.io.InputStream}s as shown in the following
+ * example which uses a well-known JSON parser to convert an {@code InputStream}
+ * into any annotated Java object type.
+ * <p>
+ * <b>Example usage</b>
+ * <p> <pre> {@code
+ * public static <W> BodySubscriber<W> asJSON(Class<W> targetType) {
+ * BodySubscriber<InputStream> upstream = BodySubscriber.asInputStream();
+ *
+ * BodySubscriber<W> downstream = mappedFrom(
+ * upstream,
+ * (InputStream is) -> {
+ * try (InputStream stream = is) {
+ * ObjectMapper objectMapper = new ObjectMapper();
+ * W result = objectMapper.readValue(stream, targetType);
+ * return result;
+ * } catch (IOException e) {
+ * throw new UncheckedIOException(e);
+ * }
+ * });
+ * return downstream;
+ * }
+ * }</pre>
+ *
+ * @param <T> the type of the body subscriber to be mapped
+ * @param <U> the type of the body subscriber returned
+ * @param upstream the body subscriber to be mapped
+ * @param mapper the mapping function
+ * @return a mapped body subscriber
+ */
+ public static <T,U> BodySubscriber<U> mappedFrom(
+ BodySubscriber<T> upstream,
+ Function<T, U> mapper)
+ {
+ return new ResponseSubscribers.MappedSubscriber<T, U>(upstream, mapper);
+ }
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Exchange.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Exchange.java Tue Feb 06 16:07:43 2018 +0000
@@ -408,7 +408,7 @@
}
HttpResponse.BodySubscriber<T> ignoreBody(int status, HttpHeaders hdrs) {
- return HttpResponse.BodySubscriber.discard((T)null);
+ return HttpResponse.BodySubscriber.replace(null);
}
// if this response was received in reply to an upgrade
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java Tue Feb 06 16:07:43 2018 +0000
@@ -152,7 +152,7 @@
connection.close();
return MinimalFuture.completedFuture(null); // not treating as error
} else {
- return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
+ return readBody(HttpResponse.BodySubscriber.discard(), true, executor);
}
}
@@ -161,8 +161,8 @@
Executor executor) {
this.return2Cache = return2Cache;
final HttpResponse.BodySubscriber<U> pusher = p;
- final CompletionStage<U> bodyCF = p.getBody();
- final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
+
+ final CompletableFuture<U> cf = new MinimalFuture<>();
int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
@@ -242,6 +242,13 @@
}
}
});
+ p.getBody().whenComplete((U u, Throwable t) -> {
+ if (t == null)
+ cf.complete(u);
+ else
+ cf.completeExceptionally(t);
+ });
+
return cf;
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PlainTunnelingConnection.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/PlainTunnelingConnection.java Tue Feb 06 16:07:43 2018 +0000
@@ -69,7 +69,7 @@
assert client != null;
HttpRequestImpl req = new HttpRequestImpl("CONNECT", address, proxyHeaders);
MultiExchange<Void> mulEx = new MultiExchange<>(null, req,
- client, discard(null), null, null);
+ client, discard(), null, null);
Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
return connectExchange
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseSubscribers.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/ResponseSubscribers.java Tue Feb 06 16:07:43 2018 +0000
@@ -49,6 +49,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
@@ -467,56 +468,18 @@
}
- /**
- * A {@code Stream<String>} built on top of the Flow API.
- */
- public static final class HttpLineStream implements BodySubscriber<Stream<String>> {
-
- private final HttpResponseInputStream responseInputStream;
- private final Charset charset;
- private HttpLineStream(Charset charset) {
- this.charset = Objects.requireNonNull(charset);
- responseInputStream = new HttpResponseInputStream();
- }
-
- @Override
- public CompletionStage<Stream<String>> getBody() {
- return responseInputStream.getBody().thenApply((is) ->
- new BufferedReader(new InputStreamReader(is, charset))
- .lines().onClose(this::close));
- }
-
- @Override
- public void onSubscribe(Subscription subscription) {
- responseInputStream.onSubscribe(subscription);
- }
+ public static BodySubscriber<Stream<String>> createLineStream() {
+ return createLineStream(UTF_8);
+ }
- @Override
- public void onNext(List<ByteBuffer> item) {
- responseInputStream.onNext(item);
- }
-
- @Override
- public void onError(Throwable throwable) {
- responseInputStream.onError(throwable);
- }
-
- @Override
- public void onComplete() {
- responseInputStream.onComplete();
- }
-
- void close() {
- try {
- responseInputStream.close();
- } catch (IOException x) {
- // ignore
- }
- }
-
- public static HttpLineStream create(Charset charset) {
- return new HttpLineStream(Optional.ofNullable(charset).orElse(UTF_8));
- }
+ public static BodySubscriber<Stream<String>> createLineStream(Charset charset) {
+ Objects.requireNonNull(charset);
+ BodySubscriber<InputStream> s = new HttpResponseInputStream();
+ return new MappedSubscriber<InputStream,Stream<String>>(s,
+ (InputStream stream) -> {
+ return new BufferedReader(new InputStreamReader(stream, charset))
+ .lines().onClose(() -> Utils.close(stream));
+ });
}
/**
@@ -630,4 +593,58 @@
return cf;
}
}
+
+ /**
+ * A body subscriber which receives input from an upstream subscriber
+ * and maps that subscriber's body type to a new type. The upstream subscriber
+ * delegates all flow operations directly to this object. The
+ * {@link CompletionStage} returned by {@link #getBody()}} takes the output
+ * of the upstream {@code getBody()} and applies the mapper function to
+ * obtain the new {@code CompletionStage} type.
+ *
+ * Uses an Executor that must be set externally.
+ *
+ * @param <T> the upstream body type
+ * @param <U> this subscriber's body type
+ */
+ public static class MappedSubscriber<T,U> implements BodySubscriber<U> {
+ final BodySubscriber<T> upstream;
+ final Function<T,U> mapper;
+
+ /**
+ *
+ * @param upstream
+ * @param mapper
+ */
+ public MappedSubscriber(BodySubscriber<T> upstream, Function<T,U> mapper) {
+ this.upstream = upstream;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public CompletionStage<U> getBody() {
+ return upstream.getBody()
+ .thenApply(mapper);
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ upstream.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ upstream.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ upstream.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ upstream.onComplete();
+ }
+ }
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java Tue Feb 06 16:07:43 2018 +0000
@@ -231,7 +231,7 @@
{
Log.logTrace("Reading body on stream {0}", streamid);
BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
- CompletableFuture<T> cf = receiveData(bodySubscriber);
+ CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
PushGroup<?> pg = exchange.getPushGroup();
if (pg != null) {
@@ -262,8 +262,18 @@
// pushes entire response body into response subscriber
// blocking when required by local or remote flow control
- CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
- responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
+ CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
+ responseBodyCF = new MinimalFuture<>();
+ // We want to allow the subscriber's getBody() method to block so it
+ // can work with InputStreams. So, we offload execution.
+ executor.execute(() -> {
+ bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+ if (t == null)
+ responseBodyCF.complete(body);
+ else
+ responseBodyCF.completeExceptionally(t);
+ });
+ });
if (isCanceled()) {
Throwable t = getCancelCause();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Tue Feb 06 15:34:08 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Tue Feb 06 16:07:43 2018 +0000
@@ -186,7 +186,7 @@
public CompletableFuture<Result> send() {
PrivilegedAction<CompletableFuture<Result>> pa = () ->
- client.sendAsync(this.request, BodyHandler.<Void>discard(null))
+ client.sendAsync(this.request, BodyHandler.discard())
.thenCompose(this::resultFrom);
return AccessController.doPrivileged(pa);
}
--- a/test/jdk/java/net/httpclient/ConcurrentResponses.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ConcurrentResponses.java Tue Feb 06 16:07:43 2018 +0000
@@ -158,7 +158,7 @@
}
// initial connection to seed the cache so next parallel connections reuse it
- client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
+ client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
@@ -183,7 +183,7 @@
}
// initial connection to seed the cache so next parallel connections reuse it
- client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard(null)).join();
+ client.sendAsync(HttpRequest.newBuilder(URI.create(uri)).build(), discard()).join();
// will reuse connection cached from the previous request ( when HTTP/2 )
CompletableFuture.allOf(requests.keySet().parallelStream()
--- a/test/jdk/java/net/httpclient/HandshakeFailureTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/HandshakeFailureTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -87,7 +87,7 @@
.version(version)
.build();
try {
- HttpResponse<Void> response = client.send(request, discard(null));
+ HttpResponse<Void> response = client.send(request, discard());
String msg = String.format("UNEXPECTED response=%s%n", response);
throw new RuntimeException(msg);
} catch (SSLHandshakeException expected) {
@@ -106,7 +106,7 @@
.version(version)
.build();
try {
- HttpResponse<Void> response = client.send(request, discard(null));
+ HttpResponse<Void> response = client.send(request, discard());
String msg = String.format("UNEXPECTED response=%s%n", response);
throw new RuntimeException(msg);
} catch (SSLHandshakeException expected) {
@@ -124,7 +124,7 @@
.version(version)
.build();
CompletableFuture<HttpResponse<Void>> response =
- client.sendAsync(request, discard(null));
+ client.sendAsync(request, discard());
try {
response.join();
String msg = String.format("UNEXPECTED response=%s%n", response);
@@ -150,7 +150,7 @@
.version(version)
.build();
CompletableFuture<HttpResponse<Void>> response =
- client.sendAsync(request, discard(null));
+ client.sendAsync(request, discard());
try {
response.join();
String msg = String.format("UNEXPECTED response=%s%n", response);
--- a/test/jdk/java/net/httpclient/ImmutableHeaders.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ImmutableHeaders.java Tue Feb 06 16:07:43 2018 +0000
@@ -81,7 +81,7 @@
throw new RuntimeException("Test failed");
} catch (UnsupportedOperationException ex) {
}
- HttpResponse resp = client.send(req, discard(null));
+ HttpResponse resp = client.send(req, discard());
try {
HttpHeaders hd = resp.headers();
List<String> v = hd.allValues("X-Foo-Response");
--- a/test/jdk/java/net/httpclient/InterruptedBlockingSend.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/InterruptedBlockingSend.java Tue Feb 06 16:07:43 2018 +0000
@@ -47,7 +47,7 @@
Thread t = new Thread(() -> {
try {
- client.send(request, discard(null));
+ client.send(request, discard());
} catch (InterruptedException e) {
throwable = e;
} catch (Throwable th) {
--- a/test/jdk/java/net/httpclient/InvalidSSLContextTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/InvalidSSLContextTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -81,7 +81,7 @@
.build();
try {
- HttpResponse<?> response = client.send(request, BodyHandler.discard(""));
+ HttpResponse<?> response = client.send(request, BodyHandler.discard());
Assert.fail("UNEXPECTED response" + response);
} catch (SSLException sslex) {
System.out.println("Caught expected: " + sslex);
@@ -100,7 +100,7 @@
.build();
assertExceptionally(SSLException.class,
- client.sendAsync(request, BodyHandler.discard("")));
+ client.sendAsync(request, BodyHandler.discard()));
}
static void assertExceptionally(Class<? extends Throwable> clazz,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/MappedResponseSubscriber.java Tue Feb 06 16:07:43 2018 +0000
@@ -0,0 +1,307 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary Tests mapped response subscriber
+ * @library /lib/testlibrary http2/server
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.base/sun.net.www.http
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.common
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.frame
+ * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack
+ * @run testng/othervm MappedResponseSubscriber
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Flow;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
+import jdk.incubator.http.HttpClient;
+import jdk.incubator.http.HttpHeaders;
+import jdk.incubator.http.HttpRequest;
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.BodyHandler;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+import javax.net.ssl.SSLContext;
+import jdk.testlibrary.SimpleSSLContext;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static jdk.incubator.http.HttpResponse.BodySubscriber.asString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class MappedResponseSubscriber {
+
+ SSLContext sslContext;
+ HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ]
+ HttpsServer httpsTestServer; // HTTPS/1.1
+ Http2TestServer http2TestServer; // HTTP/2 ( h2c )
+ Http2TestServer https2TestServer; // HTTP/2 ( h2 )
+ String httpURI_fixed;
+ String httpURI_chunk;
+ String httpsURI_fixed;
+ String httpsURI_chunk;
+ String http2URI_fixed;
+ String http2URI_chunk;
+ String https2URI_fixed;
+ String https2URI_chunk;
+
+ static final int ITERATION_COUNT = 10;
+ // a shared executor helps reduce the amount of threads created by the test
+ static final Executor executor = Executors.newCachedThreadPool();
+
+ @DataProvider(name = "variants")
+ public Object[][] variants() {
+ return new Object[][]{
+ { httpURI_fixed, false },
+ { httpURI_chunk, false },
+ { httpsURI_fixed, false },
+ { httpsURI_chunk, false },
+ { http2URI_fixed, false },
+ { http2URI_chunk, false },
+ { https2URI_fixed, false,},
+ { https2URI_chunk, false },
+
+ { httpURI_fixed, true },
+ { httpURI_chunk, true },
+ { httpsURI_fixed, true },
+ { httpsURI_chunk, true },
+ { http2URI_fixed, true },
+ { http2URI_chunk, true },
+ { https2URI_fixed, true,},
+ { https2URI_chunk, true },
+ };
+ }
+
+ HttpClient newHttpClient() {
+ return HttpClient.newBuilder()
+ .executor(executor)
+ .sslContext(sslContext)
+ .build();
+ }
+
+ @Test(dataProvider = "variants")
+ public void testAsBytes(String uri, boolean sameClient) throws Exception {
+ HttpClient client = null;
+ for (int i=0; i< ITERATION_COUNT; i++) {
+ if (!sameClient || client == null)
+ client = newHttpClient();
+
+ HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+ .build();
+ BodyHandler<byte[]> handler = new CRSBodyHandler();
+ HttpResponse<byte[]> response = client.send(req, handler);
+ byte[] body = response.body();
+ assertEquals(body, bytes);
+ }
+ }
+
+ static class CRSBodyHandler implements BodyHandler<byte[]> {
+ @Override
+ public BodySubscriber<byte[]> apply(int statusCode, HttpHeaders responseHeaders) {
+ assertEquals(statusCode, 200);
+ return HttpResponse.BodySubscriber.mappedFrom(
+ new CRSBodySubscriber(), (s) -> s.getBytes()
+ );
+ }
+ }
+
+ static class CRSBodySubscriber implements BodySubscriber<String> {
+ private final BodySubscriber<String> asString = asString(UTF_8);
+ volatile boolean onSubscribeCalled;
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ //out.println("onSubscribe ");
+ onSubscribeCalled = true;
+ asString.onSubscribe(subscription);
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ // out.println("onNext " + item);
+ assertTrue(onSubscribeCalled);
+ asString.onNext(item);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ //out.println("onError");
+ assertTrue(onSubscribeCalled);
+ asString.onError(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ //out.println("onComplete");
+ assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
+ asString.onComplete();
+ }
+
+ @Override
+ public CompletionStage<String> getBody() {
+ return asString.getBody();
+ }
+ }
+
+
+ @BeforeTest
+ public void setup() throws Exception {
+ sslContext = new SimpleSSLContext().get();
+ if (sslContext == null)
+ throw new AssertionError("Unexpected null sslContext");
+
+ // HTTP/1.1
+ HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler();
+ HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler();
+ InetSocketAddress sa = new InetSocketAddress("localhost", 0);
+ httpTestServer = HttpServer.create(sa, 0);
+ httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler);
+ httpTestServer.createContext("/http1/chunk", h1_chunkHandler);
+ httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed";
+ httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunk";
+
+ httpsTestServer = HttpsServer.create(sa, 0);
+ httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
+ httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler);
+ httpsTestServer.createContext("/https1/chunk", h1_chunkHandler);
+ httpsURI_fixed = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed";
+ httpsURI_chunk = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunk";
+
+ // HTTP/2
+ Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler();
+ Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler();
+
+ http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+ http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
+ http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
+ int port = http2TestServer.getAddress().getPort();
+ http2URI_fixed = "http://127.0.0.1:" + port + "/http2/fixed";
+ http2URI_chunk = "http://127.0.0.1:" + port + "/http2/chunk";
+
+ https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+ https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
+ https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
+ port = https2TestServer.getAddress().getPort();
+ https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed";
+ https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk";
+
+ httpTestServer.start();
+ httpsTestServer.start();
+ http2TestServer.start();
+ https2TestServer.start();
+ }
+
+ @AfterTest
+ public void teardown() throws Exception {
+ httpTestServer.stop(0);
+ httpsTestServer.stop(0);
+ http2TestServer.stop();
+ https2TestServer.stop();
+ }
+
+ static byte[] bytes;
+
+ static {
+ bytes = new byte[128 * 1024];
+ int b = 'A';
+
+ for (int i=0; i< bytes.length; i++) {
+ bytes[i] = (byte)b;
+ b = b == 'Z'? 'A' : b + 1;
+ }
+ }
+
+ static class HTTP1_FixedLengthHandler implements HttpHandler {
+ @Override
+ public void handle(HttpExchange t) throws IOException {
+ out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ t.sendResponseHeaders(200, bytes.length); //no body
+ OutputStream os = t.getResponseBody();
+ os.write(bytes);
+ os.close();
+ }
+ }
+
+ static class HTTP1_ChunkedHandler implements HttpHandler {
+ @Override
+ public void handle(HttpExchange t) throws IOException {
+ out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ t.sendResponseHeaders(200, 0); // chunked
+ OutputStream os = t.getResponseBody();
+ os.write(bytes);
+ os.close();
+ }
+ }
+
+ static class HTTP2_FixedLengthHandler implements Http2Handler {
+ @Override
+ public void handle(Http2TestExchange t) throws IOException {
+ out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ t.sendResponseHeaders(200, 0); // chunked
+ OutputStream os = t.getResponseBody();
+ os.write(bytes);
+ os.close();
+ }
+ }
+
+ static class HTTP2_VariableHandler implements Http2Handler {
+ @Override
+ public void handle(Http2TestExchange t) throws IOException {
+ out.println("HTTP2_VariableHandler received request to " + t.getRequestURI());
+ try (InputStream is = t.getRequestBody()) {
+ is.readAllBytes();
+ }
+ t.sendResponseHeaders(200, bytes.length); //no body
+ OutputStream os = t.getResponseBody();
+ os.write(bytes);
+ os.close();
+ }
+ }
+}
--- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java Tue Feb 06 16:07:43 2018 +0000
@@ -47,7 +47,7 @@
import static jdk.incubator.http.HttpResponse.BodyHandler.asByteArrayConsumer;
import static jdk.incubator.http.HttpResponse.BodyHandler.asInputStream;
import static jdk.incubator.http.HttpResponse.BodyHandler.buffering;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -128,7 +128,7 @@
.PUT(fromString(SIMPLE_STRING))
.build();
Object obj = new Object();
- HttpResponse<Object> response = client.send(req, discard(obj));
+ HttpResponse<Object> response = client.send(req, replace(obj));
assertEquals(response.body(), obj);
}
// We have created many clients here. Try to speed up their release.
--- a/test/jdk/java/net/httpclient/ProxyAuthTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ProxyAuthTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -76,7 +76,7 @@
.authenticator(auth)
.build();
HttpRequest req = HttpRequest.newBuilder(uri).GET().build();
- HttpResponse<?> resp = client.sendAsync(req, discard(null)).get();
+ HttpResponse<?> resp = client.sendAsync(req, discard()).get();
if (resp.statusCode() != 404) {
throw new RuntimeException("Unexpected status code: " + resp.statusCode());
}
--- a/test/jdk/java/net/httpclient/RequestBodyTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/RequestBodyTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -260,7 +260,7 @@
break;
case DISCARD:
Object o = new Object();
- BodyHandler<Object> bh2 = discard(o);
+ BodyHandler<Object> bh2 = replace(o);
if (bufferResponseBody) bh2 = buffering(bh2, 51);
HttpResponse<Object> or = getResponse(client, request, bh2, async);
assertEquals(or.statusCode(), 200);
--- a/test/jdk/java/net/httpclient/ShortRequestBody.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ShortRequestBody.java Tue Feb 06 16:07:43 2018 +0000
@@ -164,7 +164,7 @@
HttpRequest request = HttpRequest.newBuilder(uri)
.POST(publisher)
.build();
- cf = clientSupplier.get().sendAsync(request, discard(null));
+ cf = clientSupplier.get().sendAsync(request, discard());
HttpResponse<Void> resp = cf.get(30, TimeUnit.SECONDS);
err.println("Response code: " + resp.statusCode());
@@ -180,7 +180,7 @@
HttpRequest request = HttpRequest.newBuilder(uri)
.POST(publisher)
.build();
- cf = clientSupplier.get().sendAsync(request, discard(null));
+ cf = clientSupplier.get().sendAsync(request, discard());
try {
HttpResponse<Void> r = cf.get(30, TimeUnit.SECONDS);
@@ -207,7 +207,7 @@
.POST(publisher)
.build();
try {
- HttpResponse<Void> r = clientSupplier.get().send(request, discard(null));
+ HttpResponse<Void> r = clientSupplier.get().send(request, discard());
throw new RuntimeException("Unexpected response: " + r.statusCode());
} catch (HttpTimeoutException x) {
throw new RuntimeException("Unexpected timeout", x);
--- a/test/jdk/java/net/httpclient/SmallTimeout.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/SmallTimeout.java Tue Feb 06 16:07:43 2018 +0000
@@ -34,7 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import static java.lang.System.out;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
/**
* @test
@@ -92,7 +92,7 @@
final HttpRequest req = requests[i];
CompletableFuture<HttpResponse<Object>> response = client
- .sendAsync(req, discard(null))
+ .sendAsync(req, replace(null))
.whenComplete((HttpResponse<Object> r, Throwable t) -> {
Throwable cause = null;
if (r != null) {
@@ -142,7 +142,7 @@
executor.execute(() -> {
Throwable cause = null;
try {
- client.send(req, discard(null));
+ client.send(req, replace(null));
} catch (HttpTimeoutException e) {
out.println("Caught expected timeout: " + e);
} catch (Throwable ee) {
--- a/test/jdk/java/net/httpclient/TimeoutBasic.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/TimeoutBasic.java Tue Feb 06 16:07:43 2018 +0000
@@ -152,7 +152,7 @@
if (request == null) continue;
count++;
try {
- HttpResponse<?> resp = client.sendAsync(request, discard(null)).join();
+ HttpResponse<?> resp = client.sendAsync(request, discard()).join();
out.println("Unexpected response for: " + request);
out.println("\t from " + ss.getLocalSocketAddress());
out.println("Response is: " + resp);
@@ -177,7 +177,7 @@
if (request == null) continue;
count++;
try {
- client.send(request, discard(null));
+ client.send(request, discard());
} catch (HttpTimeoutException e) {
out.println("Caught expected timeout: " + e);
}
--- a/test/jdk/java/net/httpclient/TimeoutOrdering.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/TimeoutOrdering.java Tue Feb 06 16:07:43 2018 +0000
@@ -34,7 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import static java.lang.System.out;
-import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
+import static jdk.incubator.http.HttpResponse.BodyHandler.replace;
/**
* @test
@@ -74,7 +74,7 @@
final HttpRequest req = requests[i];
CompletableFuture<HttpResponse<Object>> response = client
- .sendAsync(req, discard(null))
+ .sendAsync(req, replace(null))
.whenComplete((HttpResponse<Object> r, Throwable t) -> {
if (r != null) {
out.println("Unexpected response: " + r);
@@ -115,7 +115,7 @@
final HttpRequest req = requests[i];
executor.execute(() -> {
try {
- client.send(req, discard(null));
+ client.send(req, replace(null));
} catch (HttpTimeoutException e) {
out.println("Caught expected timeout: " + e);
queue.offer(req);
--- a/test/jdk/java/net/httpclient/VersionTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/VersionTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -98,7 +98,7 @@
.GET()
.build();
HttpClient c = proxy ? clientWithProxy : client;
- HttpResponse<Void> resp = c.send(r, discard(null));
+ HttpResponse<Void> resp = c.send(r, discard());
System.out.printf("Client: response is %d\n", resp.statusCode());
if (resp.version() != HTTP_1_1) {
throw new RuntimeException();
--- a/test/jdk/java/net/httpclient/ZeroRedirects.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/ZeroRedirects.java Tue Feb 06 16:07:43 2018 +0000
@@ -77,7 +77,7 @@
HttpRequest r = HttpRequest.newBuilder(uri)
.GET()
.build();
- HttpResponse<Void> resp = client.send(r, discard(null));
+ HttpResponse<Void> resp = client.send(r, discard());
System.out.printf("Client: response is %d\n", resp.statusCode());
if (resp.statusCode() != 200)
throw new RuntimeException();
--- a/test/jdk/java/net/httpclient/http2/ErrorTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/ErrorTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -96,7 +96,7 @@
.build();
HttpResponse response;
try {
- response = client.send(req, discard(null));
+ response = client.send(req, discard());
throw new RuntimeException("Unexpected response: " + response);
} catch (IOException e) {
System.err.println("Caught Expected IOException: " + e);
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Tue Feb 06 16:07:43 2018 +0000
@@ -86,8 +86,14 @@
throw new IllegalStateException("sendResponseHeaders must be called first");
}
try {
- waitForWindow(len);
- send(buf, offset, len, 0);
+ int max = conn.getMaxFrameSize();
+ while (len > 0) {
+ int n = len > max ? max : len;
+ waitForWindow(n);
+ send(buf, offset, n, 0);
+ offset += n;
+ len -= n;
+ }
} catch (InterruptedException ex) {
throw new IOException(ex);
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Tue Feb 06 16:07:43 2018 +0000
@@ -312,6 +312,10 @@
return (SettingsFrame)frame;
}
+ public int getMaxFrameSize() {
+ return clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE);
+ }
+
void run() throws Exception {
Http1InitialRequest upgrade = null;
if (!secure) {
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/RawChannelTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/RawChannelTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -193,7 +193,7 @@
((WebSocketRequest)req).isWebSocket(true);
HttpClient client = HttpClient.newHttpClient();
try {
- HttpResponse<?> r = client.send(req, discard(null));
+ HttpResponse<?> r = client.send(req, discard());
r.body();
return ((HttpResponseImpl) r).rawChannel();
} finally {
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/SelectorTest.java Tue Feb 06 15:34:08 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/internal/SelectorTest.java Tue Feb 06 16:07:43 2018 +0000
@@ -153,7 +153,7 @@
// thus all ordinary procedures apply to it, e.g. it must be put into
// the cache
((HttpRequestImpl) req).isWebSocket(true);
- HttpResponse<?> r = defaultClient().send(req, discard(null));
+ HttpResponse<?> r = defaultClient().send(req, discard());
r.body();
return ((HttpResponseImpl) r).rawChannel();
}