http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate http-client-branch
authordfuchs
Sat, 02 Dec 2017 17:40:57 +0000
branchhttp-client-branch
changeset 55942 8d4770c22b63
parent 55941 2d423c9b73bb
child 55944 b7e186aa1915
http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
test/jdk/com/sun/net/httpserver/EchoHandler.java
test/jdk/java/net/httpclient/AbstractNoBody.java
test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java
test/jdk/java/net/httpclient/ManyRequests.java
test/jdk/java/net/httpclient/ManyRequestsLegacy.java
test/jdk/java/net/httpclient/MockServer.java
test/jdk/java/net/httpclient/NoBodyPartOne.java
test/jdk/java/net/httpclient/NoBodyPartTwo.java
test/jdk/java/net/httpclient/http2/server/Queue.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Sat Dec 02 17:40:57 2017 +0000
@@ -77,9 +77,9 @@
 
         @Override
         public void handle() {
-            assert !connected : "Already connected";
-            assert !chan.isBlocking() : "Unexpected blocking channel";
             try {
+                assert !connected : "Already connected";
+                assert !chan.isBlocking() : "Unexpected blocking channel";
                 debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
                 boolean finished = chan.finishConnect();
                 assert finished : "Expected channel to be connected";
@@ -88,7 +88,7 @@
                 connected = true;
                 // complete async since the event runs on the SelectorManager thread
                 cf.completeAsync(() -> null, client().theExecutor());
-            } catch (IOException e) {
+            } catch (Throwable e) {
                 client().theExecutor().execute( () -> cf.completeExceptionally(e));
             }
         }
@@ -102,10 +102,10 @@
 
     @Override
     public CompletableFuture<Void> connectAsync() {
-        assert !connected : "Already connected";
-        assert !chan.isBlocking() : "Unexpected blocking channel";
         CompletableFuture<Void> cf = new MinimalFuture<>();
         try {
+            assert !connected : "Already connected";
+            assert !chan.isBlocking() : "Unexpected blocking channel";
             boolean finished = false;
             PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
             try {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Sat Dec 02 17:40:57 2017 +0000
@@ -402,6 +402,7 @@
             try {
                 if (contentLength == 0) {
                     pusher.onComplete();
+                    onFinished.run();
                     onComplete.accept(null);
                 }
             } catch (Throwable t) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Sat Dec 02 17:40:57 2017 +0000
@@ -320,6 +320,8 @@
                             DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
                             s.request(1);
                         }
+                        assert currentListItr != null;
+                        if (lb.isEmpty()) continue;
                     }
                     assert currentListItr != null;
                     assert currentListItr.hasNext();
@@ -359,27 +361,38 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
-            if (!subscribed.compareAndSet(false, true)) {
-                s.cancel();
-            } else {
-                // check whether the stream is already closed.
-                // if so, we should cancel the subscription
-                // immediately.
-                boolean closed;
-                synchronized(this) {
-                    closed = this.closed;
-                    if (!closed) {
-                        this.subscription = s;
+            try {
+                if (!subscribed.compareAndSet(false, true)) {
+                    s.cancel();
+                } else {
+                    // check whether the stream is already closed.
+                    // if so, we should cancel the subscription
+                    // immediately.
+                    boolean closed;
+                    synchronized (this) {
+                        closed = this.closed;
+                        if (!closed) {
+                            this.subscription = s;
+                        }
                     }
-                }
-                if (closed) {
-                    s.cancel();
-                    return;
+                    if (closed) {
+                        s.cancel();
+                        return;
+                    }
+                    assert buffers.remainingCapacity() > 1; // should contain at least 2
+                    DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+                            + Math.max(1, buffers.remainingCapacity() - 1));
+                    s.request(Math.max(1, buffers.remainingCapacity() - 1));
                 }
-                assert buffers.remainingCapacity() > 1; // should contain at least 2
-                DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
-                        + Math.max(1, buffers.remainingCapacity() - 1));
-                s.request(Math.max(1, buffers.remainingCapacity() - 1));
+            } catch (Throwable t) {
+                failed = t;
+                try {
+                    close();
+                } catch (IOException x) {
+                    // OK
+                } finally {
+                    onError(t);
+                }
             }
         }
 
@@ -392,12 +405,14 @@
                     throw new IllegalStateException("queue is full");
                 }
                 DEBUG_LOGGER.log(Level.DEBUG, "item offered");
-            } catch (Exception ex) {
+            } catch (Throwable ex) {
                 failed = ex;
                 try {
                     close();
                 } catch (IOException ex1) {
                     // OK
+                } finally {
+                    onError(ex);
                 }
             }
         }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java	Sat Dec 02 17:40:57 2017 +0000
@@ -238,10 +238,7 @@
         public void incoming(List<ByteBuffer> buffers, boolean complete) {
             debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
                         + " bytes to read buffer");
-            addToReadBuf(buffers);
-            if (complete) {
-                this.completing = true;
-            }
+            addToReadBuf(buffers, complete);
             scheduler.runOrSchedule();
         }
 
