test/jdk/java/net/httpclient/ThrowingPushPromises.java
changeset 49765 ee6f7a61f3a5
child 56451 9585061fdb04
equal deleted inserted replaced
49707:f7fd051519ac 49765:ee6f7a61f3a5
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @summary Tests what happens when push promise handlers and their
       
    27  *          response body handlers and subscribers throw unexpected exceptions.
       
    28  * @library /lib/testlibrary http2/server
       
    29  * @build jdk.testlibrary.SimpleSSLContext HttpServerAdapters
       
     30  *       ReferenceTracker ThrowingPushPromises
       
    31  * @modules java.base/sun.net.www.http
       
    32  *          java.net.http/jdk.internal.net.http.common
       
    33  *          java.net.http/jdk.internal.net.http.frame
       
    34  *          java.net.http/jdk.internal.net.http.hpack
       
    35  * @run testng/othervm -Djdk.internal.httpclient.debug=true ThrowingPushPromises
       
    36  */
       
    37 
       
    38 import jdk.internal.net.http.common.HttpHeadersImpl;
       
    39 import jdk.testlibrary.SimpleSSLContext;
       
    40 import org.testng.annotations.AfterTest;
       
    41 import org.testng.annotations.AfterClass;
       
    42 import org.testng.annotations.BeforeTest;
       
    43 import org.testng.annotations.DataProvider;
       
    44 import org.testng.annotations.Test;
       
    45 
       
    46 import javax.net.ssl.SSLContext;
       
    47 import java.io.BufferedReader;
       
    48 import java.io.IOException;
       
    49 import java.io.InputStream;
       
    50 import java.io.InputStreamReader;
       
    51 import java.io.OutputStream;
       
    52 import java.io.UncheckedIOException;
       
    53 import java.net.URI;
       
    54 import java.net.URISyntaxException;
       
    55 import java.net.http.HttpClient;
       
    56 import java.net.http.HttpRequest;
       
    57 import java.net.http.HttpResponse;
       
    58 import java.net.http.HttpResponse.BodyHandler;
       
    59 import java.net.http.HttpResponse.BodyHandlers;
       
    60 import java.net.http.HttpResponse.BodySubscriber;
       
    61 import java.net.http.HttpResponse.PushPromiseHandler;
       
    62 import java.nio.ByteBuffer;
       
    63 import java.nio.charset.StandardCharsets;
       
    64 import java.util.List;
       
    65 import java.util.Map;
       
    66 import java.util.concurrent.CompletableFuture;
       
    67 import java.util.concurrent.CompletionException;
       
    68 import java.util.concurrent.CompletionStage;
       
    69 import java.util.concurrent.ConcurrentHashMap;
       
    70 import java.util.concurrent.ConcurrentMap;
       
    71 import java.util.concurrent.Executor;
       
    72 import java.util.concurrent.Executors;
       
    73 import java.util.concurrent.Flow;
       
    74 import java.util.concurrent.atomic.AtomicLong;
       
    75 import java.util.function.Consumer;
       
    76 import java.util.function.Function;
       
    77 import java.util.function.Predicate;
       
    78 import java.util.function.Supplier;
       
    79 import java.util.stream.Collectors;
       
    80 import java.util.stream.Stream;
       
    81 
       
    82 import static java.lang.System.out;
       
    83 import static java.lang.System.err;
       
    84 import static java.lang.String.format;
       
    85 import static java.nio.charset.StandardCharsets.UTF_8;
       
    86 import static org.testng.Assert.assertEquals;
       
    87 import static org.testng.Assert.assertTrue;
       
    88 
       
    89 public class ThrowingPushPromises implements HttpServerAdapters {
       
    90 
       
     91     SSLContext sslContext;            // supplied to every client built by makeNewClient()
       
     92     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
       
     93     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
       
     94     String http2URI_fixed;            // the four URIs below are the targets returned by uris()
       
     95     String http2URI_chunk;
       
     96     String https2URI_fixed;
       
     97     String https2URI_chunk;
       
     98 
       
     99     static final int ITERATION_COUNT = 1;   // number of request rounds performed by testNoThrows
       
    100     // a shared executor helps reduce the number of threads created by the test
       
    101     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
       
    102     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();   // failures keyed by test or task name; dumped by printFailedTests()
       
    103     static volatile boolean tasksFailed;   // set by TestExecutor when a submitted task throws
       
    104     static final AtomicLong serverCount = new AtomicLong();   // reported by printFailedTests()
       
    105     static final AtomicLong clientCount = new AtomicLong();   // incremented by makeNewClient(); reported by printFailedTests()
       
    106     static final long start = System.nanoTime();   // reference point for the elapsed time printed by now()
       
   107     public static String now() {
       
   108         long now = System.nanoTime() - start;
       
   109         long secs = now / 1000_000_000;
       
   110         long mill = (now % 1000_000_000) / 1000_000;
       
   111         long nan = now % 1000_000;
       
   112         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
       
   113     }
       
   114 
       
    115     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;   // every client built by makeNewClient() is passed to TRACKER.track(...)
       
    116     private volatile HttpClient sharedClient;   // created on first use by newHttpClient(true), then reused
       
   117 
       
   118     static class TestExecutor implements Executor {
       
   119         final AtomicLong tasks = new AtomicLong();
       
   120         Executor executor;
       
   121         TestExecutor(Executor executor) {
       
   122             this.executor = executor;
       
   123         }
       
   124 
       
   125         @Override
       
   126         public void execute(Runnable command) {
       
   127             long id = tasks.incrementAndGet();
       
   128             executor.execute(() -> {
       
   129                 try {
       
   130                     command.run();
       
   131                 } catch (Throwable t) {
       
   132                     tasksFailed = true;
       
   133                     out.printf(now() + "Task %s failed: %s%n", id, t);
       
   134                     err.printf(now() + "Task %s failed: %s%n", id, t);
       
   135                     FAILURES.putIfAbsent("Task " + id, t);
       
   136                     throw t;
       
   137                 }
       
   138             });
       
   139         }
       
   140     }
       
   141 
       
   142     @AfterClass
       
   143     static final void printFailedTests() {
       
   144         out.println("\n=========================");
       
   145         try {
       
   146             out.printf("%n%sCreated %d servers and %d clients%n",
       
   147                     now(), serverCount.get(), clientCount.get());
       
   148             if (FAILURES.isEmpty()) return;
       
   149             out.println("Failed tests: ");
       
   150             FAILURES.entrySet().forEach((e) -> {
       
   151                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
       
   152                 e.getValue().printStackTrace(out);
       
   153                 e.getValue().printStackTrace();
       
   154             });
       
   155             if (tasksFailed) {
       
   156                 out.println("WARNING: Some tasks failed");
       
   157             }
       
   158         } finally {
       
   159             out.println("\n=========================\n");
       
   160         }
       
   161     }
       
   162 
       
   163     private String[] uris() {
       
   164         return new String[] {
       
   165                 http2URI_fixed,
       
   166                 http2URI_chunk,
       
   167                 https2URI_fixed,
       
   168                 https2URI_chunk,
       
   169         };
       
   170     }
       
   171 
       
   172     @DataProvider(name = "noThrows")
       
   173     public Object[][] noThrows() {
       
   174         String[] uris = uris();
       
   175         Object[][] result = new Object[uris.length * 2][];
       
   176 
       
   177         int i = 0;
       
   178         for (boolean sameClient : List.of(false, true)) {
       
   179             for (String uri: uris()) {
       
   180                 result[i++] = new Object[] {uri, sameClient};
       
   181             }
       
   182         }
       
   183         assert i == uris.length * 2;
       
   184         return result;
       
   185     }
       
   186 
       
   187     @DataProvider(name = "variants")
       
   188     public Object[][] variants() {
       
   189         String[] uris = uris();
       
   190         Object[][] result = new Object[uris.length * 2 * 2][];
       
   191         int i = 0;
       
   192         for (Thrower thrower : List.of(
       
   193                 new UncheckedIOExceptionThrower(),
       
   194                 new UncheckedCustomExceptionThrower())) {
       
   195             for (boolean sameClient : List.of(false, true)) {
       
   196                 for (String uri : uris()) {
       
   197                     result[i++] = new Object[]{uri, sameClient, thrower};
       
   198                 }
       
   199             }
       
   200         }
       
   201         assert i == uris.length * 2 * 2;
       
   202         return result;
       
   203     }
       
   204 
       
   205     private HttpClient makeNewClient() {
       
   206         clientCount.incrementAndGet();
       
   207         return TRACKER.track(HttpClient.newBuilder()
       
   208                 .proxy(HttpClient.Builder.NO_PROXY)
       
   209                 .executor(executor)
       
   210                 .sslContext(sslContext)
       
   211                 .build());
       
   212     }
       
   213 
       
   214     HttpClient newHttpClient(boolean share) {
       
   215         if (!share) return makeNewClient();
       
   216         HttpClient shared = sharedClient;
       
   217         if (shared != null) return shared;
       
   218         synchronized (this) {
       
   219             shared = sharedClient;
       
   220             if (shared == null) {
       
   221                 shared = sharedClient = makeNewClient();
       
   222             }
       
   223             return shared;
       
   224         }
       
   225     }
       
   226 
       
   227     @Test(dataProvider = "noThrows")
       
   228     public void testNoThrows(String uri, boolean sameClient)
       
   229             throws Exception {
       
   230         HttpClient client = null;
       
   231         out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
       
   232         for (int i=0; i< ITERATION_COUNT; i++) {
       
   233             if (!sameClient || client == null)
       
   234                 client = newHttpClient(sameClient);
       
   235 
       
   236             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   237                     .build();
       
   238             BodyHandler<Stream<String>> handler =
       
   239                     new ThrowingBodyHandler((w) -> {},
       
   240                                             BodyHandlers.ofLines());
       
   241             Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> pushPromises =
       
   242                     new ConcurrentHashMap<>();
       
   243             PushPromiseHandler<Stream<String>> pushHandler = new PushPromiseHandler<>() {
       
   244                 @Override
       
   245                 public void applyPushPromise(HttpRequest initiatingRequest,
       
   246                                              HttpRequest pushPromiseRequest,
       
   247                                              Function<BodyHandler<Stream<String>>,
       
   248                                                      CompletableFuture<HttpResponse<Stream<String>>>>
       
   249                                                      acceptor) {
       
   250                     pushPromises.putIfAbsent(pushPromiseRequest, acceptor.apply(handler));
       
   251                 }
       
   252             };
       
   253             HttpResponse<Stream<String>> response =
       
   254                     client.sendAsync(req, BodyHandlers.ofLines(), pushHandler).get();
       
   255             String body = response.body().collect(Collectors.joining("|"));
       
   256             assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
       
   257             for (HttpRequest promised : pushPromises.keySet()) {
       
   258                 out.printf("%s Received promise: %s%n\tresponse: %s%n",
       
   259                         now(), promised, pushPromises.get(promised).get());
       
   260                 String promisedBody = pushPromises.get(promised).get().body()
       
   261                         .collect(Collectors.joining("|"));
       
   262                 assertEquals(promisedBody, promised.uri().toASCIIString());
       
   263             }
       
   264             assertEquals(3, pushPromises.size());
       
   265         }
       
   266     }
       
   267 
       
   268     @Test(dataProvider = "variants")
       
   269     public void testThrowingAsString(String uri,
       
   270                                      boolean sameClient,
       
   271                                      Thrower thrower)
       
   272             throws Exception
       
   273     {
       
   274         String test = format("testThrowingAsString(%s, %b, %s)",
       
   275                              uri, sameClient, thrower);
       
   276         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
       
   277                 this::checkAsString, thrower);
       
   278     }
       
   279 
       
   280     @Test(dataProvider = "variants")
       
   281     public void testThrowingAsLines(String uri,
       
   282                                     boolean sameClient,
       
   283                                     Thrower thrower)
       
   284             throws Exception
       
   285     {
       
   286         String test =  format("testThrowingAsLines(%s, %b, %s)",
       
   287                 uri, sameClient, thrower);
       
   288         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
       
   289                 this::checkAsLines, thrower);
       
   290     }
       
   291 
       
   292     @Test(dataProvider = "variants")
       
   293     public void testThrowingAsInputStream(String uri,
       
   294                                           boolean sameClient,
       
   295                                           Thrower thrower)
       
   296             throws Exception
       
   297     {
       
   298         String test = format("testThrowingAsInputStream(%s, %b, %s)",
       
   299                 uri, sameClient, thrower);
       
   300         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   301                 this::checkAsInputStream,  thrower);
       
   302     }
       
   303 
       
   304     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
       
   305                                     Supplier<BodyHandler<T>> handlers,
       
   306                                     Finisher finisher, Thrower thrower)
       
   307             throws Exception
       
   308     {
       
   309         out.printf("%n%s%s%n", now(), name);
       
   310         try {
       
   311             testThrowing(uri, sameClient, handlers, finisher, thrower);
       
   312         } catch (Error | Exception x) {
       
   313             FAILURES.putIfAbsent(name, x);
       
   314             throw x;
       
   315         }
       
   316     }
       
   317 
       
   318     private <T,U> void testThrowing(String uri, boolean sameClient,
       
   319                                     Supplier<BodyHandler<T>> handlers,
       
   320                                     Finisher finisher, Thrower thrower)
       
   321             throws Exception
       
   322     {
       
   323         HttpClient client = null;
       
   324         for (Where where : Where.values()) {
       
   325             if (where == Where.ON_ERROR) continue;
       
   326             if (!sameClient || client == null)
       
   327                 client = newHttpClient(sameClient);
       
   328 
       
   329             HttpRequest req = HttpRequest.
       
   330                     newBuilder(URI.create(uri))
       
   331                     .build();
       
   332             ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<T>>> promiseMap =
       
   333                     new ConcurrentHashMap<>();
       
   334             Supplier<BodyHandler<T>> throwing = () ->
       
   335                     new ThrowingBodyHandler(where.select(thrower), handlers.get());
       
   336             PushPromiseHandler<T> pushHandler = new ThrowingPromiseHandler<>(
       
   337                     where.select(thrower),
       
   338                     PushPromiseHandler.of((r) -> throwing.get(), promiseMap));
       
   339             out.println("try throwing in " + where);
       
   340             HttpResponse<T> response = null;
       
   341             try {
       
   342                 response = client.sendAsync(req, handlers.get(), pushHandler).join();
       
   343             } catch (Error | Exception x) {
       
   344                 throw x;
       
   345             }
       
   346             if (response != null) {
       
   347                 finisher.finish(where, req.uri(), response, thrower, promiseMap);
       
   348             }
       
   349         }
       
   350     }
       
   351 
       
   352     enum Where {
       
   353         BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF,
       
   354         BEFORE_ACCEPTING, AFTER_ACCEPTING;
       
   355         public Consumer<Where> select(Consumer<Where> consumer) {
       
   356             return new Consumer<Where>() {
       
   357                 @Override
       
   358                 public void accept(Where where) {
       
   359                     if (Where.this == where) {
       
   360                         consumer.accept(where);
       
   361                     }
       
   362                 }
       
   363             };
       
   364         }
       
   365     }
       
   366 
       
   367     interface Thrower extends Consumer<Where>, Predicate<Throwable> {
       
   368 
       
   369     }
       
   370 
       
   371     interface Finisher<T,U> {
       
   372         U finish(Where w, URI requestURI, HttpResponse<T> resp, Thrower thrower,
       
   373                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises);
       
   374     }
       
   375 
       
   376     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
       
   377         String msg = "Expected exception not thrown in " + w
       
   378                 + "\n\tReceived: " + resp
       
   379                 + "\n\tWith body: " + resp.body();
       
   380         System.out.println(msg);
       
   381         throw new RuntimeException(msg);
       
   382     }
       
   383 
       
   384     final List<String> checkAsString(Where w, URI reqURI,
       
   385                                     HttpResponse<String> resp,
       
   386                                     Thrower thrower,
       
   387                                     Map<HttpRequest, CompletableFuture<HttpResponse<String>>> promises) {
       
   388         Function<HttpResponse<String>, List<String>> extractor =
       
   389                 (r) -> List.of(r.body());
       
   390         return check(w, reqURI, resp, thrower, promises, extractor);
       
   391     }
       
   392 
       
   393     final List<String> checkAsLines(Where w, URI reqURI,
       
   394                                     HttpResponse<Stream<String>> resp,
       
   395                                     Thrower thrower,
       
   396                                     Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> promises) {
       
   397         Function<HttpResponse<Stream<String>>, List<String>> extractor =
       
   398                 (r) -> r.body().collect(Collectors.toList());
       
   399         return check(w, reqURI, resp, thrower, promises, extractor);
       
   400     }
       
   401 
       
   402     final List<String> checkAsInputStream(Where w, URI reqURI,
       
   403                                           HttpResponse<InputStream> resp,
       
   404                                           Thrower thrower,
       
   405                                           Map<HttpRequest, CompletableFuture<HttpResponse<InputStream>>> promises)
       
   406     {
       
   407         Function<HttpResponse<InputStream>, List<String>> extractor = (r) -> {
       
   408             List<String> result;
       
   409             try (InputStream is = r.body()) {
       
   410                 result = new BufferedReader(new InputStreamReader(is))
       
   411                         .lines().collect(Collectors.toList());
       
   412             } catch (Throwable t) {
       
   413                 throw new CompletionException(t);
       
   414             }
       
   415             return result;
       
   416         };
       
   417         return check(w, reqURI, resp, thrower, promises, extractor);
       
   418     }
       
   419 
       
   420     private final <T> List<String> check(Where w, URI reqURI,
       
   421                                  HttpResponse<T> resp,
       
   422                                  Thrower thrower,
       
   423                                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises,
       
   424                                  Function<HttpResponse<T>, List<String>> extractor)
       
   425     {
       
   426         List<String> result = extractor.apply(resp);
       
   427         for (HttpRequest req : promises.keySet()) {
       
   428             switch (w) {
       
   429                 case BEFORE_ACCEPTING:
       
   430                     throw new RuntimeException("No push promise should have been received" +
       
   431                             " for " + reqURI + " in " + w + ": got " + promises.keySet());
       
   432                 default:
       
   433                     break;
       
   434             }
       
   435             HttpResponse<T> presp;
       
   436             try {
       
   437                 presp = promises.get(req).join();
       
   438             } catch (Error | Exception x) {
       
   439                 Throwable cause = findCause(x, thrower);
       
   440                 if (cause != null) {
       
   441                     out.println(now() + "Got expected exception in "
       
   442                             + w + ": " + cause);
       
   443                     continue;
       
   444                 }
       
   445                 throw x;
       
   446             }
       
   447             switch (w) {
       
   448                 case BEFORE_ACCEPTING:
       
   449                 case AFTER_ACCEPTING:
       
   450                 case BODY_HANDLER:
       
   451                 case GET_BODY:
       
   452                 case BODY_CF:
       
   453                     return shouldHaveThrown(w, presp, thrower);
       
   454                 default:
       
   455                     break;
       
   456             }
       
   457             List<String> presult = null;
       
   458             try {
       
   459                 presult = extractor.apply(presp);
       
   460             } catch (Error | Exception x) {
       
   461                 Throwable cause = findCause(x, thrower);
       
   462                 if (cause != null) {
       
   463                     out.println(now() + "Got expected exception for "
       
   464                             + req + " in " + w + ": " + cause);
       
   465                     continue;
       
   466                 }
       
   467                 throw x;
       
   468             }
       
   469             throw new RuntimeException("Expected exception not thrown for "
       
   470                     + req + " in " + w);
       
   471         }
       
   472         final int expectedCount;
       
   473         switch (w) {
       
   474             case BEFORE_ACCEPTING:
       
   475                 expectedCount = 0;
       
   476                 break;
       
   477             default:
       
   478                 expectedCount = 3;
       
   479         }
       
   480         assertEquals(promises.size(), expectedCount,
       
   481                 "bad promise count for " + reqURI + " with " + w);
       
   482         assertEquals(result, List.of(reqURI.toASCIIString()));
       
   483         return result;
       
   484     }
       
   485 
       
   486     private static Throwable findCause(Throwable x,
       
   487                                        Predicate<Throwable> filter) {
       
   488         while (x != null && !filter.test(x)) x = x.getCause();
       
   489         return x;
       
   490     }
       
   491 
       
   492     static final class UncheckedCustomExceptionThrower implements Thrower {
       
   493         @Override
       
   494         public void accept(Where where) {
       
   495             out.println(now() + "Throwing in " + where);
       
   496             throw new UncheckedCustomException(where.name());
       
   497         }
       
   498 
       
   499         @Override
       
   500         public boolean test(Throwable throwable) {
       
   501             return UncheckedCustomException.class.isInstance(throwable);
       
   502         }
       
   503 
       
   504         @Override
       
   505         public String toString() {
       
   506             return "UncheckedCustomExceptionThrower";
       
   507         }
       
   508     }
       
   509 
       
   510     static final class UncheckedIOExceptionThrower implements Thrower {
       
   511         @Override
       
   512         public void accept(Where where) {
       
   513             out.println(now() + "Throwing in " + where);
       
   514             throw new UncheckedIOException(new CustomIOException(where.name()));
       
   515         }
       
   516 
       
   517         @Override
       
   518         public boolean test(Throwable throwable) {
       
   519             return UncheckedIOException.class.isInstance(throwable)
       
   520                     && CustomIOException.class.isInstance(throwable.getCause());
       
   521         }
       
   522 
       
   523         @Override
       
   524         public String toString() {
       
   525             return "UncheckedIOExceptionThrower";
       
   526         }
       
   527     }
       
   528 
       
   529     static final class UncheckedCustomException extends RuntimeException {
       
   530         UncheckedCustomException(String message) {
       
   531             super(message);
       
   532         }
       
   533         UncheckedCustomException(String message, Throwable cause) {
       
   534             super(message, cause);
       
   535         }
       
   536     }
       
   537 
       
   538     static final class CustomIOException extends IOException {
       
   539         CustomIOException(String message) {
       
   540             super(message);
       
   541         }
       
   542         CustomIOException(String message, Throwable cause) {
       
   543             super(message, cause);
       
   544         }
       
   545     }
       
   546 
       
   547     static final class ThrowingPromiseHandler<T> implements PushPromiseHandler<T> {
       
   548         final Consumer<Where> throwing;
       
   549         final PushPromiseHandler<T> pushHandler;
       
   550         ThrowingPromiseHandler(Consumer<Where> throwing, PushPromiseHandler<T> pushHandler) {
       
   551             this.throwing = throwing;
       
   552             this.pushHandler = pushHandler;
       
   553         }
       
   554 
       
   555         @Override
       
   556         public void applyPushPromise(HttpRequest initiatingRequest,
       
   557                                      HttpRequest pushPromiseRequest,
       
   558                                      Function<BodyHandler<T>,
       
   559                                              CompletableFuture<HttpResponse<T>>> acceptor) {
       
   560             throwing.accept(Where.BEFORE_ACCEPTING);
       
   561             pushHandler.applyPushPromise(initiatingRequest, pushPromiseRequest, acceptor);
       
   562             throwing.accept(Where.AFTER_ACCEPTING);
       
   563         }
       
   564     }
       
   565 
       
   566     static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
       
   567         final Consumer<Where> throwing;
       
   568         final BodyHandler<T> bodyHandler;
       
   569         ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
       
   570             this.throwing = throwing;
       
   571             this.bodyHandler = bodyHandler;
       
   572         }
       
   573         @Override
       
   574         public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
       
   575             throwing.accept(Where.BODY_HANDLER);
       
   576             BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);
       
   577             return new ThrowingBodySubscriber(throwing, subscriber);
       
   578         }
       
   579     }
       
   580 
       
   581     static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
       
   582         private final BodySubscriber<T> subscriber;
       
   583         volatile boolean onSubscribeCalled;
       
   584         final Consumer<Where> throwing;
       
   585         ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {
       
   586             this.throwing = throwing;
       
   587             this.subscriber = subscriber;
       
   588         }
       
   589 
       
   590         @Override
       
   591         public void onSubscribe(Flow.Subscription subscription) {
       
   592             //out.println("onSubscribe ");
       
   593             onSubscribeCalled = true;
       
   594             throwing.accept(Where.ON_SUBSCRIBE);
       
   595             subscriber.onSubscribe(subscription);
       
   596         }
       
   597 
       
   598         @Override
       
   599         public void onNext(List<ByteBuffer> item) {
       
   600            // out.println("onNext " + item);
       
   601             assertTrue(onSubscribeCalled);
       
   602             throwing.accept(Where.ON_NEXT);
       
   603             subscriber.onNext(item);
       
   604         }
       
   605 
       
   606         @Override
       
   607         public void onError(Throwable throwable) {
       
   608             //out.println("onError");
       
   609             assertTrue(onSubscribeCalled);
       
   610             throwing.accept(Where.ON_ERROR);
       
   611             subscriber.onError(throwable);
       
   612         }
       
   613 
       
   614         @Override
       
   615         public void onComplete() {
       
   616             //out.println("onComplete");
       
   617             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
       
   618             throwing.accept(Where.ON_COMPLETE);
       
   619             subscriber.onComplete();
       
   620         }
       
   621 
       
   622         @Override
       
   623         public CompletionStage<T> getBody() {
       
   624             throwing.accept(Where.GET_BODY);
       
   625             try {
       
   626                 throwing.accept(Where.BODY_CF);
       
   627             } catch (Throwable t) {
       
   628                 return CompletableFuture.failedFuture(t);
       
   629             }
       
   630             return subscriber.getBody();
       
   631         }
       
   632     }
       
   633 
       
   634 
       
   635     @BeforeTest
       
   636     public void setup() throws Exception {
       
   637         sslContext = new SimpleSSLContext().get();
       
   638         if (sslContext == null)
       
   639             throw new AssertionError("Unexpected null sslContext");
       
   640 
       
   641         // HTTP/2
       
   642         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   643         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
       
   644 
       
   645         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
       
   646         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   647         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   648         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";
       
   649         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";
       
   650 
       
   651         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
       
   652         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   653         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   654         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";
       
   655         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";
       
   656 
       
   657         serverCount.addAndGet(2);
       
   658         http2TestServer.start();
       
   659         https2TestServer.start();
       
   660     }
       
   661 
       
   662     @AfterTest
       
   663     public void teardown() throws Exception {
       
   664         String sharedClientName =
       
   665                 sharedClient == null ? null : sharedClient.toString();
       
   666         sharedClient = null;
       
   667         Thread.sleep(100);
       
   668         AssertionError fail = TRACKER.check(500);
       
   669         try {
       
   670             http2TestServer.stop();
       
   671             https2TestServer.stop();
       
   672         } finally {
       
   673             if (fail != null) {
       
   674                 if (sharedClientName != null) {
       
   675                     System.err.println("Shared client name is: " + sharedClientName);
       
   676                 }
       
   677                 throw fail;
       
   678             }
       
   679         }
       
   680     }
       
   681 
       
   682     private static void pushPromiseFor(HttpTestExchange t, URI requestURI, String pushPath, boolean fixed)
       
   683             throws IOException
       
   684     {
       
   685         try {
       
   686             URI promise = new URI(requestURI.getScheme(),
       
   687                     requestURI.getAuthority(),
       
   688                     pushPath, null, null);
       
   689             byte[] promiseBytes = promise.toASCIIString().getBytes(UTF_8);
       
   690             out.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   691             err.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   692             HttpTestHeaders headers =  HttpTestHeaders.of(new HttpHeadersImpl());
       
   693             if (fixed) {
       
   694                 headers.addHeader("Content-length", String.valueOf(promiseBytes.length));
       
   695             }
       
   696             t.serverPush(promise, headers, promiseBytes);
       
   697         } catch (URISyntaxException x) {
       
   698             throw new IOException(x.getMessage(), x);
       
   699         }
       
   700     }
       
   701 
       
   702     static class HTTP_FixedLengthHandler implements HttpTestHandler {
       
   703         @Override
       
   704         public void handle(HttpTestExchange t) throws IOException {
       
   705             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
       
   706             try (InputStream is = t.getRequestBody()) {
       
   707                 is.readAllBytes();
       
   708             }
       
   709             URI requestURI = t.getRequestURI();
       
   710             for (int i = 1; i<2; i++) {
       
   711                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   712                 pushPromiseFor(t, requestURI, path, true);
       
   713             }
       
   714             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   715             t.sendResponseHeaders(200, resp.length);  //fixed content length
       
   716             try (OutputStream os = t.getResponseBody()) {
       
   717                 int bytes = resp.length/3;
       
   718                 for (int i = 0; i<2; i++) {
       
   719                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   720                     os.write(resp, i * bytes, bytes);
       
   721                     os.flush();
       
   722                     pushPromiseFor(t, requestURI, path, true);
       
   723                 }
       
   724                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   725             }
       
   726         }
       
   727 
       
   728     }
       
   729 
       
   730     static class HTTP_ChunkedHandler implements HttpTestHandler {
       
   731         @Override
       
   732         public void handle(HttpTestExchange t) throws IOException {
       
   733             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
       
   734             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   735             try (InputStream is = t.getRequestBody()) {
       
   736                 is.readAllBytes();
       
   737             }
       
   738             URI requestURI = t.getRequestURI();
       
   739             for (int i = 1; i<2; i++) {
       
   740                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   741                 pushPromiseFor(t, requestURI, path, false);
       
   742             }
       
   743             t.sendResponseHeaders(200, -1); // chunked/variable
       
   744             try (OutputStream os = t.getResponseBody()) {
       
   745                 int bytes = resp.length/3;
       
   746                 for (int i = 0; i<2; i++) {
       
   747                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   748                     os.write(resp, i * bytes, bytes);
       
   749                     os.flush();
       
   750                     pushPromiseFor(t, requestURI, path, false);
       
   751                 }
       
   752                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   753             }
       
   754         }
       
   755     }
       
   756 
       
   757 }