test/jdk/java/net/httpclient/whitebox/java.net.http/java/net/http/FlowTest.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 55973 4d9b002587db
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package java.net.http;
       
    25 
       
    26 import java.io.BufferedOutputStream;
       
    27 import java.io.File;
       
    28 import java.io.FileInputStream;
       
    29 import java.io.IOException;
       
    30 import java.io.InputStream;
       
    31 import java.io.OutputStream;
       
    32 import java.net.Socket;
       
    33 import java.nio.ByteBuffer;
       
    34 import java.security.KeyManagementException;
       
    35 import java.security.KeyStore;
       
    36 import java.security.KeyStoreException;
       
    37 import java.security.NoSuchAlgorithmException;
       
    38 import java.security.UnrecoverableKeyException;
       
    39 import java.security.cert.CertificateException;
       
    40 import java.util.List;
       
    41 import java.util.Random;
       
    42 import java.util.StringTokenizer;
       
    43 import java.util.concurrent.BlockingQueue;
       
    44 import java.util.concurrent.CompletableFuture;
       
    45 import java.util.concurrent.CountDownLatch;
       
    46 import java.util.concurrent.ExecutorService;
       
    47 import java.util.concurrent.Executors;
       
    48 import java.util.concurrent.Flow;
       
    49 import java.util.concurrent.Flow.Subscriber;
       
    50 import java.util.concurrent.LinkedBlockingQueue;
       
    51 import java.util.concurrent.SubmissionPublisher;
       
    52 import java.util.concurrent.atomic.AtomicInteger;
       
    53 import java.util.concurrent.atomic.AtomicLong;
       
    54 import javax.net.ssl.KeyManagerFactory;
       
    55 import javax.net.ssl.*;
       
    56 import javax.net.ssl.TrustManagerFactory;
       
    57 import java.net.http.internal.common.Utils;
       
    58 import org.testng.annotations.Test;
       
    59 import java.net.http.internal.common.SSLFlowDelegate;
       
    60 
       
    61 @Test
       
    62 public class FlowTest extends AbstractRandomTest {
       
    63 
       
    64     private final SubmissionPublisher<List<ByteBuffer>> srcPublisher;
       
    65     private final ExecutorService executor;
       
    66     private static final long COUNTER = 3000;
       
    67     private static final int LONGS_PER_BUF = 800;
       
    68     static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
       
    69     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
       
    70     static volatile String alpn;
       
    71 
       
    72     // This is a hack to work around an issue with SubmissionPublisher.
       
    73     // SubmissionPublisher will call onComplete immediately without forwarding
       
    74     // remaining pending data if SubmissionPublisher.close() is called when
       
    75     // there is no demand. In other words, it doesn't wait for the subscriber
       
    76     // to pull all the data before calling onComplete.
       
    77     // We use a CountDownLatch to figure out when it is safe to call close().
       
    78     // This may cause the test to hang if data are buffered.
       
    79     final CountDownLatch allBytesReceived = new CountDownLatch(1);
       
    80 
       
    81     private final CompletableFuture<Void> completion;
       
    82 
       
    83     public FlowTest() throws IOException {
       
    84         executor = Executors.newCachedThreadPool();
       
    85         srcPublisher = new SubmissionPublisher<>(executor, 20,
       
    86                                                  this::handlePublisherException);
       
    87         SSLContext ctx = (new SimpleSSLContext()).get();
       
    88         SSLEngine engineClient = ctx.createSSLEngine();
       
    89         SSLParameters params = ctx.getSupportedSSLParameters();
       
    90         params.setApplicationProtocols(new String[]{"proto1", "proto2"}); // server will choose proto2
       
    91         params.setProtocols(new String[]{"TLSv1.2"}); // TODO: This is essential. Needs to be protocol impl
       
    92         engineClient.setSSLParameters(params);
       
    93         engineClient.setUseClientMode(true);
       
    94         completion = new CompletableFuture<>();
       
    95         SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor, allBytesReceived);
       
    96         looper.start();
       
    97         EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
       
    98         SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
       
    99         // going to measure how long handshake takes
       
   100         final long start = System.currentTimeMillis();
       
   101         sslClient.alpn().whenComplete((String s, Throwable t) -> {
       
   102             if (t != null)
       
   103                 t.printStackTrace();
       
   104             long endTime = System.currentTimeMillis();
       
   105             alpn = s;
       
   106             System.out.println("ALPN: " + alpn);
       
   107             long period = (endTime - start);
       
   108             System.out.printf("Handshake took %d ms\n", period);
       
   109         });
       
   110         Subscriber<List<ByteBuffer>> reader = sslClient.upstreamReader();
       
   111         Subscriber<List<ByteBuffer>> writer = sslClient.upstreamWriter();
       
   112         looper.setReturnSubscriber(reader);
       
   113         // now connect all the pieces
       
   114         srcPublisher.subscribe(writer);
       
   115         String aa = sslClient.alpn().join();
       
   116         System.out.println("AAALPN = " + aa);
       
   117     }
       
   118 
       
   119     private void handlePublisherException(Object o, Throwable t) {
       
   120         System.out.println("Src Publisher exception");
       
   121         t.printStackTrace(System.out);
       
   122     }
       
   123 
       
   124     private static ByteBuffer getBuffer(long startingAt) {
       
   125         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
       
   126         for (int j = 0; j < LONGS_PER_BUF; j++) {
       
   127             buf.putLong(startingAt++);
       
   128         }
       
   129         buf.flip();
       
   130         return buf;
       
   131     }
       
   132 
       
   133     @Test
       
   134     public void run() {
       
   135         long count = 0;
       
   136         System.out.printf("Submitting %d buffer arrays\n", COUNTER);
       
   137         System.out.printf("LoopCount should be %d\n", TOTAL_LONGS);
       
   138         for (long i = 0; i < COUNTER; i++) {
       
   139             ByteBuffer b = getBuffer(count);
       
   140             count += LONGS_PER_BUF;
       
   141             srcPublisher.submit(List.of(b));
       
   142         }
       
   143         System.out.println("Finished submission. Waiting for loopback");
       
   144         // make sure we don't wait for allBytesReceived in case of error.
       
   145         completion.whenComplete((r,t) -> allBytesReceived.countDown());
       
   146         try {
       
   147             allBytesReceived.await();
       
   148         } catch (InterruptedException e) {
       
   149             throw new RuntimeException(e);
       
   150         }
       
   151         System.out.println("All bytes received: ");
       
   152         srcPublisher.close();
       
   153         try {
       
   154             completion.join();
       
   155             if (!alpn.equals("proto2")) {
       
   156                 throw new RuntimeException("wrong alpn received");
       
   157             }
       
   158             System.out.println("OK");
       
   159         } finally {
       
   160             executor.shutdownNow();
       
   161         }
       
   162     }
       
   163 
       
   164 /*
       
   165     public static void main(String[]args) throws Exception {
       
   166         FlowTest test = new FlowTest();
       
   167         test.run();
       
   168     }
       
   169 */
       
   170 
       
   171     /**
       
   172      * This Subscriber simulates an SSL loopback network. The object itself
       
   173      * accepts outgoing SSL encrypted data which is looped back via two sockets
       
   174      * (one of which is an SSLSocket emulating a server). The method
       
   175      * {@link #setReturnSubscriber(java.util.concurrent.Flow.Subscriber) }
       
   176      * is used to provide the Subscriber which feeds the incoming side
       
   177      * of SSLFlowDelegate. Three threads are used to implement this behavior
       
   178      * and a SubmissionPublisher drives the incoming read side.
       
   179      * <p>
       
   180      * A thread reads from the buffer, writes
       
   181      * to the client j.n.Socket which is connected to a SSLSocket operating
       
   182      * in server mode. A second thread loops back data read from the SSLSocket back to the
       
   183      * client again. A third thread reads the client socket and pushes the data to
       
   184      * a SubmissionPublisher that drives the reader side of the SSLFlowDelegate
       
   185      */
       
   186     static class SSLLoopbackSubscriber implements Subscriber<List<ByteBuffer>> {
       
   187         private final BlockingQueue<ByteBuffer> buffer;
       
   188         private final Socket clientSock;
       
   189         private final SSLSocket serverSock;
       
   190         private final Thread thread1, thread2, thread3;
       
   191         private volatile Flow.Subscription clientSubscription;
       
   192         private final SubmissionPublisher<List<ByteBuffer>> publisher;
       
   193         private final CountDownLatch allBytesReceived;
       
   194 
       
   195         SSLLoopbackSubscriber(SSLContext ctx,
       
   196                               ExecutorService exec,
       
   197                               CountDownLatch allBytesReceived) throws IOException {
       
   198             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
       
   199             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
       
   200             SSLParameters params = serv.getSSLParameters();
       
   201             params.setApplicationProtocols(new String[]{"proto2"});
       
   202             serv.setSSLParameters(params);
       
   203 
       
   204 
       
   205             int serverPort = serv.getLocalPort();
       
   206             clientSock = new Socket("127.0.0.1", serverPort);
       
   207             serverSock = (SSLSocket) serv.accept();
       
   208             this.buffer = new LinkedBlockingQueue<>();
       
   209             this.allBytesReceived = allBytesReceived;
       
   210             thread1 = new Thread(this::clientWriter, "clientWriter");
       
   211             thread2 = new Thread(this::serverLoopback, "serverLoopback");
       
   212             thread3 = new Thread(this::clientReader, "clientReader");
       
   213             publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
       
   214                     this::handlePublisherException);
       
   215             SSLFlowDelegate.Monitor.add(this::monitor);
       
   216         }
       
   217 
       
   218         public void start() {
       
   219             thread1.start();
       
   220             thread2.start();
       
   221             thread3.start();
       
   222         }
       
   223 
       
   224         private void handlePublisherException(Object o, Throwable t) {
       
   225             System.out.println("Loopback Publisher exception");
       
   226             t.printStackTrace(System.out);
       
   227         }
       
   228 
       
   229         private final AtomicInteger readCount = new AtomicInteger();
       
   230 
       
   231         // reads off the SSLSocket the data from the "server"
       
   232         private void clientReader() {
       
   233             try {
       
   234                 InputStream is = clientSock.getInputStream();
       
   235                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
       
   236                 System.out.println("clientReader: bufsize = " + bufsize);
       
   237                 while (true) {
       
   238                     byte[] buf = new byte[bufsize];
       
   239                     int n = is.read(buf);
       
   240                     if (n == -1) {
       
   241                         System.out.println("clientReader close: read "
       
   242                                 + readCount.get() + " bytes");
       
   243                         System.out.println("clientReader: got EOF. "
       
   244                                             + "Waiting signal to close publisher.");
       
   245                         allBytesReceived.await();
       
   246                         System.out.println("clientReader: closing publisher");
       
   247                         publisher.close();
       
   248                         sleep(2000);
       
   249                         Utils.close(is, clientSock);
       
   250                         return;
       
   251                     }
       
   252                     ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
       
   253                     readCount.addAndGet(n);
       
   254                     publisher.submit(List.of(bb));
       
   255                 }
       
   256             } catch (Throwable e) {
       
   257                 e.printStackTrace();
       
   258                 Utils.close(clientSock);
       
   259             }
       
   260         }
       
   261 
       
   262         // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
       
   263         // which is connected to the SSLSocket emulating a server.
       
   264         private void clientWriter() {
       
   265             long nbytes = 0;
       
   266             try {
       
   267                 OutputStream os =
       
   268                         new BufferedOutputStream(clientSock.getOutputStream());
       
   269 
       
   270                 while (true) {
       
   271                     ByteBuffer buf = buffer.take();
       
   272                     if (buf == FlowTest.SENTINEL) {
       
   273                         // finished
       
   274                         //Utils.sleep(2000);
       
   275                         System.out.println("clientWriter close: " + nbytes + " written");
       
   276                         clientSock.shutdownOutput();
       
   277                         System.out.println("clientWriter close return");
       
   278                         return;
       
   279                     }
       
   280                     int len = buf.remaining();
       
   281                     int written = writeToStream(os, buf);
       
   282                     assert len == written;
       
   283                     nbytes += len;
       
   284                     assert !buf.hasRemaining()
       
   285                             : "buffer has " + buf.remaining() + " bytes left";
       
   286                     clientSubscription.request(1);
       
   287                 }
       
   288             } catch (Throwable e) {
       
   289                 e.printStackTrace();
       
   290             }
       
   291         }
       
   292 
       
   293         private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
       
   294             byte[] b = buf.array();
       
   295             int offset = buf.arrayOffset() + buf.position();
       
   296             int n = buf.limit() - buf.position();
       
   297             os.write(b, offset, n);
       
   298             buf.position(buf.limit());
       
   299             os.flush();
       
   300             return n;
       
   301         }
       
   302 
       
   303         private final AtomicInteger loopCount = new AtomicInteger();
       
   304 
       
   305         public String monitor() {
       
   306             return "serverLoopback: loopcount = " + loopCount.toString()
       
   307                     + " clientRead: count = " + readCount.toString();
       
   308         }
       
   309 
       
   310         // thread2
       
   311         private void serverLoopback() {
       
   312             try {
       
   313                 InputStream is = serverSock.getInputStream();
       
   314                 OutputStream os = serverSock.getOutputStream();
       
   315                 final int bufsize = FlowTest.randomRange(512, 16 * 1024);
       
   316                 System.out.println("serverLoopback: bufsize = " + bufsize);
       
   317                 byte[] bb = new byte[bufsize];
       
   318                 while (true) {
       
   319                     int n = is.read(bb);
       
   320                     if (n == -1) {
       
   321                         sleep(2000);
       
   322                         is.close();
       
   323                         serverSock.close();
       
   324                         return;
       
   325                     }
       
   326                     os.write(bb, 0, n);
       
   327                     os.flush();
       
   328                     loopCount.addAndGet(n);
       
   329                 }
       
   330             } catch (Throwable e) {
       
   331                 e.printStackTrace();
       
   332             }
       
   333         }
       
   334 
       
   335 
       
   336         /**
       
   337          * This needs to be called before the chain is subscribed. It can't be
       
   338          * supplied in the constructor.
       
   339          */
       
   340         public void setReturnSubscriber(Subscriber<List<ByteBuffer>> returnSubscriber) {
       
   341             publisher.subscribe(returnSubscriber);
       
   342         }
       
   343 
       
   344         @Override
       
   345         public void onSubscribe(Flow.Subscription subscription) {
       
   346             clientSubscription = subscription;
       
   347             clientSubscription.request(5);
       
   348         }
       
   349 
       
   350         @Override
       
   351         public void onNext(List<ByteBuffer> item) {
       
   352             try {
       
   353                 for (ByteBuffer b : item)
       
   354                     buffer.put(b);
       
   355             } catch (InterruptedException e) {
       
   356                 e.printStackTrace();
       
   357                 Utils.close(clientSock);
       
   358             }
       
   359         }
       
   360 
       
   361         @Override
       
   362         public void onError(Throwable throwable) {
       
   363             throwable.printStackTrace();
       
   364             Utils.close(clientSock);
       
   365         }
       
   366 
       
   367         @Override
       
   368         public void onComplete() {
       
   369             try {
       
   370                 buffer.put(FlowTest.SENTINEL);
       
   371             } catch (InterruptedException e) {
       
   372                 e.printStackTrace();
       
   373                 Utils.close(clientSock);
       
   374             }
       
   375         }
       
   376     }
       
   377 
       
   378     /**
       
   379      * The final subscriber which receives the decrypted looped-back data.
       
   380      * Just needs to compare the data with what was sent. The given CF is
       
   381      * either completed exceptionally with an error or normally on success.
       
   382      */
       
   383     static class EndSubscriber implements Subscriber<List<ByteBuffer>> {
       
   384 
       
   385         private final long nbytes;
       
   386 
       
   387         private final AtomicLong counter;
       
   388         private volatile Flow.Subscription subscription;
       
   389         private final CompletableFuture<Void> completion;
       
   390         private final CountDownLatch allBytesReceived;
       
   391 
       
   392         EndSubscriber(long nbytes,
       
   393                       CompletableFuture<Void> completion,
       
   394                       CountDownLatch allBytesReceived) {
       
   395             counter = new AtomicLong(0);
       
   396             this.nbytes = nbytes;
       
   397             this.completion = completion;
       
   398             this.allBytesReceived = allBytesReceived;
       
   399         }
       
   400 
       
   401         @Override
       
   402         public void onSubscribe(Flow.Subscription subscription) {
       
   403             this.subscription = subscription;
       
   404             subscription.request(5);
       
   405         }
       
   406 
       
   407         public static String info(List<ByteBuffer> i) {
       
   408             StringBuilder sb = new StringBuilder();
       
   409             sb.append("size: ").append(Integer.toString(i.size()));
       
   410             int x = 0;
       
   411             for (ByteBuffer b : i)
       
   412                 x += b.remaining();
       
   413             sb.append(" bytes: " + Integer.toString(x));
       
   414             return sb.toString();
       
   415         }
       
   416 
       
   417         @Override
       
   418         public void onNext(List<ByteBuffer> buffers) {
       
   419             long currval = counter.get();
       
   420             //if (currval % 500 == 0) {
       
   421             //System.out.println("End: " + currval);
       
   422             //}
       
   423 
       
   424             for (ByteBuffer buf : buffers) {
       
   425                 while (buf.hasRemaining()) {
       
   426                     long n = buf.getLong();
       
   427                     //if (currval > (FlowTest.TOTAL_LONGS - 50)) {
       
   428                     //System.out.println("End: " + currval);
       
   429                     //}
       
   430                     if (n != currval++) {
       
   431                         System.out.println("ERROR at " + n + " != " + (currval - 1));
       
   432                         completion.completeExceptionally(new RuntimeException("ERROR"));
       
   433                         subscription.cancel();
       
   434                         return;
       
   435                     }
       
   436                 }
       
   437             }
       
   438 
       
   439             counter.set(currval);
       
   440             subscription.request(1);
       
   441             if (currval >= TOTAL_LONGS) {
       
   442                 allBytesReceived.countDown();
       
   443             }
       
   444         }
       
   445 
       
   446         @Override
       
   447         public void onError(Throwable throwable) {
       
   448             allBytesReceived.countDown();
       
   449             completion.completeExceptionally(throwable);
       
   450         }
       
   451 
       
   452         @Override
       
   453         public void onComplete() {
       
   454             long n = counter.get();
       
   455             if (n != nbytes) {
       
   456                 System.out.printf("nbytes=%d n=%d\n", nbytes, n);
       
   457                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
       
   458             } else {
       
   459                 System.out.println("DONE OK: counter = " + n);
       
   460                 allBytesReceived.countDown();
       
   461                 completion.complete(null);
       
   462             }
       
   463         }
       
   464     }
       
   465 
       
   466     /**
       
   467      * Creates a simple usable SSLContext for SSLSocketFactory
       
   468      * or a HttpsServer using either a given keystore or a default
       
   469      * one in the test tree.
       
   470      * <p>
       
   471      * Using this class with a security manager requires the following
       
   472      * permissions to be granted:
       
   473      * <p>
       
   474      * permission "java.util.PropertyPermission" "test.src.path", "read";
       
   475      * permission java.io.FilePermission
       
   476      * "${test.src}/../../../../lib/testlibrary/jdk/testlibrary/testkeys", "read";
       
   477      * The exact path above depends on the location of the test.
       
   478      */
       
   479     static class SimpleSSLContext {
       
   480 
       
   481         private final SSLContext ssl;
       
   482 
       
   483         /**
       
   484          * Loads default keystore from SimpleSSLContext source directory
       
   485          */
       
   486         public SimpleSSLContext() throws IOException {
       
   487             String paths = System.getProperty("test.src.path");
       
   488             StringTokenizer st = new StringTokenizer(paths, File.pathSeparator);
       
   489             boolean securityExceptions = false;
       
   490             SSLContext sslContext = null;
       
   491             while (st.hasMoreTokens()) {
       
   492                 String path = st.nextToken();
       
   493                 try {
       
   494                     File f = new File(path, "../../../../lib/testlibrary/jdk/testlibrary/testkeys");
       
   495                     if (f.exists()) {
       
   496                         try (FileInputStream fis = new FileInputStream(f)) {
       
   497                             sslContext = init(fis);
       
   498                             break;
       
   499                         }
       
   500                     }
       
   501                 } catch (SecurityException e) {
       
   502                     // catch and ignore because permission only required
       
   503                     // for one entry on path (at most)
       
   504                     securityExceptions = true;
       
   505                 }
       
   506             }
       
   507             if (securityExceptions) {
       
   508                 System.out.println("SecurityExceptions thrown on loading testkeys");
       
   509             }
       
   510             ssl = sslContext;
       
   511         }
       
   512 
       
   513         private SSLContext init(InputStream i) throws IOException {
       
   514             try {
       
   515                 char[] passphrase = "passphrase".toCharArray();
       
   516                 KeyStore ks = KeyStore.getInstance("JKS");
       
   517                 ks.load(i, passphrase);
       
   518 
       
   519                 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
       
   520                 kmf.init(ks, passphrase);
       
   521 
       
   522                 TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
       
   523                 tmf.init(ks);
       
   524 
       
   525                 SSLContext ssl = SSLContext.getInstance("TLS");
       
   526                 ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
       
   527                 return ssl;
       
   528             } catch (KeyManagementException | KeyStoreException |
       
   529                     UnrecoverableKeyException | CertificateException |
       
   530                     NoSuchAlgorithmException e) {
       
   531                 throw new RuntimeException(e.getMessage());
       
   532             }
       
   533         }
       
   534 
       
   535         public SSLContext get() {
       
   536             return ssl;
       
   537         }
       
   538     }
       
   539 
       
   540     private static void sleep(int millis) {
       
   541         try {
       
   542             Thread.sleep(millis);
       
   543         } catch (Exception e) {
       
   544             e.printStackTrace();
       
   545         }
       
   546     }
       
   547 }