@@ -269,7 +266,7 @@
         }
 
         // readBuf is kept ready for reading outside of this method
-        private void addToReadBuf(List<ByteBuffer> buffers) {
+        private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
             synchronized (readBufferLock) {
                 for (ByteBuffer buf : buffers) {
                     readBuf.compact();
@@ -278,6 +275,9 @@
                     readBuf.put(buf);
                     readBuf.flip();
                 }
+                if (complete) {
+                    this.completing = complete;
+                }
             }
         }
 
@@ -297,19 +297,32 @@
             try {
                 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
                            + " bytes to unwrap "
-                           + states(handshakeState));
-
-                while (readBuf.hasRemaining()) {
+                           + states(handshakeState)
+                           + ", " + engine.getHandshakeStatus());
+                int len;
+                boolean completing = false;
+                while ((len = readBuf.remaining()) > 0) {
                     boolean handshaking = false;
                     try {
                         EngineResult result;
                         synchronized (readBufferLock) {
+                            completing = this.completing;
                             result = unwrapBuffer(readBuf);
                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
                         }
+                        if (result.bytesProduced() > 0) {
+                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
+                            count.addAndGet(result.bytesProduced());
+                            outgoing(result.destBuffer, false);
+                        }
                         if (result.status() == Status.BUFFER_UNDERFLOW) {
                             debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
-                            return;
+                            // not enough data in the read buffer...
+                            synchronized (readBufferLock) {
+                                // check if we have received some data
+                                if (readBuf.remaining() > len) continue;
+                                return;
+                            }
                         }
                         if (completing && result.status() == Status.CLOSED) {
                             debugr.log(Level.DEBUG, "Closed: completing");
@@ -328,11 +341,6 @@
                                 resumeActivity();
                             }
                         }
-                        if (result.bytesProduced() > 0) {
-                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
-                            count.addAndGet(result.bytesProduced());
-                            outgoing(result.destBuffer, false);
-                        }
                     } catch (IOException ex) {
                         errorCommon(ex);
                         handleError(ex);
@@ -340,6 +348,11 @@
                     if (handshaking && !completing)
                         return;
                 }
+                if (!completing) {
+                    synchronized (readBufferLock) {
+                        completing = this.completing && !readBuf.hasRemaining();
+                    }
+                }
                 if (completing) {
                     debugr.log(Level.DEBUG, "completing");
                     // Complete the alpnCF, if not already complete, regardless of
@@ -443,6 +456,7 @@
             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
             if (complete) {
+                debugw.log(Level.DEBUG, "adding SENTINEL");
                 writeList.add(SENTINEL);
             } else {
                 writeList.addAll(buffers);
@@ -507,7 +521,8 @@
 
             try {
                 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
-                while (Utils.remaining(writeList) > 0 || hsTriggered()) {
+                while (Utils.remaining(writeList) > 0 || hsTriggered()
+                        || needWrap()) {
                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                     EngineResult result = wrapBuffers(outbufs);
                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
@@ -538,6 +553,7 @@
                         if (writeList.isEmpty() && !result.needUnwrap()) {
                             writer.addData(HS_TRIGGER);
                         }
+                        if (needWrap()) continue;
                         return;
                     }
                 }
@@ -551,11 +567,18 @@
                     outgoing(Utils.EMPTY_BB_LIST, true);
                     return;
                 }
+                if (writeList.isEmpty() && needWrap()) {
+                    writer.addData(HS_TRIGGER);
+                }
             } catch (Throwable ex) {
                 handleError(ex);
             }
         }
 
