# HG changeset patch # User chegar # Date 1513875531 0 # Node ID 4f830b447edf04fb4a52151a5ad44d9bb60723cd # Parent fcb5b835bf329911cc2044ac96c4f6fd6655b774 8193365: Improve interoperability between HTTP Client's BodyPublisher/BodySubscriber and Flow.Subscriber/Publisher Reviewed-by: dfuchs diff -r fcb5b835bf32 -r 4f830b447edf src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequest.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequest.java Thu Dec 21 10:26:03 2017 +0100 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpRequest.java Thu Dec 21 16:58:51 2017 +0000 @@ -622,6 +622,49 @@ public interface BodyPublisher extends Flow.Publisher { /** + * Returns a request body publisher whose body is retrieved from the + * given {@code Flow.Publisher}. The returned request body publisher + * has an unknown content length. + * + * @apiNote This method can be used as an adapter between {@code + * BodyPublisher} and {@code Flow.Publisher}, where the amount of + * request body that the publisher will publish is unknown. + * + * @param publisher the publisher responsible for publishing the body + * @return a BodyPublisher + */ + static BodyPublisher fromPublisher(Flow.Publisher publisher) { + return new RequestPublishers.PublisherAdapter(publisher, -1L); + } + + /** + * Returns a request body publisher whose body is retrieved from the + * given {@code Flow.Publisher}. The returned request body publisher + * has the given content length. + * + *

The given {@code contentLength} is a positive number, that + * represents the exact amount of bytes the {@code publisher} must + * publish. + * + * @apiNote This method can be used as an adapter between {@code + * BodyPublisher} and {@code Flow.Publisher}, where the amount of + * request body that the publisher will publish is known. + * + * @param publisher the publisher responsible for publishing the body + * @param contentLength a positive number representing the exact + * amount of bytes the publisher will publish + * @throws IllegalArgumentException if the content length is + * non-positive + * @return a BodyPublisher + */ + static BodyPublisher fromPublisher(Flow.Publisher publisher, + long contentLength) { + if (contentLength < 1) + throw new IllegalArgumentException("non-positive contentLength: " + contentLength); + return new RequestPublishers.PublisherAdapter(publisher, contentLength); + } + + /** * Returns a request body publisher whose body is the given {@code * String}, converted using the {@link StandardCharsets#UTF_8 UTF_8} * character set. diff -r fcb5b835bf32 -r 4f830b447edf src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Thu Dec 21 10:26:03 2017 +0100 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Thu Dec 21 16:58:51 2017 +0000 @@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; import java.util.function.Consumer; import java.util.function.Function; import javax.net.ssl.SSLParameters; @@ -329,6 +330,75 @@ public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders); /** + * Returns a response body handler that returns a {@link BodySubscriber + * BodySubscriber}{@code } obtained from {@linkplain + * BodySubscriber#fromSubscriber(Subscriber)}, with the given + * {@code subscriber}. + * + *

The response body is not available through this, or the {@code + * HttpResponse} API, but instead all response body is forwarded to the + * given {@code subscriber}, which should make it available, if + * appropriate, through some other mechanism, e.g. an entry in a + * database, etc. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + *

For example: + *

 {@code
+         *  TextSubscriber subscriber = new TextSubscriber();
+         *  HttpResponse response = client.sendAsync(request,
+         *      BodyHandler.fromSubscriber(subscriber)).join();
+         *  System.out.println(response.statusCode());
+         * }
+ * + * @param subscriber the subscriber + * @return a response body handler + */ + public static BodyHandler + fromSubscriber(Subscriber> subscriber) { + Objects.requireNonNull(subscriber); + return (status, headers) -> BodySubscriber.fromSubscriber(subscriber, + s -> null); + } + + /** + * Returns a response body handler that returns a {@link BodySubscriber + * BodySubscriber}{@code } obtained from {@link + * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the + * given {@code subscriber} and {@code finisher} function. + * + *

The given {@code finisher} function is applied after the given + * subscriber's {@code onComplete} has been invoked. The {@code finisher} + * function is invoked with the given subscriber, and returns a value + * that is set as the response's body. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + *

For example: + *

 {@code
+         * TextSubscriber subscriber = ...;  // accumulates bytes and transforms them into a String
+         * HttpResponse response = client.sendAsync(request,
+         *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
+         * String text = response.body();
+         * }
+ * + * @param the type of the Subscriber + * @param the type of the response body + * @param subscriber the subscriber + * @param finisher a function to be applied after the subscriber has completed + * @return a response body handler + */ + public static >,T> BodyHandler + fromSubscriber(S subscriber, Function finisher) { + Objects.requireNonNull(subscriber); + Objects.requireNonNull(finisher); + return (status, headers) -> BodySubscriber.fromSubscriber(subscriber, + finisher); + } + + /** * Returns a response body handler which discards the response body and * uses the given value as a replacement for it. * @@ -595,6 +665,53 @@ public CompletionStage getBody(); /** + * Returns a body subscriber that forwards all response body to the + * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion + * stage} of the returned body subscriber completes after one of the + * given subscribers {@code onComplete} or {@code onError} has been + * invoked. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + * @param the type of the Subscriber + * @param subscriber the subscriber + * @return a body subscriber + */ + public static >> BodySubscriber + fromSubscriber(S subscriber) { + return new ResponseSubscribers.SubscriberAdapter(subscriber, s -> null); + } + + /** + * Returns a body subscriber that forwards all response body to the + * given {@code Flow.Subscriber}. The {@linkplain #getBody()} completion + * stage} of the returned body subscriber completes after one of the + * given subscribers {@code onComplete} or {@code onError} has been + * invoked. + * + *

The given {@code finisher} function is applied after the given + * subscriber's {@code onComplete} has been invoked. The {@code finisher} + * function is invoked with the given subscriber, and returns a value + * that is set as the response's body. + * + * @apiNote This method can be used as an adapter between {@code + * BodySubscriber} and {@code Flow.Subscriber}. + * + * @param the type of the Subscriber + * @param the type of the response body + * @param subscriber the subscriber + * @param finisher a function to be applied after the subscriber has + * completed + * @return a body subscriber + */ + public static >,T> BodySubscriber + fromSubscriber(S subscriber, + Function finisher) { + return new ResponseSubscribers.SubscriberAdapter(subscriber, finisher); + } + + /** * Returns a body subscriber which stores the response body as a {@code * String} converted using the given {@code Charset}. * diff -r fcb5b835bf32 -r 4f830b447edf src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Thu Dec 21 10:26:03 2017 +0100 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Thu Dec 21 16:58:51 2017 +0000 @@ -46,6 +46,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; import java.util.function.Supplier; import jdk.incubator.http.HttpRequest.BodyPublisher; import jdk.incubator.http.internal.common.Utils; @@ -349,4 +350,26 @@ return -1; } } + + static final class PublisherAdapter implements BodyPublisher { + + private final Publisher publisher; + private final long contentLength; + + PublisherAdapter(Publisher publisher, + long contentLength) { + this.publisher = Objects.requireNonNull(publisher); + this.contentLength = contentLength; + } + + @Override + public final long contentLength() { + return contentLength; + } + + @Override + public final void subscribe(Flow.Subscriber subscriber) { + publisher.subscribe(subscriber); + } + } } diff -r fcb5b835bf32 -r 4f830b447edf src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 10:26:03 2017 +0100 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Dec 21 16:58:51 2017 +0000 @@ -47,6 +47,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -558,4 +560,69 @@ return cf; } } + + /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber}. */ + static final class SubscriberAdapter>,R> + implements HttpResponse.BodySubscriber + { + private final CompletableFuture cf = new MinimalFuture<>(); + private final S subscriber; + private final Function finisher; + private volatile Subscription subscription; + + SubscriberAdapter(S subscriber, Function finisher) { + this.subscriber = Objects.requireNonNull(subscriber); + this.finisher = Objects.requireNonNull(finisher); + } + + @Override + public void onSubscribe(Subscription subscription) { + Objects.requireNonNull(subscription); + if (this.subscription != null) { + subscription.cancel(); + } else { + this.subscription = subscription; + subscriber.onSubscribe(subscription); + } + } + + @Override + public void onNext(List item) { + Objects.requireNonNull(item); + try { + subscriber.onNext(item); + } catch (Throwable throwable) { + subscription.cancel(); + onError(throwable); + } + } + + @Override + public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); + try { + subscriber.onError(throwable); + } finally { + cf.completeExceptionally(throwable); + } + } + + @Override + public void onComplete() { + try { + subscriber.onComplete(); + } finally { + try { + cf.complete(finisher.apply(subscriber)); + } catch (Throwable throwable) { + cf.completeExceptionally(throwable); + } + } + } + + @Override + public CompletionStage getBody() { + return cf; + } + } } diff -r fcb5b835bf32 -r 4f830b447edf test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/FlowAdapterPublisherTest.java Thu Dec 21 16:58:51 2017 +0000 @@ -0,0 +1,394 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.util.Arrays; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import jdk.incubator.http.HttpClient; +import jdk.incubator.http.HttpRequest; +import jdk.incubator.http.HttpResponse; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import javax.net.ssl.SSLContext; +import static java.util.stream.Collectors.joining; +import static java.nio.charset.StandardCharsets.UTF_8; +import static jdk.incubator.http.HttpRequest.BodyPublisher.fromPublisher; +import static jdk.incubator.http.HttpResponse.BodyHandler.asString; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +/* + * @test + * @summary Basic tests for Flow adapter Publishers + * @modules java.base/sun.net.www.http + * jdk.incubator.httpclient/jdk.incubator.http.internal.common + * jdk.incubator.httpclient/jdk.incubator.http.internal.frame + * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack + * java.logging + * jdk.httpserver + * @library /lib/testlibrary http2/server + * @build Http2TestServer + * @build jdk.testlibrary.SimpleSSLContext + * @run testng/othervm FlowAdapterPublisherTest + */ + +public class FlowAdapterPublisherTest { + + SSLContext sslContext; + HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] + HttpsServer httpsTestServer; // HTTPS/1.1 + Http2TestServer http2TestServer; // HTTP/2 ( h2c ) + Http2TestServer https2TestServer; // HTTP/2 ( h2 ) + String httpURI; + String httpsURI; + String http2URI; + String https2URI; + + @DataProvider(name = "uris") + public Object[][] variants() { + return new Object[][]{ + { httpURI }, + { httpsURI }, + { http2URI }, + { https2URI }, + }; + } + + static final Class NPE = NullPointerException.class; + static final Class IAE = IllegalArgumentException.class; + + @Test + public void testAPIExceptions() { + assertThrows(NPE, () -> fromPublisher(null)); + assertThrows(NPE, () -> fromPublisher(null, 1)); + assertThrows(IAE, () -> fromPublisher(new BBPublisher(), 0)); + assertThrows(IAE, () -> fromPublisher(new BBPublisher(), -1)); + assertThrows(IAE, () -> fromPublisher(new BBPublisher(), Long.MIN_VALUE)); + + Publisher publisher = fromPublisher(new BBPublisher()); + assertThrows(NPE, () -> publisher.subscribe(null)); + } + + // Flow.Publisher + + @Test(dataProvider = "uris") + void testByteBufferPublisherUnknownLength(String url) { + String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", + "when the ", "rain gets ", "warmer." }; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new BBPublisher(body))).build(); + + HttpResponse response = client.sendAsync(request, asString(UTF_8)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, Arrays.stream(body).collect(joining())); + } + + @Test(dataProvider = "uris") + void testByteBufferPublisherFixedLength(String url) { + String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", + "when the ", "rain gets ", "warmer." }; + int cl = Arrays.stream(body).mapToInt(String::length).sum(); + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new BBPublisher(body), cl)).build(); + + HttpResponse response = client.sendAsync(request, asString(UTF_8)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, Arrays.stream(body).collect(joining())); + } + + // Flow.Publisher + + @Test(dataProvider = "uris") + void testMappedByteBufferPublisherUnknownLength(String url) { + String[] body = new String[] { "God invented ", "whiskey to ", "keep the ", + "Irish from ", "ruling the ", "world." }; + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new MBBPublisher(body))).build(); + + HttpResponse response = client.sendAsync(request, asString(UTF_8)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, Arrays.stream(body).collect(joining())); + } + + @Test(dataProvider = "uris") + void testMappedByteBufferPublisherFixedLength(String url) { + String[] body = new String[] { "God invented ", "whiskey to ", "keep the ", + "Irish from ", "ruling the ", "world." }; + int cl = Arrays.stream(body).mapToInt(String::length).sum(); + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new MBBPublisher(body), cl)).build(); + + HttpResponse response = client.sendAsync(request, asString(UTF_8)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, Arrays.stream(body).collect(joining())); + } + + // The following two tests depend on Exception detail messages, which is + // not ideal, but necessary to discern correct behavior. They should be + // updated if the exception message is updated. + + @Test(dataProvider = "uris") + void testPublishTooFew(String url) throws InterruptedException { + String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", + "when the ", "rain gets ", "warmer." }; + int cl = Arrays.stream(body).mapToInt(String::length).sum() + 1; // length + 1 + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new BBPublisher(body), cl)).build(); + + try { + HttpResponse response = client.send(request, asString(UTF_8)); + fail("Unexpected response: " + response); + } catch (IOException expected) { + assertTrue(expected.getMessage().contains("Too few bytes returned"), + "Exception message:[" + expected.toString() + "]"); + } + } + + @Test(dataProvider = "uris") + void testPublishTooMany(String url) throws InterruptedException { + String[] body = new String[] { "You know ", "it's summer ", "in Ireland ", + "when the ", "rain gets ", "warmer." }; + int cl = Arrays.stream(body).mapToInt(String::length).sum() - 1; // length - 1 + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromPublisher(new BBPublisher(body), cl)).build(); + + try { + HttpResponse response = client.send(request, asString(UTF_8)); + fail("Unexpected response: " + response); + } catch (IOException expected) { + assertTrue(expected.getMessage().contains("Too many bytes in request body"), + "Exception message:[" + expected.toString() + "]"); + } + } + + static class BBPublisher extends AbstractPublisher + implements Flow.Publisher + { + BBPublisher(String... bodyParts) { super(bodyParts); } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + subscriber.onSubscribe(new InternalSubscription()); + } + } + + static class MBBPublisher extends AbstractPublisher + implements Flow.Publisher + { + MBBPublisher(String... bodyParts) { super(bodyParts); } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + subscriber.onSubscribe(new InternalSubscription()); + } + } + + static abstract class AbstractPublisher { + private final String[] bodyParts; + protected volatile Flow.Subscriber subscriber; + + AbstractPublisher(String... bodyParts) { + this.bodyParts = bodyParts; + } + + class InternalSubscription implements Flow.Subscription { + + private final AtomicLong demand = new AtomicLong(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private volatile int position; + + private static final int IDLE = 1; + private static final int PUSHING = 2; + private static final int AGAIN = 4; + private final AtomicInteger state = new AtomicInteger(IDLE); + + @Override + public void request(long n) { + if (n <= 0L) { + subscriber.onError(new IllegalArgumentException( + "non-positive subscription request")); + return; + } + if (cancelled.get()) { + return; + } + + while (true) { + long prev = demand.get(), d; + if ((d = prev + n) < prev) // saturate + d = Long.MAX_VALUE; + if (demand.compareAndSet(prev, d)) + break; + } + + while (true) { + int s = state.get(); + if (s == IDLE) { + if (state.compareAndSet(IDLE, PUSHING)) { + while (true) { + push(); + if (state.compareAndSet(PUSHING, IDLE)) + return; + else if (state.compareAndSet(AGAIN, PUSHING)) + continue; + } + } + } else if (s == PUSHING) { + if (state.compareAndSet(PUSHING, AGAIN)) + return; + } else if (s == AGAIN){ + // do nothing, the pusher will already rerun + return; + } else { + throw new AssertionError("Unknown state:" + s); + } + } + } + + private void push() { + long prev; + while ((prev = demand.get()) > 0) { + if (!demand.compareAndSet(prev, prev -1)) + continue; + + int index = position; + if (index < bodyParts.length) { + position++; + subscriber.onNext(ByteBuffer.wrap(bodyParts[index].getBytes(UTF_8))); + } + } + + if (position == bodyParts.length && !cancelled.get()) { + cancelled.set(true); + subscriber.onComplete(); + } + } + + @Override + public void cancel() { + if (cancelled.compareAndExchange(false, true)) + return; // already cancelled + } + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + InetSocketAddress sa = new InetSocketAddress("localhost", 0); + httpTestServer = HttpServer.create(sa, 0); + httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); + httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; + + httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); + httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; + + http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); + int port = http2TestServer.getAddress().getPort(); + http2URI = "http://127.0.0.1:" + port + "/http2/echo"; + + https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); + port = https2TestServer.getAddress().getPort(); + https2URI = "https://127.0.0.1:" + port + "/https2/echo"; + + httpTestServer.start(); + httpsTestServer.start(); + http2TestServer.start(); + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + httpTestServer.stop(0); + httpsTestServer.stop(0); + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http1EchoHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } + + static class Http2EchoHandler implements Http2Handler { + @Override + public void handle(Http2TestExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } +} diff -r fcb5b835bf32 -r 4f830b447edf test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/FlowAdapterSubscriberTest.java Thu Dec 21 16:58:51 2017 +0000 @@ -0,0 +1,500 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Subscriber; +import java.util.function.Function; +import java.util.function.Supplier; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import jdk.incubator.http.HttpClient; +import jdk.incubator.http.HttpRequest; +import jdk.incubator.http.HttpResponse; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodySubscriber; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import javax.net.ssl.SSLContext; +import static java.nio.charset.StandardCharsets.UTF_8; +import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +/* + * @test + * @summary Basic tests for Flow adapter Subscribers + * @modules java.base/sun.net.www.http + * jdk.incubator.httpclient/jdk.incubator.http.internal.common + * jdk.incubator.httpclient/jdk.incubator.http.internal.frame + * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack + * java.logging + * jdk.httpserver + * @library /lib/testlibrary http2/server + * @build Http2TestServer + * @build jdk.testlibrary.SimpleSSLContext + * @run testng/othervm FlowAdapterSubscriberTest + */ + +public class FlowAdapterSubscriberTest { + + SSLContext sslContext; + HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] + HttpsServer httpsTestServer; // HTTPS/1.1 + Http2TestServer http2TestServer; // HTTP/2 ( h2c ) + Http2TestServer https2TestServer; // HTTP/2 ( h2 ) + String httpURI; + String httpsURI; + String http2URI; + String https2URI; + + @DataProvider(name = "uris") + public Object[][] variants() { + return new Object[][]{ + { httpURI }, + { httpsURI }, + { http2URI }, + { https2URI }, + }; + } + + static final Class NPE = NullPointerException.class; + + @Test + public void testNull() { + assertThrows(NPE, () -> BodyHandler.fromSubscriber(null)); + assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, Function.identity())); + assertThrows(NPE, () -> BodyHandler.fromSubscriber(new ListSubscriber(), null)); + assertThrows(NPE, () -> BodyHandler.fromSubscriber(null, null)); + + assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null)); + assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, Function.identity())); + assertThrows(NPE, () -> BodySubscriber.fromSubscriber(new ListSubscriber(), null)); + assertThrows(NPE, () -> BodySubscriber.fromSubscriber(null, null)); + + Subscriber subscriber = BodySubscriber.fromSubscriber(new ListSubscriber()); + assertThrows(NPE, () -> subscriber.onSubscribe(null)); + assertThrows(NPE, () -> subscriber.onNext(null)); + assertThrows(NPE, () -> subscriber.onError(null)); + } + + // List + + @Test(dataProvider = "uris") + void testListWithFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the luck of the Irish be with you!")).build(); + + ListSubscriber subscriber = new ListSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber, Supplier::get)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + } + + @Test(dataProvider = "uris") + void testListWithoutFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the luck of the Irish be with you!")).build(); + + ListSubscriber subscriber = new ListSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + } + + @Test(dataProvider = "uris") + void testListWithFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the luck of the Irish be with you!")).build(); + + ListSubscriber subscriber = new ListSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber, Supplier::get)); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + } + + @Test(dataProvider = "uris") + void testListWithoutFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the luck of the Irish be with you!")).build(); + + ListSubscriber subscriber = new ListSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "May the luck of the Irish be with you!"); + } + + // Collection + + @Test(dataProvider = "uris") + void testCollectionWithFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("What's the craic?")).build(); + + CollectionSubscriber subscriber = new CollectionSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "What's the craic?"); + } + + @Test(dataProvider = "uris") + void testCollectionWithoutFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("What's the craic?")).build(); + + CollectionSubscriber subscriber = new CollectionSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "What's the craic?"); + } + + @Test(dataProvider = "uris") + void testCollectionWithFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("What's the craic?")).build(); + + CollectionSubscriber subscriber = new CollectionSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber, CollectionSubscriber::get)); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "What's the craic?"); + } + + @Test(dataProvider = "uris") + void testCollectionWithoutFinisheBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("What's the craic?")).build(); + + CollectionSubscriber subscriber = new CollectionSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "What's the craic?"); + } + + // Iterable + + @Test(dataProvider = "uris") + void testIterableWithFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("We're sucking diesel now!")).build(); + + IterableSubscriber subscriber = new IterableSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber, Supplier::get)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "We're sucking diesel now!"); + } + + @Test(dataProvider = "uris") + void testIterableWithoutFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("We're sucking diesel now!")).build(); + + IterableSubscriber subscriber = new IterableSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "We're sucking diesel now!"); + } + + @Test(dataProvider = "uris") + void testIterableWithFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("We're sucking diesel now!")).build(); + + IterableSubscriber subscriber = new IterableSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber, Supplier::get)); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "We're sucking diesel now!"); + } + + @Test(dataProvider = "uris") + void testIterableWithoutFinisherBlocking(String url) throws Exception{ + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("We're sucking diesel now!")).build(); + + IterableSubscriber subscriber = new IterableSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertEquals(text, "We're sucking diesel now!"); + } + + // Subscriber + + @Test(dataProvider = "uris") + void testObjectWithFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the wind always be at your back.")).build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get)).join(); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + } + + @Test(dataProvider = "uris") + void testObjectWithoutFinisher(String url) { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the wind always be at your back.")).build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.sendAsync(request, + BodyHandler.fromSubscriber(subscriber)).join(); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + } + + @Test(dataProvider = "uris") + void testObjectWithFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the wind always be at your back.")).build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber, ObjectSubscriber::get)); + String text = response.body(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + } + + @Test(dataProvider = "uris") + void testObjectWithoutFinisherBlocking(String url) throws Exception { + HttpClient client = HttpClient.newBuilder().sslContext(sslContext).build(); + HttpRequest request = HttpRequest.newBuilder(URI.create(url)) + .POST(fromString("May the wind always be at your back.")).build(); + + ObjectSubscriber subscriber = new ObjectSubscriber(); + HttpResponse response = client.send(request, + BodyHandler.fromSubscriber(subscriber)); + String text = subscriber.get(); + System.out.println(text); + assertEquals(response.statusCode(), 200); + assertTrue(text.length() != 0); // what else can be asserted! + } + + /** An abstract Subscriber that converts all received data into a String. */ + static abstract class AbstractSubscriber implements Supplier { + protected volatile Flow.Subscription subscription; + protected volatile ByteArrayOutputStream baos = new ByteArrayOutputStream(); + protected volatile String text; + + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + public void onError(Throwable throwable) { + throw new RuntimeException(throwable); + } + public void onComplete() { + text = new String(baos.toByteArray(), UTF_8); + } + @Override public String get() { return text; } + } + + static class ListSubscriber extends AbstractSubscriber + implements Flow.Subscriber>, Supplier + { + @Override public void onNext(List item) { + for (ByteBuffer bb : item) { + byte[] ba = new byte[bb.remaining()]; + bb.get(ba); + uncheckedWrite(baos, ba); + } + } + } + + static class CollectionSubscriber extends AbstractSubscriber + implements Flow.Subscriber>, Supplier + { + @Override public void onNext(Collection item) { + for (ByteBuffer bb : item) { + byte[] ba = new byte[bb.remaining()]; + bb.get(ba); + uncheckedWrite(baos, ba); + } + } + } + + static class IterableSubscriber extends AbstractSubscriber + implements Flow.Subscriber>, Supplier + { + @Override public void onNext(Iterable item) { + for (ByteBuffer bb : item) { + byte[] ba = new byte[bb.remaining()]; + bb.get(ba); + uncheckedWrite(baos, ba); + } + } + } + + static class ObjectSubscriber extends AbstractSubscriber + implements Flow.Subscriber, Supplier + { + @Override public void onNext(Object item) { + // What can anyone do with Object, cast or toString it ? + uncheckedWrite(baos, item.toString().getBytes(UTF_8)); + } + } + + static void uncheckedWrite(ByteArrayOutputStream baos, byte[] ba) { + try { + baos.write(ba); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + InetSocketAddress sa = new InetSocketAddress("localhost", 0); + httpTestServer = HttpServer.create(sa, 0); + httpTestServer.createContext("/http1/echo", new Http1EchoHandler()); + httpURI = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/echo"; + + httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpsTestServer.createContext("/https1/echo", new Http1EchoHandler()); + httpsURI = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/echo"; + + http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer.addHandler(new Http2EchoHandler(), "/http2/echo"); + int port = http2TestServer.getAddress().getPort(); + http2URI = "http://127.0.0.1:" + port + "/http2/echo"; + + https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer.addHandler(new Http2EchoHandler(), "/https2/echo"); + port = https2TestServer.getAddress().getPort(); + https2URI = "https://127.0.0.1:" + port + "/https2/echo"; + + httpTestServer.start(); + httpsTestServer.start(); + http2TestServer.start(); + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + httpTestServer.stop(0); + httpsTestServer.stop(0); + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class Http1EchoHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } + + static class Http2EchoHandler implements Http2Handler { + @Override + public void handle(Http2TestExchange t) throws IOException { + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + byte[] bytes = is.readAllBytes(); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } +} diff -r fcb5b835bf32 -r 4f830b447edf test/jdk/java/net/httpclient/FlowAdaptersCompileOnly.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/FlowAdaptersCompileOnly.java Thu Dec 21 16:58:51 2017 +0000 @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.function.Function; +import jdk.incubator.http.HttpRequest.BodyPublisher; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodySubscriber; + +/* + * @test + * @summary Basic test for Flow adapters with generic type parameters + * @compile FlowAdaptersCompileOnly.java + */ + +public class FlowAdaptersCompileOnly { + + static void makesSureDifferentGenericSignaturesCompile() { + BodyPublisher.fromPublisher(new BBPublisher()); + BodyPublisher.fromPublisher(new MBBPublisher()); + + BodyHandler.fromSubscriber(new ListSubscriber()); + BodyHandler.fromSubscriber(new CollectionSubscriber()); + BodyHandler.fromSubscriber(new IterableSubscriber()); + BodyHandler.fromSubscriber(new ObjectSubscriber()); + + BodySubscriber.fromSubscriber(new ListSubscriber()); + BodySubscriber.fromSubscriber(new CollectionSubscriber()); + BodySubscriber.fromSubscriber(new IterableSubscriber()); + BodySubscriber.fromSubscriber(new ObjectSubscriber()); + + BodyPublisher.fromPublisher(new BBPublisher(), 1); + BodyPublisher.fromPublisher(new MBBPublisher(), 1); + + BodyHandler.fromSubscriber(new ListSubscriber(), Function.identity()); + BodyHandler.fromSubscriber(new CollectionSubscriber(), Function.identity()); + BodyHandler.fromSubscriber(new IterableSubscriber(), Function.identity()); + BodyHandler.fromSubscriber(new ObjectSubscriber(), Function.identity()); + + BodySubscriber.fromSubscriber(new ListSubscriber(), Function.identity()); + BodySubscriber.fromSubscriber(new CollectionSubscriber(), Function.identity()); + BodySubscriber.fromSubscriber(new IterableSubscriber(), Function.identity()); + BodySubscriber.fromSubscriber(new ObjectSubscriber(), Function.identity()); + } + + static class BBPublisher implements Flow.Publisher { + @Override + public void subscribe(Flow.Subscriber subscriber) { } + } + + static class MBBPublisher implements Flow.Publisher { + @Override + public void subscribe(Flow.Subscriber subscriber) { } + } + + static class ListSubscriber implements Flow.Subscriber> { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(List item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } + + static class CollectionSubscriber implements Flow.Subscriber> { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(Collection item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } + + static class IterableSubscriber implements Flow.Subscriber> { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(Iterable item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } + + static class ObjectSubscriber implements Flow.Subscriber { + @Override public void onSubscribe(Flow.Subscription subscription) { } + @Override public void onNext(Object item) { } + @Override public void onError(Throwable throwable) { } + @Override public void onComplete() { } + } +}