test/jdk/java/net/httpclient/DependentActionsTest.java
changeset 49765 ee6f7a61f3a5
child 50681 4254bed3c09d
child 56451 9585061fdb04
equal deleted inserted replaced
49707:f7fd051519ac 49765:ee6f7a61f3a5
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /*
       
    25  * @test
       
    26  * @summary Verify that dependent synchronous actions added before the CF
       
    27  *          completes are executed either asynchronously in an executor when the
       
    28  *          CF later completes, or in the user thread that joins.
       
    29  * @library /lib/testlibrary http2/server
       
    30  * @build jdk.testlibrary.SimpleSSLContext HttpServerAdapters ThrowingPublishers
       
    31  * @modules java.base/sun.net.www.http
       
    32  *          java.net.http/jdk.internal.net.http.common
       
    33  *          java.net.http/jdk.internal.net.http.frame
       
    34  *          java.net.http/jdk.internal.net.http.hpack
       
    35  * @run testng/othervm -Djdk.internal.httpclient.debug=true DependentActionsTest
       
    36  * @run testng/othervm/java.security.policy=dependent.policy
       
    37   *        -Djdk.internal.httpclient.debug=true DependentActionsTest
       
    38  */
       
    39 
       
    40 import java.io.BufferedReader;
       
    41 import java.io.InputStreamReader;
       
    42 import java.lang.StackWalker.StackFrame;
       
    43 import com.sun.net.httpserver.HttpServer;
       
    44 import com.sun.net.httpserver.HttpsConfigurator;
       
    45 import com.sun.net.httpserver.HttpsServer;
       
    46 import jdk.testlibrary.SimpleSSLContext;
       
    47 import org.testng.annotations.AfterTest;
       
    48 import org.testng.annotations.AfterClass;
       
    49 import org.testng.annotations.BeforeTest;
       
    50 import org.testng.annotations.DataProvider;
       
    51 import org.testng.annotations.Test;
       
    52 
       
    53 import javax.net.ssl.SSLContext;
       
    54 import java.io.IOException;
       
    55 import java.io.InputStream;
       
    56 import java.io.OutputStream;
       
    57 import java.net.InetAddress;
       
    58 import java.net.InetSocketAddress;
       
    59 import java.net.URI;
       
    60 import java.net.http.HttpClient;
       
    61 import java.net.http.HttpHeaders;
       
    62 import java.net.http.HttpRequest;
       
    63 import java.net.http.HttpResponse;
       
    64 import java.net.http.HttpResponse.BodyHandler;
       
    65 import java.net.http.HttpResponse.BodyHandlers;
       
    66 import java.net.http.HttpResponse.BodySubscriber;
       
    67 import java.nio.ByteBuffer;
       
    68 import java.nio.charset.StandardCharsets;
       
    69 import java.util.EnumSet;
       
    70 import java.util.List;
       
    71 import java.util.Optional;
       
    72 import java.util.concurrent.CompletableFuture;
       
    73 import java.util.concurrent.CompletionException;
       
    74 import java.util.concurrent.CompletionStage;
       
    75 import java.util.concurrent.ConcurrentHashMap;
       
    76 import java.util.concurrent.ConcurrentMap;
       
    77 import java.util.concurrent.Executor;
       
    78 import java.util.concurrent.Executors;
       
    79 import java.util.concurrent.Flow;
       
    80 import java.util.concurrent.Semaphore;
       
    81 import java.util.concurrent.atomic.AtomicBoolean;
       
    82 import java.util.concurrent.atomic.AtomicLong;
       
    83 import java.util.concurrent.atomic.AtomicReference;
       
    84 import java.util.function.Consumer;
       
    85 import java.util.function.Supplier;
       
    86 import java.util.stream.Collectors;
       
    87 import java.util.stream.Stream;
       
    88 
       
    89 import static java.lang.System.out;
       
    90 import static java.lang.String.format;
       
    91 import static org.testng.Assert.assertEquals;
       
    92 import static org.testng.Assert.assertTrue;
       
    93 
       
    94 public class DependentActionsTest implements HttpServerAdapters {
       
    95 
       
    96     SSLContext sslContext;
       
    97     HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
       
    98     HttpTestServer httpsTestServer;   // HTTPS/1.1
       
    99     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
       
   100     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
       
   101     String httpURI_fixed;
       
   102     String httpURI_chunk;
       
   103     String httpsURI_fixed;
       
   104     String httpsURI_chunk;
       
   105     String http2URI_fixed;
       
   106     String http2URI_chunk;
       
   107     String https2URI_fixed;
       
   108     String https2URI_chunk;
       
   109 
       
   110     static final StackWalker WALKER =
       
   111             StackWalker.getInstance(StackWalker.Option.RETAIN_CLASS_REFERENCE);
       
   112 
       
   113     static final int ITERATION_COUNT = 1;
       
   114     // a shared executor helps reduce the amount of threads created by the test
       
   115     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
       
   116     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
       
   117     static volatile boolean tasksFailed;
       
   118     static final AtomicLong serverCount = new AtomicLong();
       
   119     static final AtomicLong clientCount = new AtomicLong();
       
   120     static final long start = System.nanoTime();
       
   121     public static String now() {
       
   122         long now = System.nanoTime() - start;
       
   123         long secs = now / 1000_000_000;
       
   124         long mill = (now % 1000_000_000) / 1000_000;
       
   125         long nan = now % 1000_000;
       
   126         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
       
   127     }
       
   128 
       
   129     private volatile HttpClient sharedClient;
       
   130 
       
   131     static class TestExecutor implements Executor {
       
   132         final AtomicLong tasks = new AtomicLong();
       
   133         Executor executor;
       
   134         TestExecutor(Executor executor) {
       
   135             this.executor = executor;
       
   136         }
       
   137 
       
   138         @Override
       
   139         public void execute(Runnable command) {
       
   140             long id = tasks.incrementAndGet();
       
   141             executor.execute(() -> {
       
   142                 try {
       
   143                     command.run();
       
   144                 } catch (Throwable t) {
       
   145                     tasksFailed = true;
       
   146                     System.out.printf(now() + "Task %s failed: %s%n", id, t);
       
   147                     System.err.printf(now() + "Task %s failed: %s%n", id, t);
       
   148                     FAILURES.putIfAbsent("Task " + id, t);
       
   149                     throw t;
       
   150                 }
       
   151             });
       
   152         }
       
   153     }
       
   154 
       
   155     @AfterClass
       
   156     static final void printFailedTests() {
       
   157         out.println("\n=========================");
       
   158         try {
       
   159             out.printf("%n%sCreated %d servers and %d clients%n",
       
   160                     now(), serverCount.get(), clientCount.get());
       
   161             if (FAILURES.isEmpty()) return;
       
   162             out.println("Failed tests: ");
       
   163             FAILURES.entrySet().forEach((e) -> {
       
   164                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
       
   165                 e.getValue().printStackTrace(out);
       
   166                 e.getValue().printStackTrace();
       
   167             });
       
   168             if (tasksFailed) {
       
   169                 System.out.println("WARNING: Some tasks failed");
       
   170             }
       
   171         } finally {
       
   172             out.println("\n=========================\n");
       
   173         }
       
   174     }
       
   175 
       
   176     private String[] uris() {
       
   177         return new String[] {
       
   178                 httpURI_fixed,
       
   179                 httpURI_chunk,
       
   180                 httpsURI_fixed,
       
   181                 httpsURI_chunk,
       
   182                 http2URI_fixed,
       
   183                 http2URI_chunk,
       
   184                 https2URI_fixed,
       
   185                 https2URI_chunk,
       
   186         };
       
   187     }
       
   188 
       
   189     static final class SemaphoreStallerSupplier
       
   190             implements Supplier<SemaphoreStaller> {
       
   191         @Override
       
   192         public SemaphoreStaller get() {
       
   193             return new SemaphoreStaller();
       
   194         }
       
   195         @Override
       
   196         public String toString() {
       
   197             return "SemaphoreStaller";
       
   198         }
       
   199     }
       
   200 
       
   201     @DataProvider(name = "noStalls")
       
   202     public Object[][] noThrows() {
       
   203         String[] uris = uris();
       
   204         Object[][] result = new Object[uris.length * 2][];
       
   205         int i = 0;
       
   206         for (boolean sameClient : List.of(false, true)) {
       
   207             for (String uri: uris()) {
       
   208                 result[i++] = new Object[] {uri, sameClient};
       
   209             }
       
   210         }
       
   211         assert i == uris.length * 2;
       
   212         return result;
       
   213     }
       
   214 
       
   215     @DataProvider(name = "variants")
       
   216     public Object[][] variants() {
       
   217         String[] uris = uris();
       
   218         Object[][] result = new Object[uris.length * 2][];
       
   219         int i = 0;
       
   220         Supplier<? extends Staller> s = new SemaphoreStallerSupplier();
       
   221         for (Supplier<? extends Staller> staller : List.of(s)) {
       
   222             for (boolean sameClient : List.of(false, true)) {
       
   223                 for (String uri : uris()) {
       
   224                     result[i++] = new Object[]{uri, sameClient, staller};
       
   225                 }
       
   226             }
       
   227         }
       
   228         assert i == uris.length * 2;
       
   229         return result;
       
   230     }
       
   231 
       
   232     private HttpClient makeNewClient() {
       
   233         clientCount.incrementAndGet();
       
   234         return HttpClient.newBuilder()
       
   235                 .executor(executor)
       
   236                 .sslContext(sslContext)
       
   237                 .build();
       
   238     }
       
   239 
       
   240     HttpClient newHttpClient(boolean share) {
       
   241         if (!share) return makeNewClient();
       
   242         HttpClient shared = sharedClient;
       
   243         if (shared != null) return shared;
       
   244         synchronized (this) {
       
   245             shared = sharedClient;
       
   246             if (shared == null) {
       
   247                 shared = sharedClient = makeNewClient();
       
   248             }
       
   249             return shared;
       
   250         }
       
   251     }
       
   252 
       
   253     @Test(dataProvider = "noStalls")
       
   254     public void testNoStalls(String uri, boolean sameClient)
       
   255             throws Exception {
       
   256         HttpClient client = null;
       
   257         out.printf("%ntestNoStalls(%s, %b)%n", uri, sameClient);
       
   258         for (int i=0; i< ITERATION_COUNT; i++) {
       
   259             if (!sameClient || client == null)
       
   260                 client = newHttpClient(sameClient);
       
   261 
       
   262             HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
       
   263                     .build();
       
   264             BodyHandler<String> handler =
       
   265                     new StallingBodyHandler((w) -> {},
       
   266                             BodyHandlers.ofString());
       
   267             HttpResponse<String> response = client.send(req, handler);
       
   268             String body = response.body();
       
   269             assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
       
   270         }
       
   271     }
       
   272 
       
   273     @Test(dataProvider = "variants")
       
   274     public void testAsStringAsync(String uri,
       
   275                                   boolean sameClient,
       
   276                                   Supplier<Staller> s)
       
   277             throws Exception
       
   278     {
       
   279         Staller staller = s.get();
       
   280         String test = format("testAsStringAsync(%s, %b, %s)",
       
   281                 uri, sameClient, staller);
       
   282         testDependent(test, uri, sameClient, BodyHandlers::ofString,
       
   283                 this::finish, this::extractString, staller);
       
   284     }
       
   285 
       
   286     @Test(dataProvider = "variants")
       
   287     public void testAsLinesAsync(String uri,
       
   288                                  boolean sameClient,
       
   289                                  Supplier<Staller> s)
       
   290             throws Exception
       
   291     {
       
   292         Staller staller = s.get();
       
   293         String test = format("testAsLinesAsync(%s, %b, %s)",
       
   294                 uri, sameClient, staller);
       
   295         testDependent(test, uri, sameClient, BodyHandlers::ofLines,
       
   296                 this::finish, this::extractStream, staller);
       
   297     }
       
   298 
       
   299     @Test(dataProvider = "variants")
       
   300     public void testAsInputStreamAsync(String uri,
       
   301                                        boolean sameClient,
       
   302                                        Supplier<Staller> s)
       
   303             throws Exception
       
   304     {
       
   305         Staller staller = s.get();
       
   306         String test = format("testAsInputStreamAsync(%s, %b, %s)",
       
   307                 uri, sameClient, staller);
       
   308         testDependent(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   309                 this::finish, this::extractInputStream, staller);
       
   310     }
       
   311 
       
   312     private <T,U> void testDependent(String name, String uri, boolean sameClient,
       
   313                                      Supplier<BodyHandler<T>> handlers,
       
   314                                      Finisher finisher,
       
   315                                      Extractor extractor,
       
   316                                      Staller staller)
       
   317             throws Exception
       
   318     {
       
   319         out.printf("%n%s%s%n", now(), name);
       
   320         try {
       
   321             testDependent(uri, sameClient, handlers, finisher, extractor, staller);
       
   322         } catch (Error | Exception x) {
       
   323             FAILURES.putIfAbsent(name, x);
       
   324             throw x;
       
   325         }
       
   326     }
       
   327 
       
   328     private <T,U> void testDependent(String uri, boolean sameClient,
       
   329                                      Supplier<BodyHandler<T>> handlers,
       
   330                                      Finisher finisher,
       
   331                                      Extractor extractor,
       
   332                                      Staller staller)
       
   333             throws Exception
       
   334     {
       
   335         HttpClient client = null;
       
   336         for (Where where : EnumSet.of(Where.BODY_HANDLER)) {
       
   337             if (!sameClient || client == null)
       
   338                 client = newHttpClient(sameClient);
       
   339 
       
   340             HttpRequest req = HttpRequest.
       
   341                     newBuilder(URI.create(uri))
       
   342                     .build();
       
   343             BodyHandler<T> handler =
       
   344                     new StallingBodyHandler(where.select(staller), handlers.get());
       
   345             System.out.println("try stalling in " + where);
       
   346             staller.acquire();
       
   347             assert staller.willStall();
       
   348             CompletableFuture<HttpResponse<T>> responseCF = client.sendAsync(req, handler);
       
   349             assert !responseCF.isDone();
       
   350             finisher.finish(where, responseCF, staller, extractor);
       
   351         }
       
   352     }
       
   353 
       
   354     enum Where {
       
   355         BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;
       
   356         public Consumer<Where> select(Consumer<Where> consumer) {
       
   357             return new Consumer<Where>() {
       
   358                 @Override
       
   359                 public void accept(Where where) {
       
   360                     if (Where.this == where) {
       
   361                         consumer.accept(where);
       
   362                     }
       
   363                 }
       
   364             };
       
   365         }
       
   366     }
       
   367 
       
   368     interface Extractor<T> {
       
   369         public List<String> extract(HttpResponse<T> resp);
       
   370     }
       
   371 
       
   372     final List<String> extractString(HttpResponse<String> resp) {
       
   373         return List.of(resp.body());
       
   374     }
       
   375 
       
   376     final List<String> extractStream(HttpResponse<Stream<String>> resp) {
       
   377         return resp.body().collect(Collectors.toList());
       
   378     }
       
   379 
       
   380     final List<String> extractInputStream(HttpResponse<InputStream> resp) {
       
   381         try (InputStream is = resp.body()) {
       
   382             return new BufferedReader(new InputStreamReader(is))
       
   383                     .lines().collect(Collectors.toList());
       
   384         } catch (IOException x) {
       
   385             throw new CompletionException(x);
       
   386         }
       
   387     }
       
   388 
       
   389     interface Finisher<T> {
       
   390         public void finish(Where w,
       
   391                            CompletableFuture<HttpResponse<T>> cf,
       
   392                            Staller staller,
       
   393                            Extractor extractor);
       
   394     }
       
   395 
       
   396     Optional<StackFrame> findFrame(Stream<StackFrame> s, String name) {
       
   397         return s.filter((f) -> f.getClassName().contains(name))
       
   398                 .filter((f) -> f.getDeclaringClass().getModule().equals(HttpClient.class.getModule()))
       
   399                 .findFirst();
       
   400     }
       
   401 
       
   402     <T> void checkThreadAndStack(Thread thread,
       
   403                                  AtomicReference<RuntimeException> failed,
       
   404                                  T result,
       
   405                                  Throwable error) {
       
   406         if (Thread.currentThread() == thread) {
       
   407             //failed.set(new RuntimeException("Dependant action was executed in " + thread));
       
   408             List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass()
       
   409                     .getModule().equals(HttpClient.class.getModule()))
       
   410                     .collect(Collectors.toList()));
       
   411             if (!httpStack.isEmpty()) {
       
   412                 System.out.println("Found unexpected trace: ");
       
   413                 httpStack.forEach(f -> System.out.printf("\t%s%n", f));
       
   414                 failed.set(new RuntimeException("Dependant action has unexpected frame in " +
       
   415                         Thread.currentThread() + ": " + httpStack.get(0)));
       
   416 
       
   417             }
       
   418             return;
       
   419         } else if (System.getSecurityManager() != null) {
       
   420             Optional<StackFrame> sf = WALKER.walk(s -> findFrame(s, "PrivilegedRunnable"));
       
   421             if (!sf.isPresent()) {
       
   422                 failed.set(new RuntimeException("Dependant action does not have expected frame in "
       
   423                         + Thread.currentThread()));
       
   424                 return;
       
   425             } else {
       
   426                 System.out.println("Found expected frame: " + sf.get());
       
   427             }
       
   428         } else {
       
   429             List<StackFrame> httpStack = WALKER.walk(s -> s.filter(f -> f.getDeclaringClass()
       
   430                     .getModule().equals(HttpClient.class.getModule()))
       
   431                     .collect(Collectors.toList()));
       
   432             if (!httpStack.isEmpty()) {
       
   433                 System.out.println("Found unexpected trace: ");
       
   434                 httpStack.forEach(f -> System.out.printf("\t%s%n", f));
       
   435                 failed.set(new RuntimeException("Dependant action has unexpected frame in " +
       
   436                         Thread.currentThread() + ": " + httpStack.get(0)));
       
   437 
       
   438             }
       
   439         }
       
   440     }
       
   441 
       
   442     <T> void finish(Where w, CompletableFuture<HttpResponse<T>> cf,
       
   443                     Staller staller,
       
   444                     Extractor<T> extractor) {
       
   445         Thread thread = Thread.currentThread();
       
   446         AtomicReference<RuntimeException> failed = new AtomicReference<>();
       
   447         CompletableFuture<HttpResponse<T>> done = cf.whenComplete(
       
   448                 (r,t) -> checkThreadAndStack(thread, failed, r, t));
       
   449         assert !cf.isDone();
       
   450         try {
       
   451             Thread.sleep(100);
       
   452         } catch (Throwable t) {/* don't care */}
       
   453         assert !cf.isDone();
       
   454         staller.release();
       
   455         try {
       
   456             HttpResponse<T> response = done.join();
       
   457             List<String> result = extractor.extract(response);
       
   458             RuntimeException error = failed.get();
       
   459             if (error != null) {
       
   460                 throw new RuntimeException("Test failed in "
       
   461                         + w + ": " + response, error);
       
   462             }
       
   463             assertEquals(result, List.of(response.request().uri().getPath()));
       
   464         } finally {
       
   465             staller.reset();
       
   466         }
       
   467     }
       
   468 
       
   469     interface Staller extends Consumer<Where> {
       
   470         void release();
       
   471         void acquire();
       
   472         void reset();
       
   473         boolean willStall();
       
   474     }
       
   475 
       
   476     static final class SemaphoreStaller implements Staller {
       
   477         final Semaphore sem = new Semaphore(1);
       
   478         @Override
       
   479         public void accept(Where where) {
       
   480             System.out.println("Acquiring semaphore in "
       
   481                     + where + " permits=" + sem.availablePermits());
       
   482             sem.acquireUninterruptibly();
       
   483             System.out.println("Semaphored acquired in " + where);
       
   484         }
       
   485 
       
   486         @Override
       
   487         public void release() {
       
   488             System.out.println("Releasing semaphore: permits="
       
   489                     + sem.availablePermits());
       
   490             sem.release();
       
   491         }
       
   492 
       
   493         @Override
       
   494         public void acquire() {
       
   495             sem.acquireUninterruptibly();
       
   496             System.out.println("Semaphored acquired");
       
   497         }
       
   498 
       
   499         @Override
       
   500         public void reset() {
       
   501             System.out.println("Reseting semaphore: permits="
       
   502                     + sem.availablePermits());
       
   503             sem.drainPermits();
       
   504             sem.release();
       
   505             System.out.println("Semaphore reset: permits="
       
   506                     + sem.availablePermits());
       
   507         }
       
   508 
       
   509         @Override
       
   510         public boolean willStall() {
       
   511             return sem.availablePermits() <= 0;
       
   512         }
       
   513 
       
   514         @Override
       
   515         public String toString() {
       
   516             return "SemaphoreStaller";
       
   517         }
       
   518     }
       
   519 
       
   520     static final class StallingBodyHandler<T> implements BodyHandler<T> {
       
   521         final Consumer<Where> stalling;
       
   522         final BodyHandler<T> bodyHandler;
       
   523         StallingBodyHandler(Consumer<Where> stalling, BodyHandler<T> bodyHandler) {
       
   524             this.stalling = stalling;
       
   525             this.bodyHandler = bodyHandler;
       
   526         }
       
   527         @Override
       
   528         public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
       
   529             stalling.accept(Where.BODY_HANDLER);
       
   530             BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);
       
   531             return new StallingBodySubscriber(stalling, subscriber);
       
   532         }
       
   533     }
       
   534 
       
   535     static final class StallingBodySubscriber<T> implements BodySubscriber<T> {
       
   536         private final BodySubscriber<T> subscriber;
       
   537         volatile boolean onSubscribeCalled;
       
   538         final Consumer<Where> stalling;
       
   539         StallingBodySubscriber(Consumer<Where> stalling, BodySubscriber<T> subscriber) {
       
   540             this.stalling = stalling;
       
   541             this.subscriber = subscriber;
       
   542         }
       
   543 
       
   544         @Override
       
   545         public void onSubscribe(Flow.Subscription subscription) {
       
   546             //out.println("onSubscribe ");
       
   547             onSubscribeCalled = true;
       
   548             stalling.accept(Where.ON_SUBSCRIBE);
       
   549             subscriber.onSubscribe(subscription);
       
   550         }
       
   551 
       
   552         @Override
       
   553         public void onNext(List<ByteBuffer> item) {
       
   554             // out.println("onNext " + item);
       
   555             assertTrue(onSubscribeCalled);
       
   556             stalling.accept(Where.ON_NEXT);
       
   557             subscriber.onNext(item);
       
   558         }
       
   559 
       
   560         @Override
       
   561         public void onError(Throwable throwable) {
       
   562             //out.println("onError");
       
   563             assertTrue(onSubscribeCalled);
       
   564             stalling.accept(Where.ON_ERROR);
       
   565             subscriber.onError(throwable);
       
   566         }
       
   567 
       
   568         @Override
       
   569         public void onComplete() {
       
   570             //out.println("onComplete");
       
   571             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
       
   572             stalling.accept(Where.ON_COMPLETE);
       
   573             subscriber.onComplete();
       
   574         }
       
   575 
       
   576         @Override
       
   577         public CompletionStage<T> getBody() {
       
   578             stalling.accept(Where.GET_BODY);
       
   579             try {
       
   580                 stalling.accept(Where.BODY_CF);
       
   581             } catch (Throwable t) {
       
   582                 return CompletableFuture.failedFuture(t);
       
   583             }
       
   584             return subscriber.getBody();
       
   585         }
       
   586     }
       
   587 
       
   588 
       
   589     @BeforeTest
       
   590     public void setup() throws Exception {
       
   591         sslContext = new SimpleSSLContext().get();
       
   592         if (sslContext == null)
       
   593             throw new AssertionError("Unexpected null sslContext");
       
   594 
       
   595         // HTTP/1.1
       
   596         HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   597         HttpTestHandler h1_chunkHandler = new HTTP_ChunkedHandler();
       
   598         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
       
   599         httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
       
   600         httpTestServer.addHandler(h1_fixedLengthHandler, "/http1/fixed");
       
   601         httpTestServer.addHandler(h1_chunkHandler, "/http1/chunk");
       
   602         httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed/x";
       
   603         httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk/x";
       
   604 
       
   605         HttpsServer httpsServer = HttpsServer.create(sa, 0);
       
   606         httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
       
   607         httpsTestServer = HttpTestServer.of(httpsServer);
       
   608         httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
       
   609         httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
       
   610         httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed/x";
       
   611         httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk/x";
       
   612 
       
   613         // HTTP/2
       
   614         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   615         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
       
   616 
       
   617         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
       
   618         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   619         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   620         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";
       
   621         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";
       
   622 
       
   623         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, 0));
       
   624         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   625         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   626         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";
       
   627         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";
       
   628 
       
   629         serverCount.addAndGet(4);
       
   630         httpTestServer.start();
       
   631         httpsTestServer.start();
       
   632         http2TestServer.start();
       
   633         https2TestServer.start();
       
   634     }
       
   635 
       
   636     @AfterTest
       
   637     public void teardown() throws Exception {
       
   638         sharedClient = null;
       
   639         httpTestServer.stop();
       
   640         httpsTestServer.stop();
       
   641         http2TestServer.stop();
       
   642         https2TestServer.stop();
       
   643     }
       
   644 
       
   645     static class HTTP_FixedLengthHandler implements HttpTestHandler {
       
   646         @Override
       
   647         public void handle(HttpTestExchange t) throws IOException {
       
   648             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
       
   649             try (InputStream is = t.getRequestBody()) {
       
   650                 is.readAllBytes();
       
   651             }
       
   652             byte[] resp = t.getRequestURI().getPath().getBytes(StandardCharsets.UTF_8);
       
   653             t.sendResponseHeaders(200, resp.length);  //fixed content length
       
   654             try (OutputStream os = t.getResponseBody()) {
       
   655                 os.write(resp);
       
   656             }
       
   657         }
       
   658     }
       
   659 
       
   660     static class HTTP_ChunkedHandler implements HttpTestHandler {
       
   661         @Override
       
   662         public void handle(HttpTestExchange t) throws IOException {
       
   663             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
       
   664             byte[] resp = t.getRequestURI().getPath().toString().getBytes(StandardCharsets.UTF_8);
       
   665             try (InputStream is = t.getRequestBody()) {
       
   666                 is.readAllBytes();
       
   667             }
       
   668             t.sendResponseHeaders(200, -1); // chunked/variable
       
   669             try (OutputStream os = t.getResponseBody()) {
       
   670                 os.write(resp);
       
   671             }
       
   672         }
       
   673     }
       
   674 }