test/jdk/java/net/httpclient/AbstractThrowingSubscribers.java
branch http-client-branch
changeset 56778 b77704d050f8
parent 56771 73a6534bce94
child 56795 03ece2518428
equal deleted inserted replaced
56777:62a6a0f24efa 56778:b77704d050f8
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 import com.sun.net.httpserver.HttpServer;
       
    25 import com.sun.net.httpserver.HttpsConfigurator;
       
    26 import com.sun.net.httpserver.HttpsServer;
       
    27 import jdk.testlibrary.SimpleSSLContext;
       
    28 import org.testng.annotations.AfterTest;
       
    29 import org.testng.annotations.AfterClass;
       
    30 import org.testng.annotations.BeforeTest;
       
    31 import org.testng.annotations.DataProvider;
       
    32 import org.testng.annotations.Test;
       
    33 
       
    34 import javax.net.ssl.SSLContext;
       
    35 import java.io.BufferedReader;
       
    36 import java.io.IOException;
       
    37 import java.io.InputStream;
       
    38 import java.io.InputStreamReader;
       
    39 import java.io.OutputStream;
       
    40 import java.io.UncheckedIOException;
       
    41 import java.net.InetAddress;
       
    42 import java.net.InetSocketAddress;
       
    43 import java.net.URI;
       
    44 import java.net.http.HttpClient;
       
    45 import java.net.http.HttpHeaders;
       
    46 import java.net.http.HttpRequest;
       
    47 import java.net.http.HttpResponse;
       
    48 import java.net.http.HttpResponse.BodyHandler;
       
    49 import java.net.http.HttpResponse.BodyHandlers;
       
    50 import java.net.http.HttpResponse.BodySubscriber;
       
    51 import java.nio.ByteBuffer;
       
    52 import java.nio.charset.StandardCharsets;
       
    53 import java.util.EnumSet;
       
    54 import java.util.List;
       
    55 import java.util.concurrent.CompletableFuture;
       
    56 import java.util.concurrent.CompletionStage;
       
    57 import java.util.concurrent.ConcurrentHashMap;
       
    58 import java.util.concurrent.ConcurrentMap;
       
    59 import java.util.concurrent.Executor;
       
    60 import java.util.concurrent.Executors;
       
    61 import java.util.concurrent.Flow;
       
    62 import java.util.concurrent.atomic.AtomicLong;
       
    63 import java.util.function.Consumer;
       
    64 import java.util.function.Predicate;
       
    65 import java.util.function.Supplier;
       
    66 import java.util.stream.Collectors;
       
    67 import java.util.stream.Stream;
       
    68 
       
    69 import static java.lang.System.out;
       
    70 import static java.lang.String.format;
       
    71 import static java.nio.charset.StandardCharsets.UTF_8;
       
    72 import static org.testng.Assert.assertEquals;
       
    73 import static org.testng.Assert.assertTrue;
       
    74 
       
    75 public abstract class AbstractThrowingSubscribers implements HttpServerAdapters {
       
    76 
       
    77     SSLContext sslContext;
       
    78     HttpTestServer httpTestServer;    // HTTP/1.1    [ 4 servers ]
       
    79     HttpTestServer httpsTestServer;   // HTTPS/1.1
       
    80     HttpTestServer http2TestServer;   // HTTP/2 ( h2c )
       
    81     HttpTestServer https2TestServer;  // HTTP/2 ( h2  )
       
    82     String httpURI_fixed;
       
    83     String httpURI_chunk;
       
    84     String httpsURI_fixed;
       
    85     String httpsURI_chunk;
       
    86     String http2URI_fixed;
       
    87     String http2URI_chunk;
       
    88     String https2URI_fixed;
       
    89     String https2URI_chunk;
       
    90 
       
    91     static final int ITERATION_COUNT = 1;
       
    92     // a shared executor helps reduce the amount of threads created by the test
       
    93     static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
       
    94     static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
       
    95     static volatile boolean tasksFailed;
       
    96     static final AtomicLong serverCount = new AtomicLong();
       
    97     static final AtomicLong clientCount = new AtomicLong();
       
    98     static final long start = System.nanoTime();
       
    99     public static String now() {
       
   100         long now = System.nanoTime() - start;
       
   101         long secs = now / 1000_000_000;
       
   102         long mill = (now % 1000_000_000) / 1000_000;
       
   103         long nan = now % 1000_000;
       
   104         return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
       
   105     }
       
   106 
       
   107     final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
       
   108     private volatile HttpClient sharedClient;
       
   109 
       
   110     static class TestExecutor implements Executor {
       
   111         final AtomicLong tasks = new AtomicLong();
       
   112         Executor executor;
       
   113         TestExecutor(Executor executor) {
       
   114             this.executor = executor;
       
   115         }
       
   116 
       
   117         @Override
       
   118         public void execute(Runnable command) {
       
   119             long id = tasks.incrementAndGet();
       
   120             executor.execute(() -> {
       
   121                 try {
       
   122                     command.run();
       
   123                 } catch (Throwable t) {
       
   124                     tasksFailed = true;
       
   125                     System.out.printf(now() + "Task %s failed: %s%n", id, t);
       
   126                     System.err.printf(now() + "Task %s failed: %s%n", id, t);
       
   127                     FAILURES.putIfAbsent("Task " + id, t);
       
   128                     throw t;
       
   129                 }
       
   130             });
       
   131         }
       
   132     }
       
   133 
       
   134     @AfterClass
       
   135     static final void printFailedTests() {
       
   136         out.println("\n=========================");
       
   137         try {
       
   138             out.printf("%n%sCreated %d servers and %d clients%n",
       
   139                     now(), serverCount.get(), clientCount.get());
       
   140             if (FAILURES.isEmpty()) return;
       
   141             out.println("Failed tests: ");
       
   142             FAILURES.entrySet().forEach((e) -> {
       
   143                 out.printf("\t%s: %s%n", e.getKey(), e.getValue());
       
   144                 e.getValue().printStackTrace(out);
       
   145                 e.getValue().printStackTrace();
       
   146             });
       
   147             if (tasksFailed) {
       
   148                 System.out.println("WARNING: Some tasks failed");
       
   149             }
       
   150         } finally {
       
   151             out.println("\n=========================\n");
       
   152         }
       
   153     }
       
   154 
       
   155     private String[] uris() {
       
   156         return new String[] {
       
   157                 httpURI_fixed,
       
   158                 httpURI_chunk,
       
   159                 httpsURI_fixed,
       
   160                 httpsURI_chunk,
       
   161                 http2URI_fixed,
       
   162                 http2URI_chunk,
       
   163                 https2URI_fixed,
       
   164                 https2URI_chunk,
       
   165         };
       
   166     }
       
   167 
       
   168     static AtomicLong URICOUNT = new AtomicLong();
       
   169 
       
   170     @DataProvider(name = "sanity")
       
   171     public Object[][] sanity() {
       
   172         String[] uris = uris();
       
   173         Object[][] result = new Object[uris.length * 2][];
       
   174         int i = 0;
       
   175         for (boolean sameClient : List.of(false, true)) {
       
   176             for (String uri: uris()) {
       
   177                 result[i++] = new Object[] {uri, sameClient};
       
   178             }
       
   179         }
       
   180         assert i == uris.length * 2;
       
   181         return result;
       
   182     }
       
   183 
       
   184     @DataProvider(name = "variants")
       
   185     public Object[][] variants() {
       
   186         String[] uris = uris();
       
   187         Object[][] result = new Object[uris.length * 2 * 2][];
       
   188         int i = 0;
       
   189         for (Thrower thrower : List.of(
       
   190                 new UncheckedIOExceptionThrower(),
       
   191                 new UncheckedCustomExceptionThrower())) {
       
   192             for (boolean sameClient : List.of(false, true)) {
       
   193                 for (String uri : uris()) {
       
   194                     result[i++] = new Object[]{uri, sameClient, thrower};
       
   195                 }
       
   196             }
       
   197         }
       
   198         assert i == uris.length * 2 * 2;
       
   199         return result;
       
   200     }
       
   201 
       
   202     private HttpClient makeNewClient() {
       
   203         clientCount.incrementAndGet();
       
   204         HttpClient client =  HttpClient.newBuilder()
       
   205                 .proxy(HttpClient.Builder.NO_PROXY)
       
   206                 .executor(executor)
       
   207                 .sslContext(sslContext)
       
   208                 .build();
       
   209         return TRACKER.track(client);
       
   210     }
       
   211 
       
   212     HttpClient newHttpClient(boolean share) {
       
   213         if (!share) return makeNewClient();
       
   214         HttpClient shared = sharedClient;
       
   215         if (shared != null) return shared;
       
   216         synchronized (this) {
       
   217             shared = sharedClient;
       
   218             if (shared == null) {
       
   219                 shared = sharedClient = makeNewClient();
       
   220             }
       
   221             return shared;
       
   222         }
       
   223     }
       
   224 
       
   /**
    * Classifies body subscribers by when their completion future (CF)
    * completes relative to delivery of the body.
    */
   enum SubscriberType {
       /**
        * In line subscribers complete their CF on ON_COMPLETE,
        * e.g. BodySubscribers::ofString.
        */
       INLINE,
       /**
        * Off line subscribers complete their CF immediately but require
        * the client to pull the data after the CF completes
        * (e.g. BodySubscribers::ofInputStream).
        */
       OFFLINE;
   }
       
   232 
       
   233     static EnumSet<Where> excludes(SubscriberType type) {
       
   234         EnumSet<Where> set = EnumSet.noneOf(Where.class);
       
   235 
       
   236         if (type == SubscriberType.OFFLINE) {
       
   237             // Throwing on onSubscribe needs some more work
       
   238             // for the case of InputStream, where the body has already
       
   239             // completed by the time the subscriber is subscribed.
       
   240             // The only way we have at that point to relay the exception
       
   241             // is to call onError on the subscriber, but should we if
       
   242             // Subscriber::onSubscribed has thrown an exception and
       
   243             // not completed normally?
       
   244             set.add(Where.ON_SUBSCRIBE);
       
   245         }
       
   246 
       
   247         // Don't know how to make the stack reliably cause onError
       
   248         // to be called without closing the connection.
       
   249         // And how do we get the exception if onError throws anyway?
       
   250         set.add(Where.ON_ERROR);
       
   251 
       
   252         return set;
       
   253     }
       
   254 
       
   255     //@Test(dataProvider = "sanity")
       
   256     protected void testSanityImpl(String uri, boolean sameClient)
       
   257             throws Exception {
       
   258         HttpClient client = null;
       
   259         String uri2 = uri + "-" + URICOUNT.incrementAndGet() + "/sanity";
       
   260         out.printf("%ntestSanity(%s, %b)%n", uri2, sameClient);
       
   261         for (int i=0; i< ITERATION_COUNT; i++) {
       
   262             if (!sameClient || client == null)
       
   263                 client = newHttpClient(sameClient);
       
   264 
       
   265             HttpRequest req = HttpRequest.newBuilder(URI.create(uri2))
       
   266                     .build();
       
   267             BodyHandler<String> handler =
       
   268                     new ThrowingBodyHandler((w) -> {},
       
   269                                             BodyHandlers.ofString());
       
   270             HttpResponse<String> response = client.send(req, handler);
       
   271             String body = response.body();
       
   272             assertEquals(URI.create(body).getPath(), URI.create(uri2).getPath());
       
   273         }
       
   274     }
       
   275 
       
   276     //@Test(dataProvider = "variants")
       
   277     protected void testThrowingAsStringImpl(String uri,
       
   278                                      boolean sameClient,
       
   279                                      Thrower thrower)
       
   280             throws Exception
       
   281     {
       
   282         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   283         String test = format("testThrowingAsString(%s, %b, %s)",
       
   284                              uri, sameClient, thrower);
       
   285         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
       
   286                 this::shouldHaveThrown, thrower,false,
       
   287                 excludes(SubscriberType.INLINE));
       
   288     }
       
   289 
       
   290     //@Test(dataProvider = "variants")
       
   291     protected void testThrowingAsLinesImpl(String uri,
       
   292                                     boolean sameClient,
       
   293                                     Thrower thrower)
       
   294             throws Exception
       
   295     {
       
   296         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   297         String test =  format("testThrowingAsLines(%s, %b, %s)",
       
   298                 uri, sameClient, thrower);
       
   299         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
       
   300                 this::checkAsLines, thrower,false,
       
   301                 excludes(SubscriberType.OFFLINE));
       
   302     }
       
   303 
       
   304     //@Test(dataProvider = "variants")
       
   305     protected void testThrowingAsInputStreamImpl(String uri,
       
   306                                           boolean sameClient,
       
   307                                           Thrower thrower)
       
   308             throws Exception
       
   309     {
       
   310         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   311         String test = format("testThrowingAsInputStream(%s, %b, %s)",
       
   312                 uri, sameClient, thrower);
       
   313         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   314                 this::checkAsInputStream,  thrower,false,
       
   315                 excludes(SubscriberType.OFFLINE));
       
   316     }
       
   317 
       
   318     //@Test(dataProvider = "variants")
       
   319     protected void testThrowingAsStringAsyncImpl(String uri,
       
   320                                           boolean sameClient,
       
   321                                           Thrower thrower)
       
   322             throws Exception
       
   323     {
       
   324         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   325         String test = format("testThrowingAsStringAsync(%s, %b, %s)",
       
   326                 uri, sameClient, thrower);
       
   327         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
       
   328                      this::shouldHaveThrown, thrower, true,
       
   329                 excludes(SubscriberType.INLINE));
       
   330     }
       
   331 
       
   332     //@Test(dataProvider = "variants")
       
   333     protected void testThrowingAsLinesAsyncImpl(String uri,
       
   334                                          boolean sameClient,
       
   335                                          Thrower thrower)
       
   336             throws Exception
       
   337     {
       
   338         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   339         String test = format("testThrowingAsLinesAsync(%s, %b, %s)",
       
   340                 uri, sameClient, thrower);
       
   341         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
       
   342                 this::checkAsLines, thrower,true,
       
   343                 excludes(SubscriberType.OFFLINE));
       
   344     }
       
   345 
       
   346     //@Test(dataProvider = "variants")
       
   347     protected void testThrowingAsInputStreamAsyncImpl(String uri,
       
   348                                                boolean sameClient,
       
   349                                                Thrower thrower)
       
   350             throws Exception
       
   351     {
       
   352         uri = uri + "-" + URICOUNT.incrementAndGet();
       
   353         String test = format("testThrowingAsInputStreamAsync(%s, %b, %s)",
       
   354                 uri, sameClient, thrower);
       
   355         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
       
   356                 this::checkAsInputStream, thrower,true,
       
   357                 excludes(SubscriberType.OFFLINE));
       
   358     }
       
   359 
       
   360     private <T,U> void testThrowing(String name, String uri, boolean sameClient,
       
   361                                     Supplier<BodyHandler<T>> handlers,
       
   362                                     Finisher finisher, Thrower thrower,
       
   363                                     boolean async, EnumSet<Where> excludes)
       
   364             throws Exception
       
   365     {
       
   366         out.printf("%n%s%s%n", now(), name);
       
   367         try {
       
   368             testThrowing(uri, sameClient, handlers, finisher, thrower, async, excludes);
       
   369         } catch (Error | Exception x) {
       
   370             FAILURES.putIfAbsent(name, x);
       
   371             throw x;
       
   372         }
       
   373     }
       
   374 
       
   375     private <T,U> void testThrowing(String uri, boolean sameClient,
       
   376                                     Supplier<BodyHandler<T>> handlers,
       
   377                                     Finisher finisher, Thrower thrower,
       
   378                                     boolean async,
       
   379                                     EnumSet<Where> excludes)
       
   380             throws Exception
       
   381     {
       
   382         HttpClient client = null;
       
   383         for (Where where : EnumSet.complementOf(excludes)) {
       
   384 
       
   385             if (!sameClient || client == null)
       
   386                 client = newHttpClient(sameClient);
       
   387             String uri2 = uri + "-" + where;
       
   388             HttpRequest req = HttpRequest.
       
   389                     newBuilder(URI.create(uri2))
       
   390                     .build();
       
   391             BodyHandler<T> handler =
       
   392                     new ThrowingBodyHandler(where.select(thrower), handlers.get());
       
   393             System.out.println("try throwing in " + where);
       
   394             HttpResponse<T> response = null;
       
   395             if (async) {
       
   396                 try {
       
   397                     response = client.sendAsync(req, handler).join();
       
   398                 } catch (Error | Exception x) {
       
   399                     Throwable cause = findCause(x, thrower);
       
   400                     if (cause == null) throw causeNotFound(where, x);
       
   401                     System.out.println(now() + "Got expected exception: " + cause);
       
   402                 }
       
   403             } else {
       
   404                 try {
       
   405                     response = client.send(req, handler);
       
   406                 } catch (Error | Exception t) {
       
   407                     // synchronous send will rethrow exceptions
       
   408                     Throwable throwable = t.getCause();
       
   409                     assert throwable != null;
       
   410 
       
   411                     if (thrower.test(throwable)) {
       
   412                         System.out.println(now() + "Got expected exception: " + throwable);
       
   413                     } else throw causeNotFound(where, t);
       
   414                 }
       
   415             }
       
   416             if (response != null) {
       
   417                 finisher.finish(where, response, thrower);
       
   418             }
       
   419         }
       
   420     }
       
   421 
       
   /**
    * The points in response-body handling at which a test can inject an
    * exception.
    */
   enum Where {
       BODY_HANDLER, ON_SUBSCRIBE, ON_NEXT, ON_COMPLETE, ON_ERROR, GET_BODY, BODY_CF;

       /**
        * Narrows a consumer so that it only fires for this constant.
        *
        * @param consumer the consumer to guard
        * @return a consumer that forwards its argument to {@code consumer}
        *         when the argument is this constant, and otherwise does
        *         nothing
        */
       public Consumer<Where> select(Consumer<Where> consumer) {
           return candidate -> {
               if (this == candidate) {
                   consumer.accept(candidate);
               }
           };
       }
   }
       
   435 
       
   436     static AssertionError causeNotFound(Where w, Throwable t) {
       
   437         return new AssertionError("Expected exception not found in " + w, t);
       
   438     }
       
   439 
       
   440     interface Thrower extends Consumer<Where>, Predicate<Throwable> {
       
   441 
       
   442     }
       
   443 
       
   444     interface Finisher<T,U> {
       
   445         U finish(Where w, HttpResponse<T> resp, Thrower thrower) throws IOException;
       
   446     }
       
   447 
       
   448     final <T,U> U shouldHaveThrown(Where w, HttpResponse<T> resp, Thrower thrower) {
       
   449         String msg = "Expected exception not thrown in " + w
       
   450                 + "\n\tReceived: " + resp
       
   451                 + "\n\tWith body: " + resp.body();
       
   452         System.out.println(msg);
       
   453         throw new RuntimeException(msg);
       
   454     }
       
   455 
       
   456     final List<String> checkAsLines(Where w, HttpResponse<Stream<String>> resp, Thrower thrower) {
       
   457         switch(w) {
       
   458             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
       
   459             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
       
   460             case BODY_CF: return shouldHaveThrown(w, resp, thrower);
       
   461             default: break;
       
   462         }
       
   463         List<String> result = null;
       
   464         try {
       
   465             result = resp.body().collect(Collectors.toList());
       
   466         } catch (Error | Exception x) {
       
   467             Throwable cause = findCause(x, thrower);
       
   468             if (cause != null) {
       
   469                 out.println(now() + "Got expected exception in " + w + ": " + cause);
       
   470                 return result;
       
   471             }
       
   472             throw causeNotFound(w, x);
       
   473         }
       
   474         return shouldHaveThrown(w, resp, thrower);
       
   475     }
       
   476 
       
   477     final List<String> checkAsInputStream(Where w, HttpResponse<InputStream> resp,
       
   478                                     Thrower thrower)
       
   479             throws IOException
       
   480     {
       
   481         switch(w) {
       
   482             case BODY_HANDLER: return shouldHaveThrown(w, resp, thrower);
       
   483             case GET_BODY: return shouldHaveThrown(w, resp, thrower);
       
   484             case BODY_CF: return shouldHaveThrown(w, resp, thrower);
       
   485             default: break;
       
   486         }
       
   487         List<String> result = null;
       
   488         try (InputStreamReader r1 = new InputStreamReader(resp.body(), UTF_8);
       
   489              BufferedReader r = new BufferedReader(r1)) {
       
   490             try {
       
   491                 result = r.lines().collect(Collectors.toList());
       
   492             } catch (Error | Exception x) {
       
   493                 Throwable cause = findCause(x, thrower);
       
   494                 if (cause != null) {
       
   495                     out.println(now() + "Got expected exception in " + w + ": " + cause);
       
   496                     return result;
       
   497                 }
       
   498                 throw causeNotFound(w, x);
       
   499             }
       
   500         }
       
   501         return shouldHaveThrown(w, resp, thrower);
       
   502     }
       
   503 
       
   504     private static Throwable findCause(Throwable x,
       
   505                                        Predicate<Throwable> filter) {
       
   506         while (x != null && !filter.test(x)) x = x.getCause();
       
   507         return x;
       
   508     }
       
   509 
       
   510     static final class UncheckedCustomExceptionThrower implements Thrower {
       
   511         @Override
       
   512         public void accept(Where where) {
       
   513             out.println(now() + "Throwing in " + where);
       
   514             throw new UncheckedCustomException(where.name());
       
   515         }
       
   516 
       
   517         @Override
       
   518         public boolean test(Throwable throwable) {
       
   519             return UncheckedCustomException.class.isInstance(throwable);
       
   520         }
       
   521 
       
   522         @Override
       
   523         public String toString() {
       
   524             return "UncheckedCustomExceptionThrower";
       
   525         }
       
   526     }
       
   527 
       
   528     static final class UncheckedIOExceptionThrower implements Thrower {
       
   529         @Override
       
   530         public void accept(Where where) {
       
   531             out.println(now() + "Throwing in " + where);
       
   532             throw new UncheckedIOException(new CustomIOException(where.name()));
       
   533         }
       
   534 
       
   535         @Override
       
   536         public boolean test(Throwable throwable) {
       
   537             return UncheckedIOException.class.isInstance(throwable)
       
   538                     && CustomIOException.class.isInstance(throwable.getCause());
       
   539         }
       
   540 
       
   541         @Override
       
   542         public String toString() {
       
   543             return "UncheckedIOExceptionThrower";
       
   544         }
       
   545     }
       
   546 
       
   547     static final class UncheckedCustomException extends RuntimeException {
       
   548         UncheckedCustomException(String message) {
       
   549             super(message);
       
   550         }
       
   551         UncheckedCustomException(String message, Throwable cause) {
       
   552             super(message, cause);
       
   553         }
       
   554     }
       
   555 
       
   556     static final class CustomIOException extends IOException {
       
   557         CustomIOException(String message) {
       
   558             super(message);
       
   559         }
       
   560         CustomIOException(String message, Throwable cause) {
       
   561             super(message, cause);
       
   562         }
       
   563     }
       
   564 
       
   565     static final class ThrowingBodyHandler<T> implements BodyHandler<T> {
       
   566         final Consumer<Where> throwing;
       
   567         final BodyHandler<T> bodyHandler;
       
   568         ThrowingBodyHandler(Consumer<Where> throwing, BodyHandler<T> bodyHandler) {
       
   569             this.throwing = throwing;
       
   570             this.bodyHandler = bodyHandler;
       
   571         }
       
   572         @Override
       
   573         public BodySubscriber<T> apply(HttpResponse.ResponseInfo rinfo) {
       
   574             throwing.accept(Where.BODY_HANDLER);
       
   575             BodySubscriber<T> subscriber = bodyHandler.apply(rinfo);
       
   576             return new ThrowingBodySubscriber(throwing, subscriber);
       
   577         }
       
   578     }
       
   579 
       
   580     static final class ThrowingBodySubscriber<T> implements BodySubscriber<T> {
       
   581         private final BodySubscriber<T> subscriber;
       
   582         volatile boolean onSubscribeCalled;
       
   583         final Consumer<Where> throwing;
       
   584         ThrowingBodySubscriber(Consumer<Where> throwing, BodySubscriber<T> subscriber) {
       
   585             this.throwing = throwing;
       
   586             this.subscriber = subscriber;
       
   587         }
       
   588 
       
   589         @Override
       
   590         public void onSubscribe(Flow.Subscription subscription) {
       
   591             //out.println("onSubscribe ");
       
   592             onSubscribeCalled = true;
       
   593             throwing.accept(Where.ON_SUBSCRIBE);
       
   594             subscriber.onSubscribe(subscription);
       
   595         }
       
   596 
       
   597         @Override
       
   598         public void onNext(List<ByteBuffer> item) {
       
   599            // out.println("onNext " + item);
       
   600             assertTrue(onSubscribeCalled);
       
   601             throwing.accept(Where.ON_NEXT);
       
   602             subscriber.onNext(item);
       
   603         }
       
   604 
       
   605         @Override
       
   606         public void onError(Throwable throwable) {
       
   607             //out.println("onError");
       
   608             assertTrue(onSubscribeCalled);
       
   609             throwing.accept(Where.ON_ERROR);
       
   610             subscriber.onError(throwable);
       
   611         }
       
   612 
       
   613         @Override
       
   614         public void onComplete() {
       
   615             //out.println("onComplete");
       
   616             assertTrue(onSubscribeCalled, "onComplete called before onSubscribe");
       
   617             throwing.accept(Where.ON_COMPLETE);
       
   618             subscriber.onComplete();
       
   619         }
       
   620 
       
   621         @Override
       
   622         public CompletionStage<T> getBody() {
       
   623             throwing.accept(Where.GET_BODY);
       
   624             try {
       
   625                 throwing.accept(Where.BODY_CF);
       
   626             } catch (Throwable t) {
       
   627                 return CompletableFuture.failedFuture(t);
       
   628             }
       
   629             return subscriber.getBody();
       
   630         }
       
   631     }
       
   632 
       
   633 
       
   634     @BeforeTest
       
   635     public void setup() throws Exception {
       
   636         sslContext = new SimpleSSLContext().get();
       
   637         if (sslContext == null)
       
   638             throw new AssertionError("Unexpected null sslContext");
       
   639 
       
   640         // HTTP/1.1
       
   641         HttpTestHandler h1_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   642         HttpTestHandler h1_chunkHandler = new HTTP_ChunkedHandler();
       
   643         InetSocketAddress sa = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
       
   644         httpTestServer = HttpTestServer.of(HttpServer.create(sa, 0));
       
   645         httpTestServer.addHandler(h1_fixedLengthHandler, "/http1/fixed");
       
   646         httpTestServer.addHandler(h1_chunkHandler, "/http1/chunk");
       
   647         httpURI_fixed = "http://" + httpTestServer.serverAuthority() + "/http1/fixed/x";
       
   648         httpURI_chunk = "http://" + httpTestServer.serverAuthority() + "/http1/chunk/x";
       
   649 
       
   650         HttpsServer httpsServer = HttpsServer.create(sa, 0);
       
   651         httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
       
   652         httpsTestServer = HttpTestServer.of(httpsServer);
       
   653         httpsTestServer.addHandler(h1_fixedLengthHandler, "/https1/fixed");
       
   654         httpsTestServer.addHandler(h1_chunkHandler, "/https1/chunk");
       
   655         httpsURI_fixed = "https://" + httpsTestServer.serverAuthority() + "/https1/fixed/x";
       
   656         httpsURI_chunk = "https://" + httpsTestServer.serverAuthority() + "/https1/chunk/x";
       
   657 
       
   658         // HTTP/2
       
   659         HttpTestHandler h2_fixedLengthHandler = new HTTP_FixedLengthHandler();
       
   660         HttpTestHandler h2_chunkedHandler = new HTTP_ChunkedHandler();
       
   661 
       
   662         http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
       
   663         http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed");
       
   664         http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk");
       
   665         http2URI_fixed = "http://" + http2TestServer.serverAuthority() + "/http2/fixed/x";
       
   666         http2URI_chunk = "http://" + http2TestServer.serverAuthority() + "/http2/chunk/x";
       
   667 
       
   668         https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
       
   669         https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed");
       
   670         https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk");
       
   671         https2URI_fixed = "https://" + https2TestServer.serverAuthority() + "/https2/fixed/x";
       
   672         https2URI_chunk = "https://" + https2TestServer.serverAuthority() + "/https2/chunk/x";
       
   673 
       
   674         serverCount.addAndGet(4);
       
   675         httpTestServer.start();
       
   676         httpsTestServer.start();
       
   677         http2TestServer.start();
       
   678         https2TestServer.start();
       
   679     }
       
   680 
       
   681     @AfterTest
       
   682     public void teardown() throws Exception {
       
   683         String sharedClientName =
       
   684                 sharedClient == null ? null : sharedClient.toString();
       
   685         sharedClient = null;
       
   686         Thread.sleep(100);
       
   687         AssertionError fail = TRACKER.check(500);
       
   688         try {
       
   689             httpTestServer.stop();
       
   690             httpsTestServer.stop();
       
   691             http2TestServer.stop();
       
   692             https2TestServer.stop();
       
   693         } finally {
       
   694             if (fail != null) {
       
   695                 if (sharedClientName != null) {
       
   696                     System.err.println("Shared client name is: " + sharedClientName);
       
   697                 }
       
   698                 throw fail;
       
   699             }
       
   700         }
       
   701     }
       
   702 
       
   703     static class HTTP_FixedLengthHandler implements HttpTestHandler {
       
   704         @Override
       
   705         public void handle(HttpTestExchange t) throws IOException {
       
   706             out.println("HTTP_FixedLengthHandler received request to " + t.getRequestURI());
       
   707             try (InputStream is = t.getRequestBody()) {
       
   708                 is.readAllBytes();
       
   709             }
       
   710             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   711             t.sendResponseHeaders(200, resp.length);  //fixed content length
       
   712             try (OutputStream os = t.getResponseBody()) {
       
   713                 os.write(resp);
       
   714             }
       
   715         }
       
   716     }
       
   717 
       
   718     static class HTTP_ChunkedHandler implements HttpTestHandler {
       
   719         @Override
       
   720         public void handle(HttpTestExchange t) throws IOException {
       
   721             out.println("HTTP_ChunkedHandler received request to " + t.getRequestURI());
       
   722             byte[] resp = t.getRequestURI().toString().getBytes(StandardCharsets.UTF_8);
       
   723             try (InputStream is = t.getRequestBody()) {
       
   724                 is.readAllBytes();
       
   725             }
       
   726             t.sendResponseHeaders(200, -1); // chunked/variable
       
   727             try (OutputStream os = t.getResponseBody()) {
       
   728                 os.write(resp);
       
   729             }
       
   730         }
       
   731     }
       
   732 
       
   733 }