test/jdk/java/net/httpclient/ThrowingPushPromises.java
branch http-client-branch
changeset 56268 481d8c9acc7f
child 56399 a0929d5dd63f
equal deleted inserted replaced
56267:fe6f17faa23a 56268:481d8c9acc7f
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @summary Tests what happens when push promise handlers and their
       
    27  *          response body handlers and subscribers throw unexpected exceptions.
       
    28  * @library /lib/testlibrary http2/server
       
    29  * @build jdk.testlibrary.SimpleSSLContext HttpServerAdapters ThrowingPushPromises
       
    30  * @modules java.base/sun.net.www.http
       
    31  *          java.net.http/jdk.internal.net.http.common
       
    32  *          java.net.http/jdk.internal.net.http.frame
       
    33  *          java.net.http/jdk.internal.net.http.hpack
       
    34  * @run testng/othervm -Djdk.internal.httpclient.debug=true ThrowingPushPromises
       
    35  */
       
    36 
       
    37 import jdk.internal.net.http.common.HttpHeadersImpl;
       
    38 import jdk.testlibrary.SimpleSSLContext;
       
    39 import org.testng.annotations.AfterTest;
       
    40 import org.testng.annotations.AfterClass;
       
    41 import org.testng.annotations.BeforeTest;
       
    42 import org.testng.annotations.DataProvider;
       
    43 import org.testng.annotations.Test;
       
    44 
       
    45 import javax.net.ssl.SSLContext;
       
    46 import java.io.BufferedReader;
       
    47 import java.io.IOException;
       
    48 import java.io.InputStream;
       
    49 import java.io.InputStreamReader;
       
    50 import java.io.OutputStream;
       
    51 import java.io.UncheckedIOException;
       
    52 import java.net.URI;
       
    53 import java.net.URISyntaxException;
       
    54 import java.net.http.HttpClient;
       
    55 import java.net.http.HttpHeaders;
       
    56 import java.net.http.HttpRequest;
       
    57 import java.net.http.HttpResponse;
       
    58 import java.net.http.HttpResponse.BodyHandler;
       
    59 import java.net.http.HttpResponse.BodyHandlers;
       
    60 import java.net.http.HttpResponse.BodySubscriber;
       
    61 import java.net.http.HttpResponse.PushPromiseHandler;
       
    62 import java.nio.ByteBuffer;
       
    63 import java.nio.charset.StandardCharsets;
       
    64 import java.util.List;
       
    65 import java.util.Map;
       
    66 import java.util.concurrent.CompletableFuture;
       
    67 import java.util.concurrent.CompletionException;
       
    68 import java.util.concurrent.CompletionStage;
       
    69 import java.util.concurrent.ConcurrentHashMap;
       
    70 import java.util.concurrent.ConcurrentMap;
       
    71 import java.util.concurrent.Executor;
       
    72 import java.util.concurrent.Executors;
       
    73 import java.util.concurrent.Flow;
       
    74 import java.util.concurrent.atomic.AtomicLong;
       
    75 import java.util.function.Consumer;
       
    76 import java.util.function.Function;
       
    77 import java.util.function.Predicate;
       
    78 import java.util.function.Supplier;
       
    79 import java.util.stream.Collectors;
       
    80 import java.util.stream.Stream;
       
    81 
       
    82 import static java.lang.System.out;
       
    83 import static java.lang.System.err;
       
    84 import static java.lang.String.format;
       
    85 import static java.nio.charset.StandardCharsets.UTF_8;
       
    86 import static org.testng.Assert.assertEquals;
       
    87 import static org.testng.Assert.assertTrue;
       
    88 
       
    89 public class ThrowingPushPromises implements HttpServerAdapters {
       
    90 
       
    91     SSLContext sslContext;
       
    92     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
       
    93     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
       
    94     String http2URI_fixed;
       
    95     String http2URI_chunk;
       
    96     String https2URI_fixed;
       
    97     String https2URI_chunk;
       
    98 
       
    99     static final int ITERATION_COUNT = 1;
       
   100     // a shared executor helps reduce the amount of threads created by the test
       
   101     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
       
   102     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
       
   103     static volatile boolean tasksFailed;
       
   104     static final AtomicLong serverCount = new AtomicLong();
       
   105     static final AtomicLong clientCount = new AtomicLong();
       
   106     static final long start = System.nanoTime();
       
   107     public static String now() {
       
   108         long now = System.nanoTime() - start;
       
   109         long secs = now / 1000_000_000;
       
   110         long mill = (now % 1000_000_000) / 1000_000;
       
   111         long nan = now % 1000_000;
       
   112         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
       
   113     }
       
   114 
       
   115     private volatile HttpClient sharedClient;
       
   116 
       
   117     static class TestExecutor implements Executor {
       
   118         final AtomicLong tasks = new AtomicLong();
       
   119         Executor executor;
       
   120         TestExecutor(Executor executor) {
       
   121             this.executor = executor;
       
   122         }
       
   123 
       
   124         @Override
       
   125         public void execute(Runnable command) {
       
   126             long id = tasks.incrementAndGet();
       
   127             executor.execute(() -> {
       
   128                 try {
       
   129                     command.run();
       
   130                 } catch (Throwable t) {
       
   131                     tasksFailed = true;
       
   132                     out.printf(now() + "Task %s failed: %s%n", id, t);
       
   133                     err.printf(now() + "Task %s failed: %s%n", id, t);
       
   134                     FAILURES.putIfAbsent("Task " + id, t);
       
   135                     throw t;
       
   136                 }
       
   137             });
       
   138         }
       
   139     }
       
   140 
       
   141     @AfterClass
       
   142     static final void printFailedTests() {
       
   143         out.println("\n=========================");
       
   144         try {
       
   145             out.printf("%n%sCreated %d servers and %d clients%n",
       
   146                     now(), serverCount.get(), clientCount.get());
       
   147             if (FAILURES.isEmpty()) return;
       
   148             out.println("Failed tests: ");
       
   149             FAILURES.entrySet().forEach((e) -> {
       
   150                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
       
   151                 e.getValue().printStackTrace(out);
       
   152                 e.getValue().printStackTrace();
       
   153             });
       
   154             if (tasksFailed) {
       
   155                 out.println("WARNING: Some tasks failed");
       
   156             }
       
   157         } finally {
       
   158             out.println("\n=========================\n");
       
   159         }
       
   160     }
       
   161 
       
   162     private String[] uris() {
       
   163         return new String[] {
       
   164                 http2URI_fixed,
       
   165                 http2URI_chunk,
       
   166                 https2URI_fixed,
       
   167                 https2URI_chunk,
       
   168         };
       
   169     }
       
   170 
       
   171     @DataProvider(name = "noThrows")
       
   172     public Object[][] noThrows() {
       
   173         String[] uris = uris();
       
   174         Object[][] result = new Object[uris.length * 2][];
       
   175 
       
   176         int i = 0;
       
   177         for (boolean sameClient : List.of(false, true)) {
       
   178             for (String uri: uris()) {
       
   179                 result[i++] = new Object[] {uri, sameClient};
       
   180             }
       
   181         }
       
   182         assert i == uris.length * 2;
       
   183         return result;
       
   184     }
       
   185 
       
   186     @DataProvider(name = "variants")
       
   187     public Object[][] variants() {
       
   188         String[] uris = uris();
       
   189         Object[][] result = new Object[uris.length * 2 * 2][];
       
   190         int i = 0;
       
   191         for (Thrower thrower : List.of(
       
   192                 new UncheckedIOExceptionThrower(),
       
   193                 new UncheckedCustomExceptionThrower())) {
       
   194             for (boolean sameClient : List.of(false, true)) {
       
   195                 for (String uri : uris()) {
       
   196                     result[i++] = new Object[]{uri, sameClient, thrower};
       
   197                 }
       
   198             }
       
   199         }
       
   200         assert i == uris.length * 2 * 2;
       
   201         return result;
       
   202     }
       
   203 
       
   204     private HttpClient makeNewClient() {
       
   205         clientCount.incrementAndGet();
       
   206         return HttpClient.newBuilder()
       
   207                 .executor(executor)
       
   208                 .sslContext(sslContext)
       
   209                 .build();
       
   210     }
       
   211 
       
   212     HttpClient newHttpClient(boolean share) {
       
   213         if (!share) return makeNewClient();
       
   214         HttpClient shared = sharedClient;
       
   215         if (shared != null) return shared;
       
   216         synchronized (this) {
       
   217             shared = sharedClient;
       
   218             if (shared == null) {
       
   219                 shared = sharedClient = makeNewClient();
       
   220             }
       
   221             return shared;
       
   222         }
       
   223     }
       
   224 
       
   225     @Test(dataProvider = "noThrows")
       
   226     public void testNoThrows(String uri, boolean sameClient)
       
   227             throws Exception {
       
   228         HttpClient client = null;
       
   229         out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
       
   230         for (int i=0; i< ITERATION_COUNT; i++) {
       
   231             if (!sameClient || client == null)
       
   232                 client = newHttpClient(sameClient);
       
   233 
       
   234             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   235                     .build();
       
   236             BodyHandler<Stream<String>> handler =
       
   237                     new ThrowingBodyHandler((w) -> {},
       
   238                                             BodyHandlers.ofLines());
       
   239             Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> pushPromises =
       
   240                     new ConcurrentHashMap<>();
       
   241             PushPromiseHandler<Stream<String>> pushHandler = new PushPromiseHandler<>() {
       
   242                 @Override
       
   243                 public void applyPushPromise(HttpRequest initiatingRequest,
       
   244                                              HttpRequest pushPromiseRequest,
       
   245                                              Function<BodyHandler<Stream<String>>,
       
   246                                                      CompletableFuture<HttpResponse<Stream<String>>>>
       
   247                                                      acceptor) {
       
   248                     pushPromises.putIfAbsent(pushPromiseRequest, acceptor.apply(handler));
       
   249                 }
       
   250             };
       
   251             HttpResponse<Stream<String>> response =
       
   252                     client.sendAsync(req, BodyHandlers.ofLines(), pushHandler).get();
       
   253             String body = response.body().collect(Collectors.joining("|"));
       
   254             assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
       
   255             for (HttpRequest promised : pushPromises.keySet()) {
       
   256                 out.printf("%s Received promise: %s%n\tresponse: %s%n",
       
   257                         now(), promised, pushPromises.get(promised).get());
       
   258                 String promisedBody = pushPromises.get(promised).get().body()
       
   259                         .collect(Collectors.joining("|"));
       
   260                 assertEquals(promisedBody, promised.uri().toASCIIString());
       
   261             }
       
   262             assertEquals(3, pushPromises.size());
       
   263         }
       
   264     }
       
   265 
       
   266     @Test(dataProvider = "variants")
       
   267     public void testThrowingAsString(String uri,
       
   268                                      boolean sameClient,
       
   269                                      Thrower thrower)
       
   270             throws Exception
       
   271     {
       
   272         String test = format("testThrowingAsString(%s, %b, %s)",
       
   273                              uri, sameClient, thrower);
       
   274         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
       
   275                 this::checkAsString, thrower);
       
   276     }
       
   277 
       
   278     @Test(dataProvider = "variants")
       
   279     public void testThrowingAsLines(String uri,
       
   280                                     boolean sameClient,
       
   281                                     Thrower thrower)
       
   282             throws Exception
       
   283     {
       
   284         String test =  format("testThrowingAsLines(%s, %b, %s)",
       
   285                 uri, sameClient, thrower);
       
   286         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
       
   287                 this::checkAsLines, thrower);
       
   288     }
       
   289 
       
   290     @Test(dataProvider = "variants")
       
   291     public void testThrowingAsInputStream(String uri,
       
   292                                           boolean sameClient,
       
   293                                           Thrower thrower)
       
   294             throws Exception
       
   295     {
       
   296         String test = format("testThrowingAsInputStream(%s, %b, %s)",
       
   297                 uri, sameClient, thrower);
       
   298         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   299                 this::checkAsInputStream,  thrower);
       
   300     }
       
   301 
       
   302     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
       
   303                                     Supplier<BodyHandler<T>> handlers,
       
   304                                     Finisher finisher, Thrower thrower)
       
   305             throws Exception
       
   306     {
       
   307         out.printf("%n%s%s%n", now(), name);
       
   308         try {
       
   309             testThrowing(uri, sameClient, handlers, finisher, thrower);
       
   310         } catch (Error | Exception x) {
       
   311             FAILURES.putIfAbsent(name, x);
       
   312             throw x;
       
   313         }
       
   314     }
       
   315 
       
   316     private <T,U> void testThrowing(String uri, boolean sameClient,
       
   317                                     Supplier<BodyHandler<T>> handlers,
       
   318                                     Finisher finisher, Thrower thrower)
       
   319             throws Exception
       
   320     {
       
   321         HttpClient client = null;
       
   322         for (Where where : Where.values()) {
       
   323             if (where == Where.ON_ERROR) continue;
       
   324             if (!sameClient || client == null)
       
   325                 client = newHttpClient(sameClient);
       
   326 
       
   327             HttpRequest req = HttpRequest.
       
   328                     newBuilder(URI.create(uri))
       
   329                     .build();
       
   330             ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<T>>> promiseMap =
       
   331                     new ConcurrentHashMap<>();
       
   332             Supplier<BodyHandler<T>> throwing = () ->
       
   333                     new ThrowingBodyHandler(where.select(thrower), handlers.get());
       
   334             PushPromiseHandler<T> pushHandler = new ThrowingPromiseHandler<>(
       
   335                     where.select(thrower),
       
   336                     PushPromiseHandler.of((r) -> throwing.get(), promiseMap));
       
   337             out.println("try throwing in " + where);
       
   338             HttpResponse<T> response = null;
       
   339             try {
       
   340                 response = client.sendAsync(req, handlers.get(), pushHandler).join();
       
   341             } catch (Error | Exception x) {
       
   342                 throw x;
       
   343             }
       
   344             if (response != null) {
       
   345                 finisher.finish(where, req.uri(), response, thrower, promiseMap);
       
   346             }
       
   347         }
       
   348     }
       
   349 
       
   350     enum Where {
       
   351         BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF,
       
   352         BEFORE_ACCEPTING, AFTER_ACCEPTING;
       
   353         public Consumer<Where> select(Consumer<Where> consumer) {
       
   354             return new Consumer<Where>() {
       
   355                 @Override
       
   356                 public void accept(Where where) {
       
   357                     if (Where.this == where) {
       
   358                         consumer.accept(where);
       
   359                     }
       
   360                 }
       
   361             };
       
   362         }
       
   363     }
       
   364 
       
   365     interface Thrower extends Consumer<Where>, Predicate<Throwable> {
       
   366 
       
   367     }
       
   368 
       
   369     interface Finisher<T,U> {
       
   370         U finish(Where w, URI requestURI, HttpResponse<T> resp, Thrower thrower,
       
   371                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises);
       
   372     }
       
   373 
       
   374     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
       
   375         throw new RuntimeException("Expected exception not thrown in " + w);
       
   376     }
       
   377 
       
   378     final List<String> checkAsString(Where w, URI reqURI,
       
   379                                     HttpResponse<String> resp,
       
   380                                     Thrower thrower,
       
   381                                     Map<HttpRequest, CompletableFuture<HttpResponse<String>>> promises) {
       
   382         Function<HttpResponse<String>, List<String>> extractor =
       
   383                 (r) -> List.of(r.body());
       
   384         return check(w, reqURI, resp, thrower, promises, extractor);
       
   385     }
       
   386 
       
   387     final List<String> checkAsLines(Where w, URI reqURI,
       
   388                                     HttpResponse<Stream<String>> resp,
       
   389                                     Thrower thrower,
       
   390                                     Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> promises) {
       
   391         Function<HttpResponse<Stream<String>>, List<String>> extractor =
       
   392                 (r) -> r.body().collect(Collectors.toList());
       
   393         return check(w, reqURI, resp, thrower, promises, extractor);
       
   394     }
       
   395 
       
   396     final List<String> checkAsInputStream(Where w, URI reqURI,
       
   397                                           HttpResponse<InputStream> resp,
       
   398                                           Thrower thrower,
       
   399                                           Map<HttpRequest, CompletableFuture<HttpResponse<InputStream>>> promises)
       
   400     {
       
   401         Function<HttpResponse<InputStream>, List<String>> extractor = (r) -> {
       
   402             List<String> result;
       
   403             try (InputStream is = r.body()) {
       
   404                 result = new BufferedReader(new InputStreamReader(is))
       
   405                         .lines().collect(Collectors.toList());
       
   406             } catch (Throwable t) {
       
   407                 throw new CompletionException(t);
       
   408             }
       
   409             return result;
       
   410         };
       
   411         return check(w, reqURI, resp, thrower, promises, extractor);
       
   412     }
       
   413 
       
   414     private final <T> List<String> check(Where w, URI reqURI,
       
   415                                  HttpResponse<T> resp,
       
   416                                  Thrower thrower,
       
   417                                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises,
       
   418                                  Function<HttpResponse<T>, List<String>> extractor)
       
   419     {
       
   420         List<String> result = extractor.apply(resp);
       
   421         for (HttpRequest req : promises.keySet()) {
       
   422             switch (w) {
       
   423                 case BEFORE_ACCEPTING:
       
   424                     throw new RuntimeException("No push promise should have been received" +
       
   425                             " for " + reqURI + " in " + w + ": got " + promises.keySet());
       
   426                 default:
       
   427                     break;
       
   428             }
       
   429             HttpResponse<T> presp;
       
   430             try {
       
   431                 presp = promises.get(req).join();
       
   432             } catch (Error | Exception x) {
       
   433                 Throwable cause = findCause(x, thrower);
       
   434                 if (cause != null) {
       
   435                     out.println(now() + "Got expected exception in "
       
   436                             + w + ": " + cause);
       
   437                     continue;
       
   438                 }
       
   439                 throw x;
       
   440             }
       
   441             switch (w) {
       
   442                 case BEFORE_ACCEPTING:
       
   443                 case AFTER_ACCEPTING:
       
   444                 case BODY_HANDLER:
       
   445                 case ON_SUBSCRIBE:
       
   446                 case GET_BODY:
       
   447                 case BODY_CF:
       
   448                     return shouldHaveThrown(w, presp, thrower);
       
   449                 default:
       
   450                     break;
       
   451             }
       
   452             List<String> presult = null;
       
   453             try {
       
   454                 presult = extractor.apply(presp);
       
   455             } catch (Error | Exception x) {
       
   456                 Throwable cause = findCause(x, thrower);
       
   457                 if (cause != null) {
       
   458                     out.println(now() + "Got expected exception for "
       
   459                             + req + " in " + w + ": " + cause);
       
   460                     continue;
       
   461                 }
       
   462                 throw x;
       
   463             }
       
   464             throw new RuntimeException("Expected exception not thrown for "
       
   465                     + req + " in " + w);
       
   466         }
       
   467         final int expectedCount;
       
   468         switch (w) {
       
   469             case BEFORE_ACCEPTING:
       
   470                 expectedCount = 0;
       
   471                 break;
       
   472             default:
       
   473                 expectedCount = 3;
       
   474         }
       
   475         assertEquals(promises.size(), expectedCount,
       
   476                 "bad promise count for " + reqURI + " with " + w);
       
   477         assertEquals(result, List.of(reqURI.toASCIIString()));
       
   478         return result;
       
   479     }
       
   480 
       
   481     private static Throwable findCause(Throwable x,
       
   482                                        Predicate<Throwable> filter) {
       
   483         while (x != null && !filter.test(x)) x = x.getCause();
       
   484         return x;
       
   485     }
       
   486 
       
   487     static final class UncheckedCustomExceptionThrower implements Thrower {
       
   488         @Override
       
   489         public void accept(Where where) {
       
   490             out.println(now() + "Throwing in " + where);
       
   491             throw new UncheckedCustomException(where.name());
       
   492         }
       
   493 
       
   494         @Override
       
   495         public boolean test(Throwable throwable) {
       
   496             return UncheckedCustomException.class.isInstance(throwable);
       
   497         }
       
   498 
       
   499         @Override
       
   500         public String toString() {
       
   501             return "UncheckedCustomExceptionThrower";
       
   502         }
       
   503     }
       
   504 
       
   505     static final class UncheckedIOExceptionThrower implements Thrower {
       
   506         @Override
       
   507         public void accept(Where where) {
       
   508             out.println(now() + "Throwing in " + where);
       
   509             throw new UncheckedIOException(new CustomIOException(where.name()));
       
   510         }
       
   511 
       
   512         @Override
       
   513         public boolean test(Throwable throwable) {
       
   514             return UncheckedIOException.class.isInstance(throwable)
       
   515                     && CustomIOException.class.isInstance(throwable.getCause());
       
   516         }
       
   517 
       
   518         @Override
       
   519         public String toString() {
       
   520             return "UncheckedIOExceptionThrower";
       
   521         }
       
   522     }
       
   523 
       
   524     static final class UncheckedCustomException extends RuntimeException {
       
   525         UncheckedCustomException(String message) {
       
   526             super(message);
       
   527         }
       
   528         UncheckedCustomException(String message, Throwable cause) {
       
   529             super(message, cause);
       
   530         }
       
   531     }
       
   532 
       
   533     static final class CustomIOException extends IOException {
       
   534         CustomIOException(String message) {
       
   535             super(message);
       
   536         }
       
   537         CustomIOException(String message, Throwable cause) {
       
   538             super(message, cause);
       
   539         }
       
   540     }
       
   541 
       
   542     static final class ThrowingPromiseHandler<T> implements PushPromiseHandler<T> {
       
   543         final Consumer<Where> throwing;
       
   544         final PushPromiseHandler<T> pushHandler;
       
   545         ThrowingPromiseHandler(Consumer<Where> throwing, PushPromiseHandler<T> pushHandler) {
       
   546             this.throwing = throwing;
       
   547             this.pushHandler = pushHandler;
       
   548         }
       
   549 
       
   550         @Override
       
   551         public void applyPushPromise(HttpRequest initiatingRequest,
       
   552                                      HttpRequest pushPromiseRequest,
       
   553                                      Function<BodyHandler<T>,
       
   554                                              CompletableFuture<HttpResponse<T>>> acceptor) {
       
   555             throwing.accept(Where.BEFORE_ACCEPTING);
       
   556             pushHandler.applyPushPromise(initiatingRequest, pushPromiseRequest, acceptor);
       
   557             throwing.accept(Where.AFTER_ACCEPTING);
       
   558         }
       
   559     }
       
   560 
       
   561     static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
       
   562         final Consumer<Where> throwing;
       
   563         final BodyHandler<T> bodyHandler;
       
   564         ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
       
   565             this.throwing = throwing;
       
   566             this.bodyHandler = bodyHandler;
       
   567         }
       
   568         @Override
       
   569         public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders) {
       
   570             throwing.accept(Where.BODY_HANDLER);
       
   571             BodySubscriber<T> subscriber = bodyHandler.apply(statusCode, responseHeaders);
       
   572             return new ThrowingBodySubscriber(throwing, subscriber);
       
   573         }
       
   574     }
       
   575 
       
   576     static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
       
   577         private final BodySubscriber<T> subscriber;
       
   578         volatile boolean onSubscribeCalled;
       
   579         final Consumer<Where> throwing;
       
   580         ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {
       
   581             this.throwing = throwing;
       
   582             this.subscriber = subscriber;
       
   583         }
       
   584 
       
   585         @Override
       
   586         public void onSubscribe(Flow.Subscription subscription) {
       
   587             //out.println("onSubscribe ");
       
   588             onSubscribeCalled = true;
       
   589             throwing.accept(Where.ON_SUBSCRIBE);
       
   590             subscriber.onSubscribe(subscription);
       
   591         }
       
   592 
       
   593         @Override
       
   594         public void onNext(List<ByteBuffer> item) {
       
   595            // out.println("onNext " + item);
       
   596             assertTrue(onSubscribeCalled);
       
   597             throwing.accept(Where.ON_NEXT);
       
   598             subscriber.onNext(item);
       
   599         }
       
   600 
       
   601         @Override
       
   602         public void onError(Throwable throwable) {
       
   603             //out.println("onError");
       
   604             assertTrue(onSubscribeCalled);
       
   605             throwing.accept(Where.ON_ERROR);
       
   606             subscriber.onError(throwable);
       
   607         }
       
   608 
       
   609         @Override
       
   610         public void onComplete() {
       
   611             //out.println("onComplete");
       
   612             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
       
   613             throwing.accept(Where.ON_COMPLETE);
       
   614             subscriber.onComplete();
       
   615         }
       
   616 
       
   617         @Override
       
   618         public CompletionStage<T> getBody() {
       
   619             throwing.accept(Where.GET_BODY);
       
   620             try {
       
   621                 throwing.accept(Where.BODY_CF);
       
   622             } catch (Throwable t) {
       
   623                 return CompletableFuture.failedFuture(t);
       
   624             }
       
   625             return subscriber.getBody();
       
   626         }
       
   627     }
       
   628 
       
   629 
       
   630     @BeforeTest
       
   631     public void setup() throws Exception {
       
   632         sslContext = new SimpleSSLContext().get();
       
   633         if (sslContext == null)
       
   634             throw new AssertionError("Unexpected null sslContext");
       
   635 
       
   636         // HTTP/2
       
   637         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   638         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
       
   639 
       
   640         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
       
   641         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   642         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   643         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";
       
   644         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";
       
   645 
       
   646         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
       
   647         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   648         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   649         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";
       
   650         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";
       
   651 
       
   652         serverCount.addAndGet(2);
       
   653         http2TestServer.start();
       
   654         https2TestServer.start();
       
   655     }
       
   656 
       
   657     @AfterTest
       
   658     public void teardown() throws Exception {
       
   659         sharedClient = null;
       
   660         http2TestServer.stop();
       
   661         https2TestServer.stop();
       
   662     }
       
   663 
       
   664     private static void pushPromiseFor(HttpTestExchange t, URI requestURI, String pushPath, boolean fixed)
       
   665             throws IOException
       
   666     {
       
   667         try {
       
   668             URI promise = new URI(requestURI.getScheme(),
       
   669                     requestURI.getAuthority(),
       
   670                     pushPath, null, null);
       
   671             byte[] promiseBytes = promise.toASCIIString().getBytes(UTF_8);
       
   672             out.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   673             err.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   674             HttpTestHeaders headers =  HttpTestHeaders.of(new HttpHeadersImpl());
       
   675             if (fixed) {
       
   676                 headers.addHeader("Content-length", String.valueOf(promiseBytes.length));
       
   677             }
       
   678             t.serverPush(promise, headers, promiseBytes);
       
   679         } catch (URISyntaxException x) {
       
   680             throw new IOException(x.getMessage(), x);
       
   681         }
       
   682     }
       
   683 
       
   684     static class HTTP_FixedLengthHandler implements HttpTestHandler {
       
   685         @Override
       
   686         public void handle(HttpTestExchange t) throws IOException {
       
   687             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
       
   688             try (InputStream is = t.getRequestBody()) {
       
   689                 is.readAllBytes();
       
   690             }
       
   691             URI requestURI = t.getRequestURI();
       
   692             for (int i = 1; i<2; i++) {
       
   693                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   694                 pushPromiseFor(t, requestURI, path, true);
       
   695             }
       
   696             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   697             t.sendResponseHeaders(200, resp.length);  //fixed content length
       
   698             try (OutputStream os = t.getResponseBody()) {
       
   699                 int bytes = resp.length/3;
       
   700                 for (int i = 0; i<2; i++) {
       
   701                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   702                     os.write(resp, i * bytes, bytes);
       
   703                     os.flush();
       
   704                     pushPromiseFor(t, requestURI, path, true);
       
   705                 }
       
   706                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   707             }
       
   708         }
       
   709 
       
   710     }
       
   711 
       
   712     static class HTTP_ChunkedHandler implements HttpTestHandler {
       
   713         @Override
       
   714         public void handle(HttpTestExchange t) throws IOException {
       
   715             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
       
   716             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   717             try (InputStream is = t.getRequestBody()) {
       
   718                 is.readAllBytes();
       
   719             }
       
   720             URI requestURI = t.getRequestURI();
       
   721             for (int i = 1; i<2; i++) {
       
   722                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   723                 pushPromiseFor(t, requestURI, path, false);
       
   724             }
       
   725             t.sendResponseHeaders(200, -1); // chunked/variable
       
   726             try (OutputStream os = t.getResponseBody()) {
       
   727                 int bytes = resp.length/3;
       
   728                 for (int i = 0; i<2; i++) {
       
   729                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   730                     os.write(resp, i * bytes, bytes);
       
   731                     os.flush();
       
   732                     pushPromiseFor(t, requestURI, path, false);
       
   733                 }
       
   734                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   735             }
       
   736         }
       
   737     }
       
   738 
       
   739 }