http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup) http-client-branch
authordfuchs
Wed, 29 Nov 2017 11:15:19 +0000
branchhttp-client-branch
changeset 55909 583695a0ed6a
parent 55908 a36a236e55d8
child 55910 9b2f7e9e95a2
http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java
test/jdk/java/net/httpclient/http2/BasicTest.java
test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Wed Nov 29 11:15:19 2017 +0000
@@ -90,7 +90,7 @@
     ConnectionPool.CacheKey cacheKey() {
         return ConnectionPool.cacheKey(address, null);
     }
-    
+
     @Override
     public void close() {
         plainConnection.close();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java	Wed Nov 29 11:15:19 2017 +0000
@@ -558,8 +558,8 @@
                     if (subscribed || cancelled) return;
                     subscribed = true;
                 }
-                subscriber.onConnection(this);
-                debug.log(Level.DEBUG, "onConnection called");
+                subscriber.onSubscribe(this);
+                debug.log(Level.DEBUG, "onSubscribe called");
                 if (errorRef.get() != null) {
                     signalCompletion();
                 }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java	Wed Nov 29 11:15:19 2017 +0000
@@ -40,12 +40,7 @@
  * flow. A FlowTube supports handing over the same read subscription to different
  * sequential read subscribers over time. When {@code connectFlows(writePublisher,
  * readSubscriber} is called, the FlowTube will call {@code dropSubscription} on
- * its former readSubscriber, and {@code onConnection} on its new readSubscriber.
- * By default, the implementation of {@code onConnection} is to call
- * {@code onSubscribe}, but a subscriber that needs to subscribe sequentially
- * several times to the same FlowTube may override the default implementation
- * to ensure that {@code onSubscribe} is called only once.
- *
+ * its former readSubscriber, and {@code onSubscribe} on its new readSubscriber.
  */
 public interface FlowTube extends
        Flow.Publisher<List<ByteBuffer>>,
@@ -59,14 +54,6 @@
      * should stop calling any method on its subscription.
      */
     static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {
-        /**
-         * Called by {@code FlowTube.connectFlows}.
-         * @param subscription the subscription.
-         * @implSpec By default this method call {@code this.onSubscribe()}.
-         */
-        default void onConnection(Flow.Subscription subscription) {
-            onSubscribe(subscription);
-        }
 
         /**
          * Called when the flow is connected again, and the subscription
@@ -176,10 +163,6 @@
             @Override
             public void dropSubscription() {}
             @Override
-            public void onConnection(Flow.Subscription subscription) {
-                delegate.onSubscribe(subscription);
-            }
-            @Override
             public void onSubscribe(Flow.Subscription subscription) {
                 delegate.onSubscribe(subscription);
             }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Wed Nov 29 11:15:19 2017 +0000
@@ -92,6 +92,7 @@
     final CompletableFuture<Void> cf;
     final CompletableFuture<String> alpnCF; // completes on initial handshake
     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+    volatile boolean close_notify_received;
 
     /**
      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
@@ -121,6 +122,15 @@
     }
 
     /**
+     * Returns true if the SSLFlowDelegate has detected a TLS
+     * close_notify from the server.
+     * @return true, if a close_notify was detected.
+     */
+    public boolean closeNotifyReceived() {
+        return close_notify_received;
+    }
+
+    /**
      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
      * Called from within the constructor. Overwritten by SSLTube.
@@ -461,6 +471,11 @@
             scheduler.stop();
         }
 
+        @Override
+        public boolean closing() {
+            return closeNotifyReceived();
+        }
+
         private boolean isCompleting() {
             synchronized(writeList) {
                 int lastIndex = writeList.size() - 1;
@@ -692,8 +707,7 @@
                     dst = b;
                     break;
                 case CLOSED:
-                    doClosure();
-                    return new EngineResult(sslResult);
+                    return doClosure(new EngineResult(sslResult));
                 case BUFFER_UNDERFLOW:
                     // handled implicitly by compaction/reallocation of readBuf
                     return new EngineResult(sslResult);
@@ -705,9 +719,22 @@
     }
 
     // FIXME: acknowledge a received CLOSE request from peer
-    void doClosure() throws IOException {
-        //while (!wrapAndSend(emptyArray))
-            //;
+    EngineResult doClosure(EngineResult r) throws IOException {
+        debug.log(Level.DEBUG,
+                "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
+                r.result, engine.getHandshakeStatus(),
+                engine.isOutboundDone(), engine.isInboundDone());
+        if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
+            // we have received TLS close_notify and need to send
+            // an acknowledgement back. We're calling doHandshake
+            // to finish the close handshake.
+            if (engine.isInboundDone() && !engine.isOutboundDone()) {
+                debug.log(Level.DEBUG, "doClosure: close_notify received");
+                close_notify_received = true;
+                doHandshake(r, READER);
+            }
+        }
+        return r;
     }
 
     /**
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java	Wed Nov 29 11:15:19 2017 +0000
@@ -200,11 +200,6 @@
             onSubscribe(delegate::onSubscribe, subscription);
         }
 
-        @Override
-        public void onConnection(Flow.Subscription subscription) {
-            onSubscribe(delegate::onConnection, subscription);
-        }
-
         private void onSubscribe(Consumer<Flow.Subscription> method,
                                  Flow.Subscription subscription) {
             subscribedCalled = true;
@@ -292,7 +287,7 @@
         //    are going to signal the SSLFlowDelegate reader, and make sure
         //    onSubscribed is called within the reader flow
         // 2. If it's not yet subscribed (readSubscription == null), then
-        //    we're going to wait for onSubscribe/onConnection to be called.
+        //    we're going to wait for onSubscribe to be called.
         //
         void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
             debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
@@ -345,9 +340,7 @@
             if (previous != null) {
                 previous.dropSubscription();
             }
-            onNewSubscription(delegateWrapper,
-                              delegateWrapper::onSubscribe,
-                              subscription);
+            onNewSubscription(delegateWrapper, subscription);
         }
 
         @Override
@@ -359,14 +352,6 @@
         }
 
         @Override
-        public void onConnection(Flow.Subscription subscription) {
-            debug.log(Level.DEBUG,
-                      "SSLSubscriberWrapper (reader) onConnection(%s)",
-                      subscription);
-            onSubscribeImpl(subscription);
-        }
-
-        @Override
         public void onSubscribe(Flow.Subscription subscription) {
             debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onSubscribe(%s)",
@@ -374,7 +359,7 @@
             onSubscribeImpl(subscription);
         }
 
-        // called in the reader flow, from either onSubscribe or onConnection.
+        // called in the reader flow, from onSubscribe.
         private void onSubscribeImpl(Flow.Subscription subscription) {
             assert subscription != null;
             DelegateWrapper subscriberImpl, pending;
@@ -395,13 +380,11 @@
                 // There is no pending delegate, but we have a previously
                 // subscribed delegate. This is obviously a re-subscribe.
                 // We are in the downstream reader flow, so we should call
-                // onConnection directly.
+                // onSubscribe directly.
                 debug.log(Level.DEBUG,
                       "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
                       "resubscribing");
-                onNewSubscription(subscriberImpl,
-                                  subscriberImpl::onConnection,
-                                  subscription);
+                onNewSubscription(subscriberImpl, subscription);
             } else {
                 // We have some pending subscriber: subscribe it now that we have
                 // a subscription. If we already had a previous delegate then
@@ -414,10 +397,8 @@
         }
 
         private void onNewSubscription(DelegateWrapper subscriberImpl,
-                                       Consumer<Flow.Subscription> method,
                                        Flow.Subscription subscription) {
             assert subscriberImpl != null;
-            assert method != null;
             assert subscription != null;
 
             Throwable failed;
@@ -426,7 +407,7 @@
             // subscriber
             sslDelegate.resetReaderDemand();
             // send the subscription to the subscriber.
-            method.accept(subscription);
+            subscriberImpl.onSubscribe(subscription);
 
             // The following twisted logic is just here that we don't invoke
             // onError before onSubscribe. It also prevents race conditions
@@ -484,6 +465,16 @@
             return !(hs == NOT_HANDSHAKING || hs == FINISHED);
         }
 
+        private boolean handshakeFailed() {
+            // sslDelegate can be null if we reach here
+            // during the initial handshake, as that happens
+            // within the SSLFlowDelegate constructor.
+            // In that case we will want to raise an exception.
+            return handshaking()
+                    && (sslDelegate == null
+                    || !sslDelegate.closeNotifyReceived());
+        }
+
         @Override
         public void onComplete() {
             assert !finished && !onCompleteReceived;
@@ -493,7 +484,12 @@
                 subscriberImpl = subscribed;
             }
 
-            if (handshaking()) {
+            if (handshakeFailed()) {
+                debug.log(Level.DEBUG,
+                        "handshake: %s, inbound done: %s outbound done: %s",
+                        engine.getHandshakeStatus(),
+                        engine.isInboundDone(),
+                        engine.isOutboundDone());
                 onErrorImpl(new SSLHandshakeException(
                         "Remote host terminated the handshake"));
             } else if (subscriberImpl != null) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java	Wed Nov 29 11:15:19 2017 +0000
@@ -195,12 +195,23 @@
         outgoing(List.of(buffer), complete);
     }
 
+    /**
+     * Sometime it might be necessary to complete the downstream subscriber
+     * before the upstream completes. For instance, when an SSL server
+     * sends a notify_close. In that case we should let the outgoing
+     * complete before upstream us completed.
+     * @return true, may be overridden by subclasses.
+     */
+    public boolean closing() {
+        return false;
+    }
+
     public void outgoing(List<ByteBuffer> buffers, boolean complete) {
         Objects.requireNonNull(buffers);
         if (complete) {
             assert Utils.remaining(buffers) == 0;
             logger.log(Level.DEBUG, "completionAcknowledged");
-            if (!upstreamCompleted)
+            if (!upstreamCompleted && !closing())
                 throw new IllegalStateException("upstream not completed");
             completionAcknowledged = true;
         } else {
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java	Wed Nov 29 11:15:19 2017 +0000
@@ -103,6 +103,7 @@
             currentCF.getAndUpdate((cf) -> {
                 if (cf  == null || cf.isDone()) {
                     cf = exchange.sendPing();
+                    assert cf != null;
                     cfs.add(cf);
                 }
                 return cf;
@@ -123,8 +124,10 @@
             paramsTest();
             Thread.sleep(1000 * 4);
             CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
-            for (CompletableFuture<Long> cf : cfs) {
-                System.out.printf("Ping ack received in %d millisec\n", cf.get());
+            synchronized (cfs) {
+                for (CompletableFuture<Long> cf : cfs) {
+                    System.out.printf("Ping ack received in %d millisec\n", cf.get());
+                }
             }
         } catch (Throwable tt) {
             System.err.println("tt caught");
--- a/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java	Wed Nov 29 11:15:19 2017 +0000
@@ -24,7 +24,5 @@
 /*
  * @test
  * @modules jdk.incubator.httpclient
- * @ignore
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest
  */
-
- // FIXME * @run testng jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Wed Nov 29 11:15:19 2017 +0000
@@ -25,19 +25,27 @@
 
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.SSLFlowDelegate;
 import jdk.incubator.http.internal.common.SSLTube;
 import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
 import org.testng.annotations.Test;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
@@ -48,6 +56,7 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
@@ -55,8 +64,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 @Test
@@ -80,9 +91,13 @@
         /* Start of wiring */
         ExecutorService sslExecutor = Executors.newCachedThreadPool();
         /* Emulates an echo server */
-        FlowTube server = new SSLTube(createSSLEngine(false),
-                                      sslExecutor,
-                                      new EchoTube(16));
+//        FlowTube server = new SSLTube(createSSLEngine(false),
+//                                      sslExecutor,
+//                                      new EchoTube(16));
+        SSLLoopbackSubscriber server =
+                new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+        server.start();
+
         FlowTube client = new SSLTube(createSSLEngine(true),
                                       sslExecutor,
                                       server);
@@ -113,63 +128,188 @@
         }
     }
 
-    private static final class EchoTube implements FlowTube {
-
-        private final static Object EOF = new Object();
-        private final Executor executor = Executors.newSingleThreadExecutor();
+    static class SSLLoopbackSubscriber implements FlowTube {
+        private final BlockingQueue<ByteBuffer> buffer;
+        private final Socket clientSock;
+        private final SSLSocket serverSock;
+        private final Thread thread1, thread2, thread3;
+        private volatile Flow.Subscription clientSubscription;
+        private final SubmissionPublisher<List<ByteBuffer>> publisher;
 
-        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
-        private final int maxQueueSize;
-        private final SequentialScheduler processingScheduler =
-                new SequentialScheduler(createProcessingTask());
+        SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+            SSLServerSocketFactory fac = ctx.getServerSocketFactory();
+            SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
+            SSLParameters params = serv.getSSLParameters();
+            params.setApplicationProtocols(new String[]{"proto2"});
+            serv.setSSLParameters(params);
+
+
+            int serverPort = serv.getLocalPort();
+            clientSock = new Socket("127.0.0.1", serverPort);
+            serverSock = (SSLSocket) serv.accept();
+            this.buffer = new LinkedBlockingQueue<>();
+            thread1 = new Thread(this::clientWriter, "clientWriter");
+            thread2 = new Thread(this::serverLoopback, "serverLoopback");
+            thread3 = new Thread(this::clientReader, "clientReader");
+            publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
+                    this::handlePublisherException);
+            SSLFlowDelegate.Monitor.add(this::monitor);
+        }
 
-        /* Writing into this tube */
-        private long unfulfilled;
-        private Flow.Subscription subscription;
+        public void start() {
+            thread1.start();
+            thread2.start();
+            thread3.start();
+        }
+
+        private void handlePublisherException(Object o, Throwable t) {
+            System.out.println("Loopback Publisher exception");
+            t.printStackTrace(System.out);
+        }
+
+        private final AtomicInteger readCount = new AtomicInteger();
 
-        /* Reading from this tube */
-        private final Demand demand = new Demand();
-        private final AtomicBoolean cancelled = new AtomicBoolean();
-        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-
-        private EchoTube(int maxBufferSize) {
-            if (maxBufferSize < 1)
-                throw new IllegalArgumentException();
-            this.maxQueueSize = maxBufferSize;
+        // reads off the SSLSocket the data from the "server"
+        private void clientReader() {
+            try {
+                InputStream is = clientSock.getInputStream();
+                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                System.out.println("clientReader: bufsize = " + bufsize);
+                while (true) {
+                    byte[] buf = new byte[bufsize];
+                    int n = is.read(buf);
+                    if (n == -1) {
+                        System.out.println("clientReader close: read "
+                                + readCount.get() + " bytes");
+                        publisher.close();
+                        sleep(2000);
+                        Utils.close(is, clientSock);
+                        return;
+                    }
+                    ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
+                    readCount.addAndGet(n);
+                    publisher.submit(List.of(bb));
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
+            }
         }
 
-        @Override
-        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
-            this.subscriber = subscriber;
-            this.subscriber.onSubscribe(new InternalSubscription());
+        // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
+        // which is connected to the SSLSocket emulating a server.
+        private void clientWriter() {
+            long nbytes = 0;
+            try {
+                OutputStream os =
+                        new BufferedOutputStream(clientSock.getOutputStream());
+
+                while (true) {
+                    ByteBuffer buf = buffer.take();
+                    if (buf == FlowTest.SENTINEL) {
+                        // finished
+                        //Utils.sleep(2000);
+                        System.out.println("clientWriter close: " + nbytes + " written");
+                        clientSock.shutdownOutput();
+                        System.out.println("clientWriter close return");
+                        return;
+                    }
+                    int len = buf.remaining();
+                    int written = writeToStream(os, buf);
+                    assert len == written;
+                    nbytes += len;
+                    assert !buf.hasRemaining()
+                            : "buffer has " + buf.remaining() + " bytes left";
+                    clientSubscription.request(1);
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
+            byte[] b = buf.array();
+            int offset = buf.arrayOffset() + buf.position();
+            int n = buf.limit() - buf.position();
+            os.write(b, offset, n);
+            buf.position(buf.limit());
+            os.flush();
+            return n;
+        }
+
+        private final AtomicInteger loopCount = new AtomicInteger();
+
+        public String monitor() {
+            return "serverLoopback: loopcount = " + loopCount.toString()
+                    + " clientRead: count = " + readCount.toString();
+        }
+
+        // thread2
+        private void serverLoopback() {
+            try {
+                InputStream is = serverSock.getInputStream();
+                OutputStream os = serverSock.getOutputStream();
+                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                System.out.println("serverLoopback: bufsize = " + bufsize);
+                byte[] bb = new byte[bufsize];
+                while (true) {
+                    int n = is.read(bb);
+                    if (n == -1) {
+                        sleep(2000);
+                        is.close();
+                        os.close();
+                        serverSock.close();
+                        return;
+                    }
+                    os.write(bb, 0, n);
+                    os.flush();
+                    loopCount.addAndGet(n);
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+
+        /**
+         * This needs to be called before the chain is subscribed. It can't be
+         * supplied in the constructor.
+         */
+        public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) {
+            publisher.subscribe(returnSubscriber);
         }
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            unfulfilled = maxQueueSize;
-            (this.subscription = subscription).request(maxQueueSize);
+            clientSubscription = subscription;
+            clientSubscription.request(5);
         }
 
         @Override
         public void onNext(List<ByteBuffer> item) {
-            if (--unfulfilled == (maxQueueSize / 2)) {
-                subscription.request(maxQueueSize - unfulfilled);
-                unfulfilled = maxQueueSize;
+            try {
+                for (ByteBuffer b : item)
+                    buffer.put(b);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
             }
-            queue.add(item);
-            processingScheduler.deferOrSchedule(executor);
         }
 
         @Override
         public void onError(Throwable throwable) {
-            queue.add(throwable);
-            processingScheduler.deferOrSchedule(executor);
+            throwable.printStackTrace();
+            Utils.close(clientSock);
         }
 
         @Override
         public void onComplete() {
-            queue.add(EOF);
-            processingScheduler.deferOrSchedule(executor);
+            try {
+                buffer.put(FlowTest.SENTINEL);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                Utils.close(clientSock);
+            }
         }
 
         @Override
@@ -177,57 +317,163 @@
             return false;
         }
 
-        private class InternalSubscription implements Flow.Subscription {
-
-            @Override
-            public void request(long n) {
-                if (n <= 0) {
-                    throw new InternalError();
-                }
-                demand.increase(n);
-                processingScheduler.runOrSchedule();
-            }
-
-            @Override
-            public void cancel() {
-                cancelled.set(true);
-            }
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            publisher.subscribe(subscriber);
         }
-
-        private SequentialScheduler.RestartableTask createProcessingTask() {
-            return new SequentialScheduler.CompleteRestartableTask() {
+    }
 
-                @Override
-                protected void run() {
-                    while (!cancelled.get()) {
-                        Object item = queue.peek();
-                        if (item == null)
-                            return;
-                        try {
-                            if (item instanceof List) {
-                                if (!demand.tryDecrement())
-                                    return;
-                                @SuppressWarnings("unchecked")
-                                List<ByteBuffer> bytes = (List<ByteBuffer>) item;
-                                subscriber.onNext(bytes);
-                            } else if (item instanceof Throwable) {
-                                cancelled.set(true);
-                                subscriber.onError((Throwable) item);
-                            } else if (item == EOF) {
-                                cancelled.set(true);
-                                subscriber.onComplete();
-                            } else {
-                                throw new InternalError(String.valueOf(item));
-                            }
-                        } finally {
-                            Object removed = queue.remove();
-                            assert removed == item;
-                        }
-                    }
-                }
-            };
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+
         }
     }
+//    private static final class EchoTube implements FlowTube {
+//
+//        private final static Object EOF = new Object();
+//        private final Executor executor = Executors.newSingleThreadExecutor();
+//
+//        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+//        private final int maxQueueSize;
+//        private final SequentialScheduler processingScheduler =
+//                new SequentialScheduler(createProcessingTask());
+//
+//        /* Writing into this tube */
+//        private long unfulfilled;
+//        private Flow.Subscription subscription;
+//
+//        /* Reading from this tube */
+//        private final Demand demand = new Demand();
+//        private final AtomicBoolean cancelled = new AtomicBoolean();
+//        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+//
+//        private EchoTube(int maxBufferSize) {
+//            if (maxBufferSize < 1)
+//                throw new IllegalArgumentException();
+//            this.maxQueueSize = maxBufferSize;
+//        }
+//
+//        @Override
+//        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+//            this.subscriber = subscriber;
+//            System.out.println("EchoTube got subscriber: " + subscriber);
+//            this.subscriber.onSubscribe(new InternalSubscription());
+//        }
+//
+//        @Override
+//        public void onSubscribe(Flow.Subscription subscription) {
+//            unfulfilled = maxQueueSize;
+//            System.out.println("EchoTube request: " + maxQueueSize);
+//            (this.subscription = subscription).request(maxQueueSize);
+//        }
+//
+//        @Override
+//        public void onNext(List<ByteBuffer> item) {
+//            if (--unfulfilled == (maxQueueSize / 2)) {
+//                long req = maxQueueSize - unfulfilled;
+//                subscription.request(req);
+//                System.out.println("EchoTube request: " + req);
+//                unfulfilled = maxQueueSize;
+//            }
+//            System.out.println("EchoTube add " + Utils.remaining(item));
+//            queue.add(item);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public void onError(Throwable throwable) {
+//            System.out.println("EchoTube add " + throwable);
+//            queue.add(throwable);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public void onComplete() {
+//            System.out.println("EchoTube add EOF");
+//            queue.add(EOF);
+//            processingScheduler.deferOrSchedule(executor);
+//        }
+//
+//        @Override
+//        public boolean isFinished() {
+//            return false;
+//        }
+//
+//        private class InternalSubscription implements Flow.Subscription {
+//
+//            @Override
+//            public void request(long n) {
+//                System.out.println("EchoTube got request: " + n);
+//                if (n <= 0) {
+//                    throw new InternalError();
+//                }
+//                demand.increase(n);
+//                processingScheduler.runOrSchedule();
+//            }
+//
+//            @Override
+//            public void cancel() {
+//                cancelled.set(true);
+//            }
+//        }
+//
+//        @Override
+//        public String toString() {
+//            return "EchoTube";
+//        }
+//
+//        private SequentialScheduler.RestartableTask createProcessingTask() {
+//            return new SequentialScheduler.CompleteRestartableTask() {
+//
+//                @Override
+//                protected void run() {
+//                    try {
+//                        while (!cancelled.get()) {
+//                            Object item = queue.peek();
+//                            if (item == null)
+//                                return;
+//                            try {
+//                                System.out.println("EchoTube processing item");
+//                                if (item instanceof List) {
+//                                    if (!demand.tryDecrement()) {
+//                                        System.out.println("EchoTube no demand");
+//                                        return;
+//                                    }
+//                                    @SuppressWarnings("unchecked")
+//                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing "
+//                                            + Utils.remaining(bytes));
+//                                    subscriber.onNext(bytes);
+//                                } else if (item instanceof Throwable) {
+//                                    cancelled.set(true);
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing " + item);
+//                                    subscriber.onError((Throwable) item);
+//                                } else if (item == EOF) {
+//                                    cancelled.set(true);
+//                                    Object removed = queue.remove();
+//                                    assert removed == item;
+//                                    System.out.println("EchoTube processing EOF");
+//                                    subscriber.onComplete();
+//                                } else {
+//                                    throw new InternalError(String.valueOf(item));
+//                                }
+//                            } finally {
+//                            }
+//                        }
+//                    } catch(Throwable t) {
+//                        t.printStackTrace();
+//                        throw t;
+//                    }
+//                }
+//            };
+//        }
+//    }
 
     /**
      * The final subscriber which receives the decrypted looped-back data. Just
@@ -253,6 +499,7 @@
         public void onSubscribe(Flow.Subscription subscription) {
             this.subscription = subscription;
             unfulfilled = REQUEST_WINDOW;
+            System.out.println("EndSubscriber request " + REQUEST_WINDOW);
             subscription.request(REQUEST_WINDOW);
         }
 
@@ -269,7 +516,9 @@
         @Override
         public void onNext(List<ByteBuffer> buffers) {
             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
-                subscription.request(REQUEST_WINDOW - unfulfilled);
+                long req = REQUEST_WINDOW - unfulfilled;
+                System.out.println("EndSubscriber request " + req);
+                subscription.request(req);
                 unfulfilled = REQUEST_WINDOW;
             }
 
@@ -277,6 +526,7 @@
             if (currval % 500 == 0) {
                 System.out.println("End: " + currval);
             }
+            System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
 
             for (ByteBuffer buf : buffers) {
                 while (buf.hasRemaining()) {
@@ -298,6 +548,7 @@
 
         @Override
         public void onError(Throwable throwable) {
+            System.out.println("EndSubscriber onError " + throwable);
             completion.completeExceptionally(throwable);
         }
 
@@ -312,6 +563,10 @@
                 completion.complete(null);
             }
         }
+        @Override
+        public String toString() {
+            return "EndSubscriber";
+        }
     }
 
     private static SSLEngine createSSLEngine(boolean client) throws IOException {