# HG changeset patch # User dfuchs # Date 1518708497 0 # Node ID be9720a556c7376c6cfac7c049eb60878f4263c0 # Parent c8a1eccbc719c3282921bfa0039c5de8bd49f887 http-client-branch: Add a test for Handlers/Subscribers as described in JDK-8165220 diff -r c8a1eccbc719 -r be9720a556c7 test/jdk/java/net/httpclient/HttpServerAdapters.java --- a/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 15 14:10:27 2018 +0000 +++ b/test/jdk/java/net/httpclient/HttpServerAdapters.java Thu Feb 15 15:28:17 2018 +0000 @@ -258,6 +258,8 @@ } @Override public void sendResponseHeaders(int code, int contentLength) throws IOException { + if (contentLength == 0) contentLength = -1; + if (contentLength < 0) contentLength = 0; exchange.sendResponseHeaders(code, contentLength); } void doFilter(Filter.Chain filter) throws IOException { diff -r c8a1eccbc719 -r be9720a556c7 test/jdk/java/net/httpclient/ThrowingSubscribers.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java Thu Feb 15 15:28:17 2018 +0000 @@ -0,0 +1,531 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @summary Tests what happens when response body handlers and subscribers + * throw unexpected exceptions. + * @library /lib/testlibrary http2/server + * @build jdk.testlibrary.SimpleSSLContext HttpServerAdapters ThrowingSubscribers + * @modules java.base/sun.net.www.http + * java.net.http/jdk.internal.net.http.common + * java.net.http/jdk.internal.net.http.frame + * java.net.http/jdk.internal.net.http.hpack + * @run testng/othervm ThrowingSubscribers + */ + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import javax.net.ssl.SSLContext; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodySubscriber; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.System.out; +import static java.lang.String.format; +import static java.net.http.HttpResponse.BodySubscriber.asString; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class ThrowingSubscribers implements HttpServerAdapters { + + SSLContext sslContext; + HttpTestServer httpTestServer; // HTTP/1.1 [ 4 servers ] + HttpTestServer httpsTestServer; // HTTPS/1.1 + HttpTestServer http2TestServer; // HTTP/2 ( h2c ) + HttpTestServer https2TestServer; // HTTP/2 ( h2 ) + String httpURI_fixed; + String httpURI_chunk; + String httpsURI_fixed; + String httpsURI_chunk; + String http2URI_fixed; + String http2URI_chunk; + String https2URI_fixed; + String https2URI_chunk; + + static final int ITERATION_COUNT = 2; + // a shared executor helps reduce the amount of threads created by the test + static final Executor executor = new TestExecutor(Executors.newCachedThreadPool()); + static final ConcurrentMap FAILURES = new ConcurrentHashMap<>(); + static volatile boolean tasksFailed; + + static class TestExecutor implements Executor { + final AtomicLong tasks = new AtomicLong(); + Executor executor; + TestExecutor(Executor executor) { + this.executor = executor; + } + + @Override + public void execute(Runnable command) { + long id = tasks.incrementAndGet(); + executor.execute(() -> { + try { + command.run(); + } catch (Throwable t) { + tasksFailed = true; + System.out.printf("Task %s failed: %s%n", id, t); + System.err.printf("Task %s failed: %s%n", id, t); + FAILURES.putIfAbsent("Task " + id, t); + throw t; + } + }); + } + } + + @AfterClass + static final void printFailedTests() { + if (FAILURES.isEmpty()) return; + out.println("Failed tests: "); + FAILURES.entrySet().forEach((e) -> { + out.printf("\t%s: %s%n", e.getKey(), e.getValue()); + e.getValue().printStackTrace(); + }); + if (tasksFailed) { + throw new RuntimeException("Some tasks failed"); + } + } + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][]{ + { httpURI_fixed, false }, + { httpURI_chunk, false }, + { httpsURI_fixed, false }, + { httpsURI_chunk, false }, + { http2URI_fixed, false }, + { http2URI_chunk, false }, + { https2URI_fixed, false }, + { https2URI_chunk, false }, + + { httpURI_fixed, true }, + { httpURI_chunk, true }, + { httpsURI_fixed, true }, + { httpsURI_chunk, true }, + { http2URI_fixed, true }, + { http2URI_chunk, true }, + { https2URI_fixed, true }, + { https2URI_chunk, true }, + }; + } + + HttpClient newHttpClient() { + return HttpClient.newBuilder() + .executor(executor) + .sslContext(sslContext) + .build(); + } + + @Test(dataProvider = "variants") + public void testNoThrows(String uri, boolean sameClient) throws Exception { + HttpClient client = null; + for (int i=0; i< ITERATION_COUNT; i++) { + if (!sameClient || client == null) + client = newHttpClient(); + + HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) + .build(); + BodyHandler handler = new ThrowingBodyHandler((w) -> {}, + BodyHandler.asString()); + HttpResponse response = client.send(req, handler); + String body = response.body(); + assertEquals(URI.create(body).getPath(), URI.create(uri).getPath()); + } + } + + @Test(dataProvider = "variants") + public void testThrowingAsString(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsString(%s,%b)", uri, sameClient); + testThrowing(test, uri, sameClient, BodyHandler::asString, + this::shouldHaveThrown, false); + } + + @Test(dataProvider = "variants") + public void testThrowingAsLines(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsLines(%s,%b)", uri, sameClient); + testThrowing(test, uri, sameClient, BodyHandler::asLines, + this::checkAsLines, false); + } + + @Test(dataProvider = "variants") + public void testThrowingAsInputStream(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsInputStream(%s,%b)", uri, sameClient); + testThrowing(test, uri, sameClient, BodyHandler::asInputStream, + this::checkAsInputStream, false); + } + + @Test(dataProvider = "variants") + public void testThrowingAsStringAsync(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsStringAsync(%s,%b)", uri, sameClient); + testThrowing(uri, sameClient, BodyHandler::asString, + this::shouldHaveThrown, true); + } + + @Test(dataProvider = "variants") + public void testThrowingAsLinesAsync(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsLinesAsync(%s,%b)", uri, sameClient); + testThrowing(test, uri, sameClient, BodyHandler::asLines, + this::checkAsLines, true); + } + + @Test(dataProvider = "variants") + public void testThrowingAsInputStreamAsync(String uri, boolean sameClient) throws Exception { + String test = format("testThrowingAsInputStreamAsync(%s,%b)", uri, sameClient); + testThrowing(test, uri, sameClient, BodyHandler::asInputStream, + this::checkAsInputStream, true); + } + + private void testThrowing(String name, String uri, boolean sameClient, + Supplier> handlers, + Finisher finisher, boolean async) + throws Exception { + out.printf("%n%s%n", name); + try { + testThrowing(uri, sameClient, handlers, finisher, async); + } catch (Error | Exception x) { + FAILURES.putIfAbsent(name, x); + throw x; + } + } + + private void testThrowing(String uri, boolean sameClient, + Supplier> handlers, + Finisher finisher, boolean async) + throws Exception { + HttpClient client = null; + RuntimeExceptionThrower thrower = new RuntimeExceptionThrower(); + for (Where where : Where.values()) { + if (where == Where.ON_SUBSCRIBE) continue; + if (where == Where.ON_ERROR) continue; + if (where == Where.GET_BODY) continue; // doesn't work with HTTP/2 + if (!sameClient || client == null) + client = newHttpClient(); + + HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) + .build(); + BodyHandler handler = new ThrowingBodyHandler(where.select(thrower), handlers.get()); + System.out.println("try throwing in " + where); + HttpResponse response = null; + if (async) { + try { + response = client.sendAsync(req, handler).join(); + } catch (Error | Exception x) { + UncheckedCustomException cause = findCause(x, + UncheckedCustomException.class::isInstance); + if (cause == null) throw x; + System.out.println("Got expected exception: " + cause); + } + } else { + try { + response = client.send(req, handler); + } catch (UncheckedCustomException t) { + System.out.println("Got expected exception: " + t); + } + } + if (response != null) { + finisher.finish(where, response); + } + } + } + + enum Where {BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY; + public Consumer select(Consumer consumer) { + return new Consumer() { + @Override + public void accept(Where where) { + if (Where.this == where) { + consumer.accept(where); + } + } + }; + } + } + + interface Finisher { + U finish(Where w, HttpResponse resp) throws IOException; + } + + U shouldHaveThrown(Where w, HttpResponse resp) { + throw new RuntimeException("Expected exception not thrown in " + w); + } + + List checkAsLines(Where w, HttpResponse> resp) { + switch(w) { + case BODY_HANDLER: return shouldHaveThrown(w, resp); + case ON_SUBSCRIBE: return shouldHaveThrown(w, resp); + case GET_BODY: return shouldHaveThrown(w, resp); + default: break; + } + List result = null; + try { + result = resp.body().collect(Collectors.toList()); + } catch (Error | Exception x) { + UncheckedCustomException cause = + findCause(x, UncheckedCustomException.class::isInstance); + if (cause != null) { + out.println("Got expected exception in " + w + ": " + x); + return result; + } + throw x; + } + throw new RuntimeException("Expected exception not thrown in " + w); + } + + List checkAsInputStream(Where w, HttpResponse resp) + throws IOException + { + switch(w) { + case BODY_HANDLER: return shouldHaveThrown(w, resp); + case ON_SUBSCRIBE: return shouldHaveThrown(w, resp); + case GET_BODY: return shouldHaveThrown(w, resp); + default: break; + } + List result = null; + try (InputStreamReader r1 = new InputStreamReader(resp.body(), UTF_8); + BufferedReader r = new BufferedReader(r1)) { + try { + result = r.lines().collect(Collectors.toList()); + } catch (Error | Exception x) { + UncheckedCustomException cause = + findCause(x, UncheckedCustomException.class::isInstance); + if (cause != null) { + out.println("Got expected exception in " + w + ": " + x); + return result; + } + throw x; + } + } + throw new RuntimeException("Expected exception not thrown in " + w); + } + + private static E findCause(Throwable x, + Predicate filter) { + while (x != null && !filter.test(x)) x = x.getCause(); + return (E)x; + } + + static class RuntimeExceptionThrower implements Consumer { + @Override + public void accept(Where where) { + throw new UncheckedCustomException(where.name()); + } + } + + static class UncheckedCustomException extends RuntimeException { + UncheckedCustomException(String message) { + super(message); + } + UncheckedCustomException(String message, Throwable cause) { + super(message, cause); + } + } + + static class ThrowingBodyHandler implements BodyHandler { + final Consumer throwing; + final BodyHandler bodyHandler; + ThrowingBodyHandler(Consumer throwing, BodyHandler bodyHandler) { + this.throwing = throwing; + this.bodyHandler = bodyHandler; + } + @Override + public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders) { + throwing.accept(Where.BODY_HANDLER); + BodySubscriber subscriber = bodyHandler.apply(statusCode, responseHeaders); + return new ThrowingBodySubscriber(throwing, subscriber); + } + } + + static class ThrowingBodySubscriber implements BodySubscriber { + private final BodySubscriber subscriber; + volatile boolean onSubscribeCalled; + final Consumer throwing; + ThrowingBodySubscriber(Consumer throwing, BodySubscriber subscriber) { + this.throwing = throwing; + this.subscriber = subscriber; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + //out.println("onSubscribe "); + onSubscribeCalled = true; + throwing.accept(Where.ON_SUBSCRIBE); + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + // out.println("onNext " + item); + assertTrue(onSubscribeCalled); + throwing.accept(Where.ON_NEXT); + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + //out.println("onError"); + assertTrue(onSubscribeCalled); + throwing.accept(Where.ON_ERROR); + subscriber.onError(throwable); + } + + @Override + public void onComplete() { + //out.println("onComplete"); + assertTrue(onSubscribeCalled, "onComplete called before onSubscribe"); + throwing.accept(Where.ON_COMPLETE); + subscriber.onComplete(); + } + + @Override + public CompletionStage getBody() { + throwing.accept(Where.GET_BODY); + return subscriber.getBody(); + } + } + + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + // HTTP/1.1 + HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler(); + HttpTestHandler h1_chunkHandler = new HTTP_ChunkedHandler(); + InetSocketAddress sa = new InetSocketAddress(0); + httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0)); + httpTestServer.addHandler(h1_fixedLengthHandler, "/http1/fixed"); + httpTestServer.addHandler(h1_chunkHandler, "/http1/chunk"); + httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed/x"; + httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunk/x"; + + HttpsServer httpsServer = HttpsServer.create(sa, 0); + httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpsTestServer = HttpTestServer.of(httpsServer); + httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed"); + httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk"); + httpsURI_fixed = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed/x"; + httpsURI_chunk = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunk/x"; + + // HTTP/2 + HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler(); + HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler(); + + http2TestServer = HttpTestServer.of(new Http2TestServer("127.0.0.1", false, 0)); + http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed"); + http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk"); + int port = http2TestServer.getAddress().getPort(); + http2URI_fixed = "http://127.0.0.1:" + port + "/http2/fixed/x"; + http2URI_chunk = "http://127.0.0.1:" + port + "/http2/chunk/x"; + + https2TestServer = HttpTestServer.of(new Http2TestServer("127.0.0.1", true, 0)); + https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed"); + https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk"); + port = https2TestServer.getAddress().getPort(); + https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed/x"; + https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk/x"; + + httpTestServer.start(); + httpsTestServer.start(); + http2TestServer.start(); + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + httpTestServer.stop(); + httpsTestServer.stop(); + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class HTTP_FixedLengthHandler implements HttpTestHandler { + @Override + public void handle(HttpTestExchange t) throws IOException { + out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI()); + try (InputStream is = t.getRequestBody()) { + is.readAllBytes(); + } + byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); + t.sendResponseHeaders(200, resp.length); //fixed content length + try (OutputStream os = t.getResponseBody()) { + os.write(resp); + } + } + } + + static class HTTP_ChunkedHandler implements HttpTestHandler { + @Override + public void handle(HttpTestExchange t) throws IOException { + out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI()); + byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8); + try (InputStream is = t.getRequestBody()) { + is.readAllBytes(); + } + t.sendResponseHeaders(200, -1); // chunked/variable + try (OutputStream os = t.getResponseBody()) { + os.write(resp); + } + } + } + +}