test/jdk/java/net/httpclient/AbstractThrowingPushPromises.java
branch http-client-branch
changeset 56780 6e0e57fe33c7
parent 56771 73a6534bce94
child 56795 03ece2518428
equal deleted inserted replaced
56779:bbe903489568 56780:6e0e57fe33c7
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @summary Tests what happens when push promise handlers and their
       
    27  *          response body handlers and subscribers throw unexpected exceptions.
       
    28  * @library /lib/testlibrary http2/server
       
    29  * @build jdk.testlibrary.SimpleSSLContext HttpServerAdapters
       
    30   *       ReferenceTracker AbstractThrowingPushPromises
       
    31  * @modules java.base/sun.net.www.http
       
    32  *          java.net.http/jdk.internal.net.http.common
       
    33  *          java.net.http/jdk.internal.net.http.frame
       
    34  *          java.net.http/jdk.internal.net.http.hpack
       
    35  * @run testng/othervm -Djdk.internal.httpclient.debug=true AbstractThrowingPushPromises
       
    36  */
       
    37 
       
    38 import jdk.testlibrary.SimpleSSLContext;
       
    39 import org.testng.annotations.AfterTest;
       
    40 import org.testng.annotations.AfterClass;
       
    41 import org.testng.annotations.BeforeTest;
       
    42 import org.testng.annotations.DataProvider;
       
    43 import org.testng.annotations.Test;
       
    44 
       
    45 import javax.net.ssl.SSLContext;
       
    46 import java.io.BufferedReader;
       
    47 import java.io.IOException;
       
    48 import java.io.InputStream;
       
    49 import java.io.InputStreamReader;
       
    50 import java.io.OutputStream;
       
    51 import java.io.UncheckedIOException;
       
    52 import java.net.URI;
       
    53 import java.net.URISyntaxException;
       
    54 import java.net.http.HttpClient;
       
    55 import java.net.http.HttpHeaders;
       
    56 import java.net.http.HttpRequest;
       
    57 import java.net.http.HttpResponse;
       
    58 import java.net.http.HttpResponse.BodyHandler;
       
    59 import java.net.http.HttpResponse.BodyHandlers;
       
    60 import java.net.http.HttpResponse.BodySubscriber;
       
    61 import java.net.http.HttpResponse.PushPromiseHandler;
       
    62 import java.nio.ByteBuffer;
       
    63 import java.nio.charset.StandardCharsets;
       
    64 import java.util.List;
       
    65 import java.util.Map;
       
    66 import java.util.concurrent.CompletableFuture;
       
    67 import java.util.concurrent.CompletionException;
       
    68 import java.util.concurrent.CompletionStage;
       
    69 import java.util.concurrent.ConcurrentHashMap;
       
    70 import java.util.concurrent.ConcurrentMap;
       
    71 import java.util.concurrent.Executor;
       
    72 import java.util.concurrent.Executors;
       
    73 import java.util.concurrent.Flow;
       
    74 import java.util.concurrent.atomic.AtomicLong;
       
    75 import java.util.function.BiPredicate;
       
    76 import java.util.function.Consumer;
       
    77 import java.util.function.Function;
       
    78 import java.util.function.Predicate;
       
    79 import java.util.function.Supplier;
       
    80 import java.util.stream.Collectors;
       
    81 import java.util.stream.Stream;
       
    82 
       
    83 import static java.lang.System.out;
       
    84 import static java.lang.System.err;
       
    85 import static java.lang.String.format;
       
    86 import static java.nio.charset.StandardCharsets.UTF_8;
       
    87 import static org.testng.Assert.assertEquals;
       
    88 import static org.testng.Assert.assertTrue;
       
    89 
       
    90 public abstract class AbstractThrowingPushPromises implements HttpServerAdapters {
       
    91 
       
    92     SSLContext sslContext;
       
    93     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
       
    94     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
       
    95     String http2URI_fixed;
       
    96     String http2URI_chunk;
       
    97     String https2URI_fixed;
       
    98     String https2URI_chunk;
       
    99 
       
   100     static final int ITERATION_COUNT = 1;
       
   101     // a shared executor helps reduce the amount of threads created by the test
       
   102     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
       
   103     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
       
   104     static volatile boolean tasksFailed;
       
   105     static final AtomicLong serverCount = new AtomicLong();
       
   106     static final AtomicLong clientCount = new AtomicLong();
       
   107     static final long start = System.nanoTime();
       
   108     public static String now() {
       
   109         long now = System.nanoTime() - start;
       
   110         long secs = now / 1000_000_000;
       
   111         long mill = (now % 1000_000_000) / 1000_000;
       
   112         long nan = now % 1000_000;
       
   113         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
       
   114     }
       
   115 
       
   116     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
       
   117     private volatile HttpClient sharedClient;
       
   118 
       
   119     static class TestExecutor implements Executor {
       
   120         final AtomicLong tasks = new AtomicLong();
       
   121         Executor executor;
       
   122         TestExecutor(Executor executor) {
       
   123             this.executor = executor;
       
   124         }
       
   125 
       
   126         @Override
       
   127         public void execute(Runnable command) {
       
   128             long id = tasks.incrementAndGet();
       
   129             executor.execute(() -> {
       
   130                 try {
       
   131                     command.run();
       
   132                 } catch (Throwable t) {
       
   133                     tasksFailed = true;
       
   134                     out.printf(now() + "Task %s failed: %s%n", id, t);
       
   135                     err.printf(now() + "Task %s failed: %s%n", id, t);
       
   136                     FAILURES.putIfAbsent("Task " + id, t);
       
   137                     throw t;
       
   138                 }
       
   139             });
       
   140         }
       
   141     }
       
   142 
       
   143     @AfterClass
       
   144     static final void printFailedTests() {
       
   145         out.println("\n=========================");
       
   146         try {
       
   147             out.printf("%n%sCreated %d servers and %d clients%n",
       
   148                     now(), serverCount.get(), clientCount.get());
       
   149             if (FAILURES.isEmpty()) return;
       
   150             out.println("Failed tests: ");
       
   151             FAILURES.entrySet().forEach((e) -> {
       
   152                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
       
   153                 e.getValue().printStackTrace(out);
       
   154                 e.getValue().printStackTrace();
       
   155             });
       
   156             if (tasksFailed) {
       
   157                 out.println("WARNING: Some tasks failed");
       
   158             }
       
   159         } finally {
       
   160             out.println("\n=========================\n");
       
   161         }
       
   162     }
       
   163 
       
   164     private String[] uris() {
       
   165         return new String[] {
       
   166                 http2URI_fixed,
       
   167                 http2URI_chunk,
       
   168                 https2URI_fixed,
       
   169                 https2URI_chunk,
       
   170         };
       
   171     }
       
   172 
       
   173     @DataProvider(name = "sanity")
       
   174     public Object[][] sanity() {
       
   175         String[] uris = uris();
       
   176         Object[][] result = new Object[uris.length * 2][];
       
   177 
       
   178         int i = 0;
       
   179         for (boolean sameClient : List.of(false, true)) {
       
   180             for (String uri: uris()) {
       
   181                 result[i++] = new Object[] {uri, sameClient};
       
   182             }
       
   183         }
       
   184         assert i == uris.length * 2;
       
   185         return result;
       
   186     }
       
   187 
       
   188     enum Where {
       
   189         BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF,
       
   190         BEFORE_ACCEPTING, AFTER_ACCEPTING;
       
   191         public Consumer<Where> select(Consumer<Where> consumer) {
       
   192             return new Consumer<Where>() {
       
   193                 @Override
       
   194                 public void accept(Where where) {
       
   195                     if (Where.this == where) {
       
   196                         consumer.accept(where);
       
   197                     }
       
   198                 }
       
   199             };
       
   200         }
       
   201     }
       
   202 
       
   203     private Object[][] variants(List<Thrower> throwers) {
       
   204         String[] uris = uris();
       
   205         Object[][] result = new Object[uris.length * 2 * throwers.size()][];
       
   206         int i = 0;
       
   207         for (Thrower thrower : throwers) {
       
   208             for (boolean sameClient : List.of(false, true)) {
       
   209                 for (String uri : uris()) {
       
   210                     result[i++] = new Object[]{uri, sameClient, thrower};
       
   211                 }
       
   212             }
       
   213         }
       
   214         assert i == uris.length * 2 * throwers.size();
       
   215         return result;
       
   216     }
       
   217 
       
   218     @DataProvider(name = "ioVariants")
       
   219     public Object[][] ioVariants() {
       
   220         return variants(List.of(
       
   221                 new UncheckedIOExceptionThrower()));
       
   222     }
       
   223 
       
   224     @DataProvider(name = "customVariants")
       
   225     public Object[][] customVariants() {
       
   226         return variants(List.of(
       
   227                 new UncheckedCustomExceptionThrower()));
       
   228     }
       
   229 
       
   230     private HttpClient makeNewClient() {
       
   231         clientCount.incrementAndGet();
       
   232         return TRACKER.track(HttpClient.newBuilder()
       
   233                 .proxy(HttpClient.Builder.NO_PROXY)
       
   234                 .executor(executor)
       
   235                 .sslContext(sslContext)
       
   236                 .build());
       
   237     }
       
   238 
       
   239     HttpClient newHttpClient(boolean share) {
       
   240         if (!share) return makeNewClient();
       
   241         HttpClient shared = sharedClient;
       
   242         if (shared != null) return shared;
       
   243         synchronized (this) {
       
   244             shared = sharedClient;
       
   245             if (shared == null) {
       
   246                 shared = sharedClient = makeNewClient();
       
   247             }
       
   248             return shared;
       
   249         }
       
   250     }
       
   251 
       
   252     // @Test(dataProvider = "sanity")
       
   253     protected void testSanityImpl(String uri, boolean sameClient)
       
   254             throws Exception {
       
   255         HttpClient client = null;
       
   256         out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
       
   257         for (int i=0; i< ITERATION_COUNT; i++) {
       
   258             if (!sameClient || client == null)
       
   259                 client = newHttpClient(sameClient);
       
   260 
       
   261             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   262                     .build();
       
   263             BodyHandler<Stream<String>> handler =
       
   264                     new ThrowingBodyHandler((w) -> {},
       
   265                                             BodyHandlers.ofLines());
       
   266             Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> pushPromises =
       
   267                     new ConcurrentHashMap<>();
       
   268             PushPromiseHandler<Stream<String>> pushHandler = new PushPromiseHandler<>() {
       
   269                 @Override
       
   270                 public void applyPushPromise(HttpRequest initiatingRequest,
       
   271                                              HttpRequest pushPromiseRequest,
       
   272                                              Function<BodyHandler<Stream<String>>,
       
   273                                                      CompletableFuture<HttpResponse<Stream<String>>>>
       
   274                                                      acceptor) {
       
   275                     pushPromises.putIfAbsent(pushPromiseRequest, acceptor.apply(handler));
       
   276                 }
       
   277             };
       
   278             HttpResponse<Stream<String>> response =
       
   279                     client.sendAsync(req, BodyHandlers.ofLines(), pushHandler).get();
       
   280             String body = response.body().collect(Collectors.joining("|"));
       
   281             assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
       
   282             for (HttpRequest promised : pushPromises.keySet()) {
       
   283                 out.printf("%s Received promise: %s%n\tresponse: %s%n",
       
   284                         now(), promised, pushPromises.get(promised).get());
       
   285                 String promisedBody = pushPromises.get(promised).get().body()
       
   286                         .collect(Collectors.joining("|"));
       
   287                 assertEquals(promisedBody, promised.uri().toASCIIString());
       
   288             }
       
   289             assertEquals(3, pushPromises.size());
       
   290         }
       
   291     }
       
   292 
       
   293     // @Test(dataProvider = "variants")
       
   294     protected void testThrowingAsStringImpl(String uri,
       
   295                                      boolean sameClient,
       
   296                                      Thrower thrower)
       
   297             throws Exception
       
   298     {
       
   299         String test = format("testThrowingAsString(%s, %b, %s)",
       
   300                              uri, sameClient, thrower);
       
   301         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
       
   302                 this::checkAsString, thrower);
       
   303     }
       
   304 
       
   305     //@Test(dataProvider = "variants")
       
   306     protected void testThrowingAsLinesImpl(String uri,
       
   307                                     boolean sameClient,
       
   308                                     Thrower thrower)
       
   309             throws Exception
       
   310     {
       
   311         String test =  format("testThrowingAsLines(%s, %b, %s)",
       
   312                 uri, sameClient, thrower);
       
   313         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
       
   314                 this::checkAsLines, thrower);
       
   315     }
       
   316 
       
   317     //@Test(dataProvider = "variants")
       
   318     protected void testThrowingAsInputStreamImpl(String uri,
       
   319                                           boolean sameClient,
       
   320                                           Thrower thrower)
       
   321             throws Exception
       
   322     {
       
   323         String test = format("testThrowingAsInputStream(%s, %b, %s)",
       
   324                 uri, sameClient, thrower);
       
   325         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   326                 this::checkAsInputStream,  thrower);
       
   327     }
       
   328 
       
   329     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
       
   330                                     Supplier<BodyHandler<T>> handlers,
       
   331                                     Finisher finisher, Thrower thrower)
       
   332             throws Exception
       
   333     {
       
   334         out.printf("%n%s%s%n", now(), name);
       
   335         try {
       
   336             testThrowing(uri, sameClient, handlers, finisher, thrower);
       
   337         } catch (Error | Exception x) {
       
   338             FAILURES.putIfAbsent(name, x);
       
   339             throw x;
       
   340         }
       
   341     }
       
   342 
       
   343     private <T,U> void testThrowing(String uri, boolean sameClient,
       
   344                                     Supplier<BodyHandler<T>> handlers,
       
   345                                     Finisher finisher, Thrower thrower)
       
   346             throws Exception
       
   347     {
       
   348         HttpClient client = null;
       
   349         for (Where where : Where.values()) {
       
   350             if (where == Where.ON_ERROR) continue;
       
   351             if (!sameClient || client == null)
       
   352                 client = newHttpClient(sameClient);
       
   353 
       
   354             HttpRequest req = HttpRequest.
       
   355                     newBuilder(URI.create(uri))
       
   356                     .build();
       
   357             ConcurrentMap<HttpRequest, CompletableFuture<HttpResponse<T>>> promiseMap =
       
   358                     new ConcurrentHashMap<>();
       
   359             Supplier<BodyHandler<T>> throwing = () ->
       
   360                     new ThrowingBodyHandler(where.select(thrower), handlers.get());
       
   361             PushPromiseHandler<T> pushHandler = new ThrowingPromiseHandler<>(
       
   362                     where.select(thrower),
       
   363                     PushPromiseHandler.of((r) -> throwing.get(), promiseMap));
       
   364             out.println("try throwing in " + where);
       
   365             HttpResponse<T> response = null;
       
   366             try {
       
   367                 response = client.sendAsync(req, handlers.get(), pushHandler).join();
       
   368             } catch (Error | Exception x) {
       
   369                 throw x;
       
   370             }
       
   371             if (response != null) {
       
   372                 finisher.finish(where, req.uri(), response, thrower, promiseMap);
       
   373             }
       
   374         }
       
   375     }
       
   376 
       
   377     interface Thrower extends Consumer<Where>, Predicate<Throwable> {
       
   378 
       
   379     }
       
   380 
       
   381     interface Finisher<T,U> {
       
   382         U finish(Where w, URI requestURI, HttpResponse<T> resp, Thrower thrower,
       
   383                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises);
       
   384     }
       
   385 
       
   386     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
       
   387         String msg = "Expected exception not thrown in " + w
       
   388                 + "\n\tReceived: " + resp
       
   389                 + "\n\tWith body: " + resp.body();
       
   390         System.out.println(msg);
       
   391         throw new RuntimeException(msg);
       
   392     }
       
   393 
       
   394     final List<String> checkAsString(Where w, URI reqURI,
       
   395                                     HttpResponse<String> resp,
       
   396                                     Thrower thrower,
       
   397                                     Map<HttpRequest, CompletableFuture<HttpResponse<String>>> promises) {
       
   398         Function<HttpResponse<String>, List<String>> extractor =
       
   399                 (r) -> List.of(r.body());
       
   400         return check(w, reqURI, resp, thrower, promises, extractor);
       
   401     }
       
   402 
       
   403     final List<String> checkAsLines(Where w, URI reqURI,
       
   404                                     HttpResponse<Stream<String>> resp,
       
   405                                     Thrower thrower,
       
   406                                     Map<HttpRequest, CompletableFuture<HttpResponse<Stream<String>>>> promises) {
       
   407         Function<HttpResponse<Stream<String>>, List<String>> extractor =
       
   408                 (r) -> r.body().collect(Collectors.toList());
       
   409         return check(w, reqURI, resp, thrower, promises, extractor);
       
   410     }
       
   411 
       
   412     final List<String> checkAsInputStream(Where w, URI reqURI,
       
   413                                           HttpResponse<InputStream> resp,
       
   414                                           Thrower thrower,
       
   415                                           Map<HttpRequest, CompletableFuture<HttpResponse<InputStream>>> promises)
       
   416     {
       
   417         Function<HttpResponse<InputStream>, List<String>> extractor = (r) -> {
       
   418             List<String> result;
       
   419             try (InputStream is = r.body()) {
       
   420                 result = new BufferedReader(new InputStreamReader(is))
       
   421                         .lines().collect(Collectors.toList());
       
   422             } catch (Throwable t) {
       
   423                 throw new CompletionException(t);
       
   424             }
       
   425             return result;
       
   426         };
       
   427         return check(w, reqURI, resp, thrower, promises, extractor);
       
   428     }
       
   429 
       
   430     private final <T> List<String> check(Where w, URI reqURI,
       
   431                                  HttpResponse<T> resp,
       
   432                                  Thrower thrower,
       
   433                                  Map<HttpRequest, CompletableFuture<HttpResponse<T>>> promises,
       
   434                                  Function<HttpResponse<T>, List<String>> extractor)
       
   435     {
       
   436         List<String> result = extractor.apply(resp);
       
   437         for (HttpRequest req : promises.keySet()) {
       
   438             switch (w) {
       
   439                 case BEFORE_ACCEPTING:
       
   440                     throw new RuntimeException("No push promise should have been received" +
       
   441                             " for " + reqURI + " in " + w + ": got " + promises.keySet());
       
   442                 default:
       
   443                     break;
       
   444             }
       
   445             HttpResponse<T> presp;
       
   446             try {
       
   447                 presp = promises.get(req).join();
       
   448             } catch (Error | Exception x) {
       
   449                 Throwable cause = findCause(x, thrower);
       
   450                 if (cause != null) {
       
   451                     out.println(now() + "Got expected exception in "
       
   452                             + w + ": " + cause);
       
   453                     continue;
       
   454                 }
       
   455                 throw x;
       
   456             }
       
   457             switch (w) {
       
   458                 case BEFORE_ACCEPTING:
       
   459                 case AFTER_ACCEPTING:
       
   460                 case BODY_HANDLER:
       
   461                 case GET_BODY:
       
   462                 case BODY_CF:
       
   463                     return shouldHaveThrown(w, presp, thrower);
       
   464                 default:
       
   465                     break;
       
   466             }
       
   467             List<String> presult = null;
       
   468             try {
       
   469                 presult = extractor.apply(presp);
       
   470             } catch (Error | Exception x) {
       
   471                 Throwable cause = findCause(x, thrower);
       
   472                 if (cause != null) {
       
   473                     out.println(now() + "Got expected exception for "
       
   474                             + req + " in " + w + ": " + cause);
       
   475                     continue;
       
   476                 }
       
   477                 throw x;
       
   478             }
       
   479             throw new RuntimeException("Expected exception not thrown for "
       
   480                     + req + " in " + w);
       
   481         }
       
   482         final int expectedCount;
       
   483         switch (w) {
       
   484             case BEFORE_ACCEPTING:
       
   485                 expectedCount = 0;
       
   486                 break;
       
   487             default:
       
   488                 expectedCount = 3;
       
   489         }
       
   490         assertEquals(promises.size(), expectedCount,
       
   491                 "bad promise count for " + reqURI + " with " + w);
       
   492         assertEquals(result, List.of(reqURI.toASCIIString()));
       
   493         return result;
       
   494     }
       
   495 
       
   496     private static Throwable findCause(Throwable x,
       
   497                                        Predicate<Throwable> filter) {
       
   498         while (x != null && !filter.test(x)) x = x.getCause();
       
   499         return x;
       
   500     }
       
   501 
       
   502     static final class UncheckedCustomExceptionThrower implements Thrower {
       
   503         @Override
       
   504         public void accept(Where where) {
       
   505             out.println(now() + "Throwing in " + where);
       
   506             throw new UncheckedCustomException(where.name());
       
   507         }
       
   508 
       
   509         @Override
       
   510         public boolean test(Throwable throwable) {
       
   511             return UncheckedCustomException.class.isInstance(throwable);
       
   512         }
       
   513 
       
   514         @Override
       
   515         public String toString() {
       
   516             return "UncheckedCustomExceptionThrower";
       
   517         }
       
   518     }
       
   519 
       
   520     static final class UncheckedIOExceptionThrower implements Thrower {
       
   521         @Override
       
   522         public void accept(Where where) {
       
   523             out.println(now() + "Throwing in " + where);
       
   524             throw new UncheckedIOException(new CustomIOException(where.name()));
       
   525         }
       
   526 
       
   527         @Override
       
   528         public boolean test(Throwable throwable) {
       
   529             return UncheckedIOException.class.isInstance(throwable)
       
   530                     && CustomIOException.class.isInstance(throwable.getCause());
       
   531         }
       
   532 
       
   533         @Override
       
   534         public String toString() {
       
   535             return "UncheckedIOExceptionThrower";
       
   536         }
       
   537     }
       
   538 
       
   539     static final class UncheckedCustomException extends RuntimeException {
       
   540         UncheckedCustomException(String message) {
       
   541             super(message);
       
   542         }
       
   543         UncheckedCustomException(String message, Throwable cause) {
       
   544             super(message, cause);
       
   545         }
       
   546     }
       
   547 
       
   548     static final class CustomIOException extends IOException {
       
   549         CustomIOException(String message) {
       
   550             super(message);
       
   551         }
       
   552         CustomIOException(String message, Throwable cause) {
       
   553             super(message, cause);
       
   554         }
       
   555     }
       
   556 
       
   557     static final class ThrowingPromiseHandler<T> implements PushPromiseHandler<T> {
       
   558         final Consumer<Where> throwing;
       
   559         final PushPromiseHandler<T> pushHandler;
       
   560         ThrowingPromiseHandler(Consumer<Where> throwing, PushPromiseHandler<T> pushHandler) {
       
   561             this.throwing = throwing;
       
   562             this.pushHandler = pushHandler;
       
   563         }
       
   564 
       
   565         @Override
       
   566         public void applyPushPromise(HttpRequest initiatingRequest,
       
   567                                      HttpRequest pushPromiseRequest,
       
   568                                      Function<BodyHandler<T>,
       
   569                                              CompletableFuture<HttpResponse<T>>> acceptor) {
       
   570             throwing.accept(Where.BEFORE_ACCEPTING);
       
   571             pushHandler.applyPushPromise(initiatingRequest, pushPromiseRequest, acceptor);
       
   572             throwing.accept(Where.AFTER_ACCEPTING);
       
   573         }
       
   574     }
       
   575 
       
   576     static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
       
   577         final Consumer<Where> throwing;
       
   578         final BodyHandler<T> bodyHandler;
       
   579         ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
       
   580             this.throwing = throwing;
       
   581             this.bodyHandler = bodyHandler;
       
   582         }
       
   583         @Override
       
   584         public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
       
   585             throwing.accept(Where.BODY_HANDLER);
       
   586             BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);
       
   587             return new ThrowingBodySubscriber(throwing, subscriber);
       
   588         }
       
   589     }
       
   590 
       
   591     static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
       
   592         private final BodySubscriber<T> subscriber;
       
   593         volatile boolean onSubscribeCalled;
       
   594         final Consumer<Where> throwing;
       
   595         ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {
       
   596             this.throwing = throwing;
       
   597             this.subscriber = subscriber;
       
   598         }
       
   599 
       
   600         @Override
       
   601         public void onSubscribe(Flow.Subscription subscription) {
       
   602             //out.println("onSubscribe ");
       
   603             onSubscribeCalled = true;
       
   604             throwing.accept(Where.ON_SUBSCRIBE);
       
   605             subscriber.onSubscribe(subscription);
       
   606         }
       
   607 
       
   608         @Override
       
   609         public void onNext(List<ByteBuffer> item) {
       
   610            // out.println("onNext " + item);
       
   611             assertTrue(onSubscribeCalled);
       
   612             throwing.accept(Where.ON_NEXT);
       
   613             subscriber.onNext(item);
       
   614         }
       
   615 
       
   616         @Override
       
   617         public void onError(Throwable throwable) {
       
   618             //out.println("onError");
       
   619             assertTrue(onSubscribeCalled);
       
   620             throwing.accept(Where.ON_ERROR);
       
   621             subscriber.onError(throwable);
       
   622         }
       
   623 
       
   624         @Override
       
   625         public void onComplete() {
       
   626             //out.println("onComplete");
       
   627             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
       
   628             throwing.accept(Where.ON_COMPLETE);
       
   629             subscriber.onComplete();
       
   630         }
       
   631 
       
   632         @Override
       
   633         public CompletionStage<T> getBody() {
       
   634             throwing.accept(Where.GET_BODY);
       
   635             try {
       
   636                 throwing.accept(Where.BODY_CF);
       
   637             } catch (Throwable t) {
       
   638                 return CompletableFuture.failedFuture(t);
       
   639             }
       
   640             return subscriber.getBody();
       
   641         }
       
   642     }
       
   643 
       
   644 
       
   645     @BeforeTest
       
   646     public void setup() throws Exception {
       
   647         sslContext = new SimpleSSLContext().get();
       
   648         if (sslContext == null)
       
   649             throw new AssertionError("Unexpected null sslContext");
       
   650 
       
   651         // HTTP/2
       
   652         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   653         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
       
   654 
       
   655         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
       
   656         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   657         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   658         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";
       
   659         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";
       
   660 
       
   661         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
       
   662         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   663         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   664         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";
       
   665         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";
       
   666 
       
   667         serverCount.addAndGet(2);
       
   668         http2TestServer.start();
       
   669         https2TestServer.start();
       
   670     }
       
   671 
       
   672     @AfterTest
       
   673     public void teardown() throws Exception {
       
   674         String sharedClientName =
       
   675                 sharedClient == null ? null : sharedClient.toString();
       
   676         sharedClient = null;
       
   677         Thread.sleep(100);
       
   678         AssertionError fail = TRACKER.check(500);
       
   679         try {
       
   680             http2TestServer.stop();
       
   681             https2TestServer.stop();
       
   682         } finally {
       
   683             if (fail != null) {
       
   684                 if (sharedClientName != null) {
       
   685                     System.err.println("Shared client name is: " + sharedClientName);
       
   686                 }
       
   687                 throw fail;
       
   688             }
       
   689         }
       
   690     }
       
   691 
       
   692     static final BiPredicate<String,String> ACCEPT_ALL = (x, y) -> true;
       
   693 
       
   694     private static void pushPromiseFor(HttpTestExchange t,
       
   695                                        URI requestURI,
       
   696                                        String pushPath,
       
   697                                        boolean fixed)
       
   698         throws IOException
       
   699     {
       
   700         try {
       
   701             URI promise = new URI(requestURI.getScheme(),
       
   702                     requestURI.getAuthority(),
       
   703                     pushPath, null, null);
       
   704             byte[] promiseBytes = promise.toASCIIString().getBytes(UTF_8);
       
   705             out.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   706             err.printf("TestServer: %s Pushing promise: %s%n", now(), promise);
       
   707             HttpHeaders headers;
       
   708             if (fixed) {
       
   709                 String length = String.valueOf(promiseBytes.length);
       
   710                 headers = HttpHeaders.of(Map.of("Content-Length", List.of(length)),
       
   711                                          ACCEPT_ALL);
       
   712             } else {
       
   713                 headers = HttpHeaders.of(Map.of(), ACCEPT_ALL); // empty
       
   714             }
       
   715             t.serverPush(promise, headers, promiseBytes);
       
   716         } catch (URISyntaxException x) {
       
   717             throw new IOException(x.getMessage(), x);
       
   718         }
       
   719     }
       
   720 
       
   721     static class HTTP_FixedLengthHandler implements HttpTestHandler {
       
   722         @Override
       
   723         public void handle(HttpTestExchange t) throws IOException {
       
   724             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
       
   725             try (InputStream is = t.getRequestBody()) {
       
   726                 is.readAllBytes();
       
   727             }
       
   728             URI requestURI = t.getRequestURI();
       
   729             for (int i = 1; i<2; i++) {
       
   730                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   731                 pushPromiseFor(t, requestURI, path, true);
       
   732             }
       
   733             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   734             t.sendResponseHeaders(200, resp.length);  //fixed content length
       
   735             try (OutputStream os = t.getResponseBody()) {
       
   736                 int bytes = resp.length/3;
       
   737                 for (int i = 0; i<2; i++) {
       
   738                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   739                     os.write(resp, i * bytes, bytes);
       
   740                     os.flush();
       
   741                     pushPromiseFor(t, requestURI, path, true);
       
   742                 }
       
   743                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   744             }
       
   745         }
       
   746 
       
   747     }
       
   748 
       
   749     static class HTTP_ChunkedHandler implements HttpTestHandler {
       
   750         @Override
       
   751         public void handle(HttpTestExchange t) throws IOException {
       
   752             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
       
   753             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   754             try (InputStream is = t.getRequestBody()) {
       
   755                 is.readAllBytes();
       
   756             }
       
   757             URI requestURI = t.getRequestURI();
       
   758             for (int i = 1; i<2; i++) {
       
   759                 String path = requestURI.getPath() + "/before/promise-" + i;
       
   760                 pushPromiseFor(t, requestURI, path, false);
       
   761             }
       
   762             t.sendResponseHeaders(200, -1); // chunked/variable
       
   763             try (OutputStream os = t.getResponseBody()) {
       
   764                 int bytes = resp.length/3;
       
   765                 for (int i = 0; i<2; i++) {
       
   766                     String path = requestURI.getPath() + "/after/promise-" + (i + 2);
       
   767                     os.write(resp, i * bytes, bytes);
       
   768                     os.flush();
       
   769                     pushPromiseFor(t, requestURI, path, false);
       
   770                 }
       
   771                 os.write(resp, 2*bytes, resp.length - 2*bytes);
       
   772             }
       
   773         }
       
   774     }
       
   775 
       
   776 }