+        private boolean needWrap() {
+            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
+        }
+
         private void sendResultBytes(EngineResult result) {
             if (result.bytesProduced() > 0) {
                 debugw.log(Level.DEBUG, "Sending %d bytes downstream",
@@ -678,13 +701,19 @@
     private void executeTasks(List<Runnable> tasks) {
         exec.execute(() -> {
             handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
-            try {
-                tasks.forEach((r) -> {
-                    r.run();
-                });
-            } catch (Throwable t) {
-                handleError(t);
-            }
+            List<Runnable> nextTasks = tasks;
+            do {
+                try {
+                    nextTasks.forEach((r) -> {
+                        r.run();
+                    });
+                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                        nextTasks = obtainTasks();
+                    } else break;
+                } catch (Throwable t) {
+                    handleError(t);
+                }
+            } while(true);
             handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
             writer.addData(HS_TRIGGER);
             resumeActivity();
--- a/test/jdk/com/sun/net/httpserver/EchoHandler.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/com/sun/net/httpserver/EchoHandler.java	Sat Dec 02 17:40:57 2017 +0000
@@ -66,8 +66,8 @@
             t.sendResponseHeaders(200, in.length);
             OutputStream os = t.getResponseBody();
             os.write(in);
-            close(os);
-            close(is);
+            close(t, os);
+            close(t, is);
         } else {
             OutputStream os = t.getResponseBody();
             byte[] buf = new byte[64 * 1024];
@@ -84,15 +84,21 @@
                 String s = Integer.toString(count);
                 os.write(s.getBytes());
             }
-            close(os);
-            close(is);
+            close(t, os);
+            close(t, is);
         }
     }
 
     protected void close(OutputStream os) throws IOException {
-            os.close();
+        os.close();
     }
     protected void close(InputStream is) throws IOException {
-            is.close();
+        is.close();
+    }
+    protected void close(HttpExchange t, OutputStream os) throws IOException {
+        close(os);
+    }
+    protected void close(HttpExchange t, InputStream is) throws IOException {
+        close(is);
     }
 }
--- a/test/jdk/java/net/httpclient/AbstractNoBody.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/AbstractNoBody.java	Sat Dec 02 17:40:57 2017 +0000
@@ -25,6 +25,7 @@
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -57,7 +58,8 @@
     static final String SIMPLE_STRING = "Hello world. Goodbye world";
     static final int ITERATION_COUNT = 10;
     // a shared executor helps reduce the amount of threads created by the test
-    static final Executor executor = Executors.newCachedThreadPool();
+    static final Executor executor = Executors.newFixedThreadPool(ITERATION_COUNT * 2);
+    static final ExecutorService serverExecutor = Executors.newFixedThreadPool(ITERATION_COUNT * 4);
 
     @DataProvider(name = "variants")
     public Object[][] variants() {
@@ -91,6 +93,7 @@
 
     @BeforeTest
     public void setup() throws Exception {
+        printStamp(START, "setup");
         sslContext = new SimpleSSLContext().get();
         if (sslContext == null)
             throw new AssertionError("Unexpected null sslContext");
@@ -100,12 +103,14 @@
         HttpHandler h1_chunkNoBodyHandler = new HTTP1_ChunkedNoBodyHandler();
         InetSocketAddress sa = new InetSocketAddress("localhost", 0);
         httpTestServer = HttpServer.create(sa, 0);
+        httpTestServer.setExecutor(serverExecutor);
         httpTestServer.createContext("/http1/noBodyFixed", h1_fixedLengthNoBodyHandler);
         httpTestServer.createContext("/http1/noBodyChunk", h1_chunkNoBodyHandler);
         httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyFixed";
         httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyChunk";
 
         httpsTestServer = HttpsServer.create(sa, 0);
+        httpsTestServer.setExecutor(serverExecutor);
         httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
         httpsTestServer.createContext("/https1/noBodyFixed", h1_fixedLengthNoBodyHandler);
         httpsTestServer.createContext("/https1/noBodyChunk", h1_chunkNoBodyHandler);
@@ -116,14 +121,14 @@
         Http2Handler h2_fixedLengthNoBodyHandler = new HTTP2_FixedLengthNoBodyHandler();
         Http2Handler h2_chunkedNoBodyHandler = new HTTP2_ChunkedNoBodyHandler();
 
-        http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+        http2TestServer = new Http2TestServer("127.0.0.1", false, 0, serverExecutor, null);
         http2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/http2/noBodyFixed");
         http2TestServer.addHandler(h2_chunkedNoBodyHandler, "/http2/noBodyChunk");
         int port = http2TestServer.getAddress().getPort();
         http2URI_fixed = "http://127.0.0.1:" + port + "/http2/noBodyFixed";
         http2URI_chunk = "http://127.0.0.1:" + port + "/http2/noBodyChunk";
 
-        https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+        https2TestServer = new Http2TestServer("127.0.0.1", true, 0, serverExecutor, sslContext);
         https2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/https2/noBodyFixed");
         https2TestServer.addHandler(h2_chunkedNoBodyHandler, "/https2/noBodyChunk");
         port = https2TestServer.getAddress().getPort();
@@ -134,16 +139,34 @@
         httpsTestServer.start();
         http2TestServer.start();
         https2TestServer.start();
+        printStamp(END,"setup");
     }
 
     @AfterTest
     public void teardown() throws Exception {
+        printStamp(START, "teardown");
         httpTestServer.stop(0);
         httpsTestServer.stop(0);
         http2TestServer.stop();
         https2TestServer.stop();
+       printStamp(END, "teardown");
     }
 
+    static final long start = System.nanoTime();
+    static final String START = "start";
+    static final String END   = "end  ";
+    static long elapsed() { return (System.nanoTime() - start)/1000_000;}
+    void printStamp(String what, String fmt, Object... args) {
+        long elapsed = elapsed();
+        long sec = elapsed/1000;
+        long ms  = elapsed % 1000;
+        String time = sec > 0 ? sec + "sec " : "";
+        time = time + ms + "ms";
+        System.out.printf("%s: %s \t [%s]\t %s%n",
+                getClass().getSimpleName(), what, time, String.format(fmt,args));
+    }
+
+
     static class HTTP1_FixedLengthNoBodyHandler implements HttpHandler {
         @Override
         public void handle(HttpExchange t) throws IOException {
--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java	Sat Dec 02 17:40:57 2017 +0000
@@ -178,6 +178,7 @@
         try {
             return is.readAllBytes();
         } catch (IOException io) {
+            io.printStackTrace();
             throw new CompletionException(io);
         }
     }
--- a/test/jdk/java/net/httpclient/ManyRequests.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/ManyRequests.java	Sat Dec 02 17:40:57 2017 +0000
@@ -107,19 +107,23 @@
             System.out.println("Server: received " + e.getRequestURI());
             super.handle(e);
         }
-        protected void close(OutputStream os) throws IOException {
+        @Override
+        protected void close(HttpExchange t, OutputStream os) throws IOException {
             if (INSERT_DELAY) {
                 try { Thread.sleep(rand.nextInt(200)); }
                 catch (InterruptedException e) {}
             }
-            os.close();
+            System.out.println("Server: close outbound: " + t.getRequestURI());
+            super.close(t, os);
         }
-        protected void close(InputStream is) throws IOException {
+        @Override
+        protected void close(HttpExchange t, InputStream is) throws IOException {
             if (INSERT_DELAY) {
                 try { Thread.sleep(rand.nextInt(200)); }
                 catch (InterruptedException e) {}
             }
-            is.close();
+            System.out.println("Server: close inbound: " + t.getRequestURI());
+            super.close(t, is);
         }
     }
 
--- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java	Sat Dec 02 17:40:57 2017 +0000
@@ -204,19 +204,21 @@
             super.handle(e);
         }
         @Override
-        protected void close(OutputStream os) throws IOException {
+        protected void close(HttpExchange t, OutputStream os) throws IOException {
             if (INSERT_DELAY) {
                 try { Thread.sleep(rand.nextInt(200)); }
                 catch (InterruptedException e) {}
             }
+            System.out.println("Server: close outbound: " + t.getRequestURI());
             os.close();
         }
         @Override
-        protected void close(InputStream is) throws IOException {
+        protected void close(HttpExchange t, InputStream is) throws IOException {
             if (INSERT_DELAY) {
                 try { Thread.sleep(rand.nextInt(200)); }
                 catch (InterruptedException e) {}
             }
+            System.out.println("Server: close inbound: " + t.getRequestURI());
             is.close();
         }
     }
--- a/test/jdk/java/net/httpclient/MockServer.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/MockServer.java	Sat Dec 02 17:40:57 2017 +0000
@@ -45,7 +45,7 @@
  */
 public class MockServer extends Thread implements Closeable {
 
-    ServerSocket ss;
+    final ServerSocket ss;
     private final List<Connection> sockets;
     private final List<Connection> removals;
     private final List<Connection> additions;
@@ -296,20 +296,32 @@
 
     @Override
     public void run() {
-        while (!closed) {
-            try {
-                System.out.println("Server waiting for connection");
-                Socket s = ss.accept();
-                Connection c = new Connection(s);
-                c.start();
-                System.out.println("Server got new connection: " + c);
-                synchronized (additions) {
-                    additions.add(c);
+        try {
+            while (!closed) {
+                try {
+                    System.out.println("Server waiting for connection");
+                    Socket s = ss.accept();
+                    Connection c = new Connection(s);
+                    c.start();
+                    System.out.println("Server got new connection: " + c);
+                    synchronized (additions) {
+                        additions.add(c);
+                    }
+                } catch (IOException e) {
+                    if (closed)
+                        return;
+                    e.printStackTrace(System.out);
                 }
-            } catch (IOException e) {
-                if (closed)
-                    return;
-                e.printStackTrace();
+            }
+        } catch (Throwable t) {
+            System.out.println("Unexpected exception in accept loop: " + t);
+            t.printStackTrace(System.out);
+        } finally {
+            if (closed) {
+                System.out.println("Server closed: exiting accept loop");
+            } else {
+                System.out.println("Server not closed: exiting accept loop and closing");
+                close();
             }
         }
     }
--- a/test/jdk/java/net/httpclient/NoBodyPartOne.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/NoBodyPartOne.java	Sat Dec 02 17:40:57 2017 +0000
@@ -55,6 +55,7 @@
 
     @Test(dataProvider = "variants")
     public void testAsString(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testAsString(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -68,10 +69,13 @@
             String body = response.body();
             assertEquals(body, "");
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 
     @Test(dataProvider = "variants")
     public void testAsFile(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testAsFile(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -86,10 +90,13 @@
             assertTrue(Files.exists(bodyPath));
             assertEquals(Files.size(bodyPath), 0);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 
     @Test(dataProvider = "variants")
     public void testAsByteArray(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testAsByteArray(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -102,5 +109,7 @@
             byte[] body = response.body();
             assertEquals(body.length, 0);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 }
--- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java	Sat Dec 02 17:40:57 2017 +0000
@@ -57,6 +57,7 @@
     volatile boolean consumerHasBeenCalled;
     @Test(dataProvider = "variants")
     public void testAsByteArrayConsumer(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testAsByteArrayConsumer(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -73,10 +74,13 @@
             client.send(req, asByteArrayConsumer(consumer));
             assertTrue(consumerHasBeenCalled);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 
     @Test(dataProvider = "variants")
     public void testAsInputStream(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testAsInputStream(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -89,10 +93,13 @@
             byte[] body = response.body().readAllBytes();
             assertEquals(body.length, 0);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 
     @Test(dataProvider = "variants")
     public void testBuffering(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testBuffering(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -105,10 +112,13 @@
             byte[] body = response.body();
             assertEquals(body.length, 0);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 
     @Test(dataProvider = "variants")
     public void testDiscard(String uri, boolean sameClient) throws Exception {
+        printStamp(START, "testDiscard(\"%s\", %s)", uri, sameClient);
         HttpClient client = null;
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
@@ -121,5 +131,7 @@
             HttpResponse<Object> response = client.send(req, discard(obj));
             assertEquals(response.body(), obj);
         }
+        // We have created many clients here. Try to speed up their release.
+        if (!sameClient) System.gc();
     }
 }
--- a/test/jdk/java/net/httpclient/http2/server/Queue.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java	Sat Dec 02 17:40:57 2017 +0000
@@ -25,6 +25,7 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.stream.Stream;
 
 // Each stream has one of these for input. Each Http2Connection has one
@@ -40,7 +41,7 @@
     private final T closeSentinel;
 
     Queue(T closeSentinel) {
-        this.closeSentinel = closeSentinel;
+        this.closeSentinel = Objects.requireNonNull(closeSentinel);
     }
 
     public synchronized int size() {
@@ -48,6 +49,7 @@
     }
 
     public synchronized void put(T obj) throws IOException {
+        Objects.requireNonNull(obj);
         if (closed || closing) {
             throw new IOException("stream closed");
         }
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Sat Dec 02 17:40:57 2017 +0000
@@ -55,6 +55,7 @@
 import java.security.cert.CertificateException;
 import java.util.List;
 import java.util.Queue;
+import java.util.Random;
 import java.util.StringTokenizer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -69,6 +70,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 @Test
 public class SSLTubeTest {
@@ -76,6 +79,17 @@
     private static final long COUNTER = 600;
     private static final int LONGS_PER_BUF = 800;
     private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
+    public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
+
+    static final Random rand = new Random();
+
+    static int randomRange(int lower, int upper) {
+        if (lower > upper)
+            throw new IllegalArgumentException("lower > upper");
+        int diff = upper - lower;
+        int r = lower + rand.nextInt(diff);
+        return r - (r % 8); // round down to multiple of 8 (align for longs)
+    }
 
     private static ByteBuffer getBuffer(long startingAt) {
         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
@@ -86,18 +100,31 @@
         return buf;
     }
 
-    @Test(timeOut = 30000)
-    public void run() throws IOException {
-        /* Start of wiring */
+    @Test
+    public void runWithSSLLoopackServer() throws IOException {
         ExecutorService sslExecutor = Executors.newCachedThreadPool();
+
+        /* Start of wiring */
         /* Emulates an echo server */
-//        FlowTube server = new SSLTube(createSSLEngine(false),
-//                                      sslExecutor,
-//                                      new EchoTube(16));
         SSLLoopbackSubscriber server =
                 new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
         server.start();
 
+        run(server, sslExecutor);
+    }
+
+    @Test
+    public void runWithEchoServer() throws IOException {
+        ExecutorService sslExecutor = Executors.newCachedThreadPool();
+
+        /* Start of wiring */
+        /* Emulates an echo server */
+        FlowTube server = crossOverEchoServer(sslExecutor);
+
+        run(server, sslExecutor);
+    }
+
+    private void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
         FlowTube client = new SSLTube(createSSLEngine(true),
                                       sslExecutor,
                                       server);
@@ -128,6 +155,9 @@
         }
     }
 
+    /**
+     * This is a copy of the SSLLoopbackSubscriber used in FlowTest
+     */
     static class SSLLoopbackSubscriber implements FlowTube {
         private final BlockingQueue<ByteBuffer> buffer;
         private final Socket clientSock;
@@ -173,7 +203,7 @@
         private void clientReader() {
             try {
                 InputStream is = clientSock.getInputStream();
-                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                final int bufsize = randomRange(512, 16 * 1024);
                 System.out.println("clientReader: bufsize = " + bufsize);
                 while (true) {
                     byte[] buf = new byte[bufsize];
@@ -206,7 +236,7 @@
 
                 while (true) {
                     ByteBuffer buf = buffer.take();
-                    if (buf == FlowTest.SENTINEL) {
+                    if (buf == SENTINEL) {
                         // finished
                         //Utils.sleep(2000);
                         System.out.println("clientWriter close: " + nbytes + " written");
@@ -249,7 +279,7 @@
             try {
                 InputStream is = serverSock.getInputStream();
                 OutputStream os = serverSock.getOutputStream();
-                final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+                final int bufsize = randomRange(512, 16 * 1024);
                 System.out.println("serverLoopback: bufsize = " + bufsize);
                 byte[] bb = new byte[bufsize];
                 while (true) {
@@ -305,7 +335,7 @@
         @Override
         public void onComplete() {
             try {
-                buffer.put(FlowTest.SENTINEL);
+                buffer.put(SENTINEL);
             } catch (InterruptedException e) {
                 e.printStackTrace();
                 Utils.close(clientSock);
@@ -330,150 +360,366 @@
 
         }
     }
-//    private static final class EchoTube implements FlowTube {
-//
-//        private final static Object EOF = new Object();
-//        private final Executor executor = Executors.newSingleThreadExecutor();
-//
-//        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
-//        private final int maxQueueSize;
-//        private final SequentialScheduler processingScheduler =
-//                new SequentialScheduler(createProcessingTask());
-//
-//        /* Writing into this tube */
-//        private long unfulfilled;
-//        private Flow.Subscription subscription;
-//
-//        /* Reading from this tube */
-//        private final Demand demand = new Demand();
-//        private final AtomicBoolean cancelled = new AtomicBoolean();
-//        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-//
-//        private EchoTube(int maxBufferSize) {
-//            if (maxBufferSize < 1)
-//                throw new IllegalArgumentException();
-//            this.maxQueueSize = maxBufferSize;
-//        }
-//
-//        @Override
-//        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
-//            this.subscriber = subscriber;
-//            System.out.println("EchoTube got subscriber: " + subscriber);
-//            this.subscriber.onSubscribe(new InternalSubscription());
-//        }
-//
-//        @Override
-//        public void onSubscribe(Flow.Subscription subscription) {
-//            unfulfilled = maxQueueSize;
-//            System.out.println("EchoTube request: " + maxQueueSize);
-//            (this.subscription = subscription).request(maxQueueSize);
-//        }
-//
-//        @Override
-//        public void onNext(List<ByteBuffer> item) {
-//            if (--unfulfilled == (maxQueueSize / 2)) {
-//                long req = maxQueueSize - unfulfilled;
-//                subscription.request(req);
-//                System.out.println("EchoTube request: " + req);
-//                unfulfilled = maxQueueSize;
-//            }
-//            System.out.println("EchoTube add " + Utils.remaining(item));
-//            queue.add(item);
-//            processingScheduler.deferOrSchedule(executor);
-//        }
-//
-//        @Override
-//        public void onError(Throwable throwable) {
-//            System.out.println("EchoTube add " + throwable);
-//            queue.add(throwable);
-//            processingScheduler.deferOrSchedule(executor);
-//        }
-//
-//        @Override
-//        public void onComplete() {
-//            System.out.println("EchoTube add EOF");
-//            queue.add(EOF);
-//            processingScheduler.deferOrSchedule(executor);
-//        }
-//
-//        @Override
-//        public boolean isFinished() {
-//            return false;
-//        }
-//
-//        private class InternalSubscription implements Flow.Subscription {
-//
-//            @Override
-//            public void request(long n) {
-//                System.out.println("EchoTube got request: " + n);
-//                if (n <= 0) {
-//                    throw new InternalError();
-//                }
-//                demand.increase(n);
-//                processingScheduler.runOrSchedule();
-//            }
-//
-//            @Override
-//            public void cancel() {
-//                cancelled.set(true);
-//            }
-//        }
-//
-//        @Override
-//        public String toString() {
-//            return "EchoTube";
-//        }
-//
-//        private SequentialScheduler.RestartableTask createProcessingTask() {
-//            return new SequentialScheduler.CompleteRestartableTask() {
-//
-//                @Override
-//                protected void run() {
-//                    try {
-//                        while (!cancelled.get()) {
-//                            Object item = queue.peek();
-//                            if (item == null)
-//                                return;
-//                            try {
-//                                System.out.println("EchoTube processing item");
-//                                if (item instanceof List) {
-//                                    if (!demand.tryDecrement()) {
-//                                        System.out.println("EchoTube no demand");
-//                                        return;
-//                                    }
-//                                    @SuppressWarnings("unchecked")
-//                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
-//                                    Object removed = queue.remove();
-//                                    assert removed == item;
-//                                    System.out.println("EchoTube processing "
-//                                            + Utils.remaining(bytes));
-//                                    subscriber.onNext(bytes);
-//                                } else if (item instanceof Throwable) {
-//                                    cancelled.set(true);
-//                                    Object removed = queue.remove();
-//                                    assert removed == item;
-//                                    System.out.println("EchoTube processing " + item);
-//                                    subscriber.onError((Throwable) item);
-//                                } else if (item == EOF) {
-//                                    cancelled.set(true);
-//                                    Object removed = queue.remove();
-//                                    assert removed == item;
-//                                    System.out.println("EchoTube processing EOF");
-//                                    subscriber.onComplete();
-//                                } else {
-//                                    throw new InternalError(String.valueOf(item));
-//                                }
-//                            } finally {
-//                            }
-//                        }
-//                    } catch(Throwable t) {
-//                        t.printStackTrace();
-//                        throw t;
-//                    }
-//                }
-//            };
-//        }
-//    }
+
+    /**
+     * Creates a cross-over FlowTube than can be plugged into a client-side
+     * SSLTube (in place of the SSLLoopbackSubscriber).
+     * Note that the only method that can be called on the return tube
+     * is connectFlows(). Calling any other method will trigger an
+     * InternalError.
+     * @param sslExecutor an executor
+     * @return a cross-over FlowTube connected to an EchoTube.
+     * @throws IOException
+     */
+    FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException {
+        LateBindingTube crossOver = new LateBindingTube();
+        FlowTube server = new SSLTube(createSSLEngine(false),
+                                      sslExecutor,
+                                      crossOver);
+        EchoTube echo = new EchoTube(6);
+        server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo));
+
+        return new CrossOverTube(crossOver);
+    }
+
+    /**
+     * A cross-over FlowTube that makes it possible to reverse the direction
+     * of flows. The typical usage is to connect an two opposite SSLTube,
+     * one encrypting, one decrypting, to e.g. an EchoTube, with the help
+     * of a LateBindingTube:
+     * {@code
+     * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
+     * }
+     * <p>
+     * Note that the only method that can be called on the CrossOverTube is
+     * connectFlows(). Calling any other method will cause an InternalError to
+     * be thrown.
+     * Also connectFlows() can be called only once.
+     */
+    private static final class CrossOverTube implements FlowTube {
+        final LateBindingTube tube;
+        CrossOverTube(LateBindingTube tube) {
+            this.tube = tube;
+        }
+
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            throw newInternalError();
+        }
+
+        @Override
+        public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) {
+            tube.start(writePublisher, readSubscriber);
+        }
+
+        @Override
+        public boolean isFinished() {
+            return tube.isFinished();
+        }
+
+        Error newInternalError() {
+            InternalError error = new InternalError();
+            error.printStackTrace(System.out);
+            return error;
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            throw newInternalError();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            throw newInternalError();
+        }
+
+        @Override
+        public void onComplete() {
+            throw newInternalError();
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            throw newInternalError();
+        }
+    }
+
+    /**
+     * A late binding tube that makes it possible to create an
+     * SSLTube before the right-hand-side tube has been created.
+     * The typical usage is to make it possible to connect two
+     * opposite SSLTube (one encrypting, one decrypting) through a
+     * CrossOverTube:
+     * {@code
+     * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
+     * }
+     * <p>
+     * Note that this class only supports a single call to start(): it cannot be
+     * subscribed more than once from its left-hand-side (the cross over tube side).
+     */
+    private static class LateBindingTube implements FlowTube {
+
+        final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher
+                = new CompletableFuture<>();
+        final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
+                = new ConcurrentLinkedQueue<>();
+        AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
+        SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
+        AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        private volatile boolean finished;
+        private volatile boolean completed;
+
+
+        public void start(Flow.Publisher<List<ByteBuffer>> publisher,
+                          Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            subscriberRef.set(subscriber);
+            futurePublisher.complete(publisher);
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public boolean isFinished() {
+            return finished;
+        }
+
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            futurePublisher.thenAccept((p) -> p.subscribe(subscriber));
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            queue.add((s) -> s.onSubscribe(subscription));
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            queue.add((s) -> s.onNext(item));
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            System.out.println("LateBindingTube onError");
+            throwable.printStackTrace(System.out);
+            queue.add((s) -> {
+                errorRef.compareAndSet(null, throwable);
+                try {
+                    System.out.println("LateBindingTube subscriber onError: " + throwable);
+                    s.onError(errorRef.get());
+                } finally {
+                    finished = true;
+                    System.out.println("LateBindingTube finished");
+                }
+            });
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public void onComplete() {
+            System.out.println("LateBindingTube completing");
+            queue.add((s) -> {
+                completed = true;
+                try {
+                    System.out.println("LateBindingTube complete subscriber");
+                    s.onComplete();
+                } finally {
+                    finished = true;
+                    System.out.println("LateBindingTube finished");
+                }
+            });
+            scheduler.runOrSchedule();
+        }
+
+        private void loop() {
+            if (finished) {
+                scheduler.stop();
+                return;
+            }
+            Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
+            if (subscriber == null) return;
+            try {
+                Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s;
+                while ((s = queue.poll()) != null) {
+                    s.accept(subscriber);
+                }
+            } catch (Throwable t) {
+                if (errorRef.compareAndSet(null, t)) {
+                    onError(t);
+                }
+            }
+        }
+    }
+
+    /**
+     * An echo tube that just echoes back whatever bytes it receives.
+     * This cannot be plugged to the right-hand-side of an SSLTube
+     * since handshake data cannot be simply echoed back, and
+     * application data most likely also need to be decrypted and
+     * re-encrypted.
+     */
+    private static final class EchoTube implements FlowTube {
+
+        private final static Object EOF = new Object();
+        private final Executor executor = Executors.newSingleThreadExecutor();
+
+        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+        private final int maxQueueSize;
+        private final SequentialScheduler processingScheduler =
+                new SequentialScheduler(createProcessingTask());
+
+        /* Writing into this tube */
+        private volatile long requested;
+        private Flow.Subscription subscription;
+
+        /* Reading from this tube */
+        private final Demand demand = new Demand();
+        private final AtomicBoolean cancelled = new AtomicBoolean();
+        private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+
+        private EchoTube(int maxBufferSize) {
+            if (maxBufferSize < 1)
+                throw new IllegalArgumentException();
+            this.maxQueueSize = maxBufferSize;
+        }
+
+        @Override
+        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+            this.subscriber = subscriber;
+            System.out.println("EchoTube got subscriber: " + subscriber);
+            this.subscriber.onSubscribe(new InternalSubscription());
+        }
+
+        @Override
+        public void onSubscribe(Flow.Subscription subscription) {
+            System.out.println("EchoTube request: " + maxQueueSize);
+            (this.subscription = subscription).request(requested = maxQueueSize);
+        }
+
+        private void requestMore() {
+            Flow.Subscription s = subscription;
+            if (s == null || cancelled.get()) return;
+            long unfulfilled = queue.size() + --requested;
+            if (unfulfilled <= maxQueueSize/2) {
+                long req = maxQueueSize - unfulfilled;
+                requested += req;
+                s.request(req);
+                System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n",
+                        req, requested-req, queue.size(), unfulfilled );
+            }
+        }
+
+        @Override
+        public void onNext(List<ByteBuffer> item) {
+            System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n",
+                    Utils.remaining(item), requested, queue.size());
+            queue.add(item);
+            processingScheduler.deferOrSchedule(executor);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            System.out.println("EchoTube add " + throwable);
+            queue.add(throwable);
+            processingScheduler.deferOrSchedule(executor);
+        }
+
+        @Override
+        public void onComplete() {
+            System.out.println("EchoTube add EOF");
+            queue.add(EOF);
+            processingScheduler.deferOrSchedule(executor);
+        }
+
+        @Override
+        public boolean isFinished() {
+            return cancelled.get();
+        }
+
+        private class InternalSubscription implements Flow.Subscription {
+
+            @Override
+            public void request(long n) {
+                System.out.println("EchoTube got request: " + n);
+                if (n <= 0) {
+                    throw new InternalError();
+                }
+                if (demand.increase(n)) {
+                    processingScheduler.deferOrSchedule(executor);
+                }
+            }
+
+            @Override
+            public void cancel() {
+                cancelled.set(true);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "EchoTube";
+        }
+
+        int transmitted = 0;
+        private SequentialScheduler.RestartableTask createProcessingTask() {
+            return new SequentialScheduler.CompleteRestartableTask() {
+
+                @Override
+                protected void run() {
+                    try {
+                        while (!cancelled.get()) {
+                            Object item = queue.peek();
+                            if (item == null) {
+                                System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n",
+                                        requested, demand.get(), transmitted);
+                                requestMore();
+                                return;
+                            }
+                            try {
+                                System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n",
+                                        requested, demand.get(), transmitted);
+                                if (item instanceof List) {
+                                    if (!demand.tryDecrement()) {
+                                        System.out.println("EchoTube no demand");
+                                        return;
+                                    }
+                                    @SuppressWarnings("unchecked")
+                                    List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+                                    Object removed = queue.remove();
+                                    assert removed == item;
+                                    System.out.println("EchoTube processing "
+                                            + Utils.remaining(bytes));
+                                    transmitted++;
+                                    subscriber.onNext(bytes);
+                                    requestMore();
+                                } else if (item instanceof Throwable) {
+                                    cancelled.set(true);
+                                    Object removed = queue.remove();
+                                    assert removed == item;
+                                    System.out.println("EchoTube processing " + item);
+                                    subscriber.onError((Throwable) item);
+                                } else if (item == EOF) {
+                                    cancelled.set(true);
+                                    Object removed = queue.remove();
+                                    assert removed == item;
+                                    System.out.println("EchoTube processing EOF");
+                                    subscriber.onComplete();
+                                } else {
+                                    throw new InternalError(String.valueOf(item));
+                                }
+                            } finally {
+                            }
+                        }
+                    } catch(Throwable t) {
+                        t.printStackTrace();
+                        throw t;
+                    }
+                }
+            };
+        }
+    }
 
     /**
      * The final subscriber which receives the decrypted looped-back data. Just
@@ -518,13 +764,13 @@
             if (--unfulfilled == (REQUEST_WINDOW / 2)) {
                 long req = REQUEST_WINDOW - unfulfilled;
                 System.out.println("EndSubscriber request " + req);
+                unfulfilled = REQUEST_WINDOW;
                 subscription.request(req);
-                unfulfilled = REQUEST_WINDOW;
             }
 
             long currval = counter.get();
             if (currval % 500 == 0) {
-                System.out.println("End: " + currval);
+                System.out.println("EndSubscriber: " + currval);
             }
             System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));