test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
branchhttp-client-branch
changeset 55909 583695a0ed6a
parent 55763 634d8e14c172
child 55942 8d4770c22b63
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Wed Nov 29 11:15:19 2017 +0000
@@ -25,19 +25,27 @@
 
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.SSLFlowDelegate;
 import jdk.incubator.http.internal.common.SSLTube;
 import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
 import org.testng.annotations.Test;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
@@ -48,6 +56,7 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
@@ -55,8 +64,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 @Test
@@ -80,9 +91,13 @@
         /* Start of wiring */
         ExecutorService sslExecutor = Executors.newCachedThreadPool();
         /* Emulates an echo server */
-        FlowTube server = new SSLTube(createSSLEngine(false),
-                                      sslExecutor,
-                                      new EchoTube(16));
+//        FlowTube server = new SSLTube(createSSLEngine(false),
+//                                      sslExecutor,
+//                                      new EchoTube(16));
+        SSLLoopbackSubscriber server =
+                new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+        server.start();
+
         FlowTube client = new SSLTube(createSSLEngine(true),
                                       sslExecutor,
                                       server);
@@ -113,63 +128,188 @@
         }
     }
 
-    private static final class EchoTube implements FlowTube {
-
-        private final static Object EOF = new Object();
-        private final Executor executor = Executors.newSingleThreadExecutor();
+    static class SSLLoopbackSubscriber implements FlowTube {
+        private final BlockingQueue<ByteBuffer> buffer;
+        private final Socket clientSock;
+        private final SSLSocket serverSock;
+        private final Thread thread1, thread2, thread3;
+        private volatile Flow.Subscription clientSubscription;
+        private final SubmissionPublisher<List<ByteBuffer>> publisher;
 
-        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
-        private final int maxQueueSize;
-        private final SequentialScheduler processingScheduler =
-                new SequentialScheduler(createProcessingTask());
+        SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+            SSLServerSocketFactory fac = ctx.getServerSocketFactory();
+            SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
+            SSLParameters params = serv.getSSLParameters();
+            params.setApplicationProtocols(new String[]{"proto2"});
+            serv.setSSLParameters(params);
+
+
+            int serverPort = serv.getLocalPort();
+            clientSock = new Socket("127.0.0.1", serverPort);
+            serverSock = (SSLSocket) serv.accept();
+            this.buffer = new LinkedBlockingQueue<>();
+            thread1 = new Thread(this::clientWriter, "clientWriter");
+            thread2 = new Thread(this::serverLoopback, "serverLoopback");
+            thread3 = new Thread(this::clientReader, "clientReader");
+            publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
+                    this::handlePublisherException);
+            SSLFlowDelegate.Monitor.add(this::monitor);
+        }
 
-        /* Writing into this tube */
-        private long unfulfilled;
-        private Flow.Subscription subscription;
+        public void start() {
+            thread1.start();
+            thread2.start();
+            thread3.start();
+        }
+
+        private void handlePublisherException(Object o, Throwable t) {
+            System.out.println("Loopback Publisher exception");
+            t.printStackTrace(System.out);
+        }
+
+        private final AtomicInteger readCount = new AtomicInteger();
 
-        /* Reading from this tube */
-        private final Demand demand = new Demand();
-        private final AtomicBoolean cancelled = new AtomicBoolean();
-        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-
-        private EchoTube(int maxBufferSize) {
-            if (maxBufferSize < 1)
-                throw new IllegalArgumentException();
-            this.maxQueueSize = maxBufferSize;
+        // reads off the SSLSocket the data from the "server"
+        private void clientReader() {
+            try {
+                InputStream is = clientSock.getInputStream();
+                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                System.out.println("clientReader: bufsize = " + bufsize);
+                while (true) {
+                    byte[] buf = new byte[bufsize];
+                    int n = is.read(buf);
+                    if (n == -1) {
+                        System.out.println("clientReader close: read "
+                                + readCount.get() + " bytes");
+                        publisher.close();
+                        sleep(2000);
+                        Utils.close(is, clientSock);
+                        return;
+                    }
+                    ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
+                    readCount.addAndGet(n);
+                    publisher.submit(List.of(bb));
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
+            }
         }
 
-        @Override
-        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
-            this.subscriber = subscriber;
-            this.subscriber.onSubscribe(new InternalSubscription());
+        // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
+        // which is connected to the SSLSocket emulating a server.
+        private void clientWriter() {
+            long nbytes = 0;
+            try {
+                OutputStream os =
+                        new BufferedOutputStream(clientSock.getOutputStream());
+
+                while (true) {
+                    ByteBuffer buf = buffer.take();
+                    if (buf == FlowTest.SENTINEL) {
+                        // finished
+                        //Utils.sleep(2000);
+                        System.out.println("clientWriter close: " + nbytes + " written");
+                        clientSock.shutdownOutput();
+                        System.out.println("clientWriter close return");
+                        return;
+                    }
+                    int len = buf.remaining();
+                    int written = writeToStream(os, buf);
+                    assert len == written;
+                    nbytes += len;
+                    assert !buf.hasRemaining()
+                            : "buffer has " + buf.remaining() + " bytes left";
+                    clientSubscription.request(1);
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
+            byte[] b = buf.array();
+            int offset = buf.arrayOffset() + buf.position();
+            int n = buf.limit() - buf.position();
+            os.write(b, offset, n);
+            buf.position(buf.limit());
+            os.flush();
+            return n;
+        }
+
+        private final AtomicInteger loopCount = new AtomicInteger();
+
+        public String monitor() {
+            return "serverLoopback: loopcount = " + loopCount.toString()
+                    + " clientRead: count = " + readCount.toString();
+        }
+
+        // thread2
+        private void serverLoopback() {
+            try {
+                InputStream is = serverSock.getInputStream();
+                OutputStream os = serverSock.getOutputStream();
+                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                System.out.println("serverLoopback: bufsize = " + bufsize);
+                byte[] bb = new byte[bufsize];
+                while (true) {
+                    int n = is.read(bb);
+                    if (n == -1) {
+                        sleep(2000);
+                        is.close();
+                        os.close();
+                        serverSock.close();
+                        return;
+                    }
+                    os.write(bb, 0, n);
+                    os.flush();
+                    loopCount.addAndGet(n);
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+
+        /**
+         * This needs to be called before the chain is subscribed. It can't be
+         * supplied in the constructor.
+         */
+        public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) {
+            publisher.subscribe(returnSubscriber);
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            unfulfilled = maxQueueSize;
-            (this.subscription = subscription).request(maxQueueSize);
+            clientSubscription = subscription;
+            clientSubscription.request(5);
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
-            if (--unfulfilled == (maxQueueSize / 2)) {
-                subscription.request(maxQueueSize - unfulfilled);
-                unfulfilled = maxQueueSize;
+            try {
+                for (ByteBuffer b : item)
+                    buffer.put(b);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
             }
-            queue.add(item);
-            processingScheduler.deferOrSchedule(executor);
         }
 
         @Override
         public void onError(Throwable throwable) {
-            queue.add(throwable);
-            processingScheduler.deferOrSchedule(executor);
+            throwable.printStackTrace();
+            Utils.close(clientSock);
         }
 
         @Override
         public void onComplete() {
-            queue.add(EOF);
-            processingScheduler.deferOrSchedule(executor);
+            try {
+                buffer.put(FlowTest.SENTINEL);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
+            }
         }
 
         @Override
@@ -177,57 +317,163 @@
             return false;
         }
 
-        private class InternalSubscription implements Flow.Subscription {
-
-            @Override
-            public void request(long n) {
-                if (n <= 0) {
-                    throw new InternalError();
-                }
-                demand.increase(n);
-                processingScheduler.runOrSchedule();
-            }
-
-            @Override
-            public void cancel() {
-                cancelled.set(true);
-            }
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            publisher.subscribe(subscriber);
         }
-
-        private SequentialScheduler.RestartableTask createProcessingTask() {
-            return new SequentialScheduler.CompleteRestartableTask() {
+    }
 
-                @Override
-                protected void run() {
-                    while (!cancelled.get()) {
-                        Object item = queue.peek();
-                        if (item == null)
-                            return;
-                        try {
-                            if (item instanceof List) {
-                                if (!demand.tryDecrement())
-                                    return;
-                                @SuppressWarnings("unchecked")
-                                List<ByteBuffer> bytes = (List<ByteBuffer>) item;
-                                subscriber.onNext(bytes);
-                            } else if (item instanceof Throwable) {
-                                cancelled.set(true);
-                                subscriber.onError((Throwable) item);
-                            } else if (item == EOF) {
-                                cancelled.set(true);
-                                subscriber.onComplete();
-                            } else {
-                                throw new InternalError(String.valueOf(item));
-                            }
-                        } finally {
-                            Object removed = queue.remove();
-                            assert removed == item;
-                        }
-                    }
-                }
-            };
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+
         }
     }
+//    private static final class EchoTube implements FlowTube {
+//
+//        private final static Object EOF = new Object();
+//        private final Executor executor = Executors.newSingleThreadExecutor();
+//
+//        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+//        private final int maxQueueSize;
+//        private final SequentialScheduler processingScheduler =
+//                new SequentialScheduler(createProcessingTask());
+//
+//        /* Writing into this tube */
+//        private long unfulfilled;
+//        private Flow.Subscription subscription;
+//
+//        /* Reading from this tube */
+//        private final Demand demand = new Demand();
+//        private final AtomicBoolean cancelled = new AtomicBoolean();
+//        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+//
+//        private EchoTube(int maxBufferSize) {
+//            if (maxBufferSize < 1)
+//                throw new IllegalArgumentException();
+//            this.maxQueueSize = maxBufferSize;
+//        }
+//
+//        @Override
+//        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+//            this.subscriber = subscriber;
+//            System.out.println("EchoTube got subscriber: " + subscriber);
+//            this.subscriber.onSubscribe(new InternalSubscription());
+//        }
+//
+//        @Override
+//        public void onSubscribe(Flow.Subscription subscription) {
+//            unfulfilled = maxQueueSize;
+//            System.out.println("EchoTube request: " + maxQueueSize);
+//            (this.subscription = subscription).request(maxQueueSize);
+//        }
+//
+//        @Override
+//        public void onNext(List<ByteBuffer> item) {
+//            if (--unfulfilled == (maxQueueSize / 2)) {
+//                long req = maxQueueSize - unfulfilled;
+//                subscription.request(req);
+//                System.out.println("EchoTube request: " + req);
+//                unfulfilled = maxQueueSize;
+//            }
+//            System.out.println("EchoTube add " + Utils.remaining(item));
+//            queue.add(item);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public void onError(Throwable throwable) {
+//            System.out.println("EchoTube add " + throwable);
+//            queue.add(throwable);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public void onComplete() {
+//            System.out.println("EchoTube add EOF");
+//            queue.add(EOF);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public boolean isFinished() {
+//            return false;
+//        }
+//
+//        private class InternalSubscription implements Flow.Subscription {
+//
+//            @Override
+//            public void request(long n) {
+//                System.out.println("EchoTube got request: " + n);
+//                if (n <= 0) {
+//                    throw new InternalError();
+//                }
+//                demand.increase(n);
+//                processingScheduler.runOrSchedule();
+//            }
+//
+//            @Override
+//            public void cancel() {
+//                cancelled.set(true);
+//            }
+//        }
+//
+//        @Override
+//        public String toString() {
+//            return "EchoTube";
+//        }
+//
+//        private SequentialScheduler.RestartableTask createProcessingTask() {
+//            return new SequentialScheduler.CompleteRestartableTask() {
+//
+//                @Override
+//                protected void run() {
+//                    try {
+//                        while (!cancelled.get()) {
+//                            Object item = queue.peek();
+//                            if (item == null)
+//                                return;
+//                            try {
+//                                System.out.println("EchoTube processing item");
+//                                if (item instanceof List) {
+//                                    if (!demand.tryDecrement()) {
+//                                        System.out.println("EchoTube no demand");
+//                                        return;
+//                                    }
+//                                    @SuppressWarnings("unchecked")
+//                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing "
+//                                            + Utils.remaining(bytes));
+//                                    subscriber.onNext(bytes);
+//                                } else if (item instanceof Throwable) {
+//                                    cancelled.set(true);
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing " + item);
+//                                    subscriber.onError((Throwable) item);
+//                                } else if (item == EOF) {
+//                                    cancelled.set(true);
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing EOF");
+//                                    subscriber.onComplete();
+//                                } else {
+//                                    throw new InternalError(String.valueOf(item));
+//                                }
+//                            } finally {
+//                            }
+//                        }
+//                    } catch(Throwable t) {
+//                        t.printStackTrace();
+//                        throw t;
+//                    }
+//                }
+//            };
+//        }
+//    }
 
     /**
      * The final subscriber which receives the decrypted looped-back data. Just
@@ -253,6 +499,7 @@
         public void onSubscribe(Flow.Subscription subscription) {
             this.subscription = subscription;
             unfulfilled = REQUEST_WINDOW;
+            System.out.println("EndSubscriber request " + REQUEST_WINDOW);
             subscription.request(REQUEST_WINDOW);
         }
 
@@ -269,7 +516,9 @@
         @Override
         public void onNext(List<ByteBuffer> buffers) {
             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
-                subscription.request(REQUEST_WINDOW - unfulfilled);
+                long req = REQUEST_WINDOW - unfulfilled;
+                System.out.println("EndSubscriber request " + req);
+                subscription.request(req);
                 unfulfilled = REQUEST_WINDOW;
             }
 
@@ -277,6 +526,7 @@
             if (currval % 500 == 0) {
                 System.out.println("End: " + currval);
             }
+            System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
 
             for (ByteBuffer buf : buffers) {
                 while (buf.hasRemaining()) {
@@ -298,6 +548,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            System.out.println("EndSubscriber onError " + throwable);
             completion.completeExceptionally(throwable);
         }
 
@@ -312,6 +563,10 @@
                 completion.complete(null);
             }
         }
+        @Override
+        public String toString() {
+            return "EndSubscriber";
+        }
     }
 
     private static SSLEngine createSSLEngine(boolean client) throws IOException {