src/java.net.http/share/classes/java/net/http/internal/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56044 a8423a38386e
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/java/net/http/internal/common/SSLFlowDelegate.java	Wed Feb 07 14:17:24 2018 +0000
@@ -0,0 +1,906 @@
+/*
+ * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http.internal.common;
+
+import java.net.http.internal.common.SubscriberWrapper.SchedulingAction;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.lang.System.Logger.Level;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements SSL using two SubscriberWrappers.
+ *
+ * <p> Constructor takes two Flow.Subscribers: one that receives the network
+ * data (after it has been encrypted by SSLFlowDelegate) data, and one that
+ * receives the application data (before it has been encrypted by SSLFlowDelegate).
+ *
+ * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
+ * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
+ * See diagram below.
+ *
+ * <p> How Flow.Subscribers are used in this class, and where they come from:
+ * <pre>
+ * {@code
+ *
+ *
+ *
+ * --------->  data flow direction
+ *
+ *
+ *                         +------------------+
+ *        upstreamWriter   |                  | downWriter
+ *        ---------------> |                  | ------------>
+ *  obtained from this     |                  | supplied to constructor
+ *                         | SSLFlowDelegate  |
+ *        downReader       |                  | upstreamReader
+ *        <--------------- |                  | <--------------
+ * supplied to constructor |                  | obtained from this
+ *                         +------------------+
+ * }
+ * </pre>
+ */
+public class SSLFlowDelegate {
+
+    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+    final System.Logger debug =
+            Utils.getDebugLogger(this::dbgString, DEBUG);
+
+    final Executor exec;
+    final Reader reader;
+    final Writer writer;
+    final SSLEngine engine;
+    final String tubeName; // hack
+    private final CompletableFuture<Void> cf;
+    final CompletableFuture<String> alpnCF; // completes on initial handshake
+    final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+    volatile boolean close_notify_received;
+
+    /**
+     * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+     * Flow.Subscriber requires an associated {@link CompletableFuture}
+     * for errors that need to be signaled from downstream to upstream.
+     */
+    public SSLFlowDelegate(SSLEngine engine,
+                           Executor exec,
+                           Subscriber<? super List<ByteBuffer>> downReader,
+                           Subscriber<? super List<ByteBuffer>> downWriter)
+    {
+        this.tubeName = String.valueOf(downWriter);
+        this.reader = new Reader();
+        this.writer = new Writer();
+        this.engine = engine;
+        this.exec = exec;
+        this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
+        CompletableFuture<Void> cs = CompletableFuture.allOf(
+                reader.completion(), writer.completion()).thenRun(this::normalStop);
+        this.cf = MinimalFuture.of(cs);
+        this.alpnCF = new MinimalFuture<>();
+
+        // connect the Reader to the downReader and the
+        // Writer to the downWriter.
+        connect(downReader, downWriter);
+
+        //Monitor.add(this::monitor);
+    }
+
+    /**
+     * Returns true if the SSLFlowDelegate has detected a TLS
+     * close_notify from the server.
+     * @return true, if a close_notify was detected.
+     */
+    public boolean closeNotifyReceived() {
+        return close_notify_received;
+    }
+
+    /**
+     * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
+     * and the write sink (downWriter) to the SSLFlowDelegate Writer.
+     * Called from within the constructor. Overwritten by SSLTube.
+     *
+     * @param downReader  The left hand side read sink (typically, the
+     *                    HttpConnection read subscriber).
+     * @param downWriter  The right hand side write sink (typically
+     *                    the SocketTube write subscriber).
+     */
+    void connect(Subscriber<? super List<ByteBuffer>> downReader,
+                 Subscriber<? super List<ByteBuffer>> downWriter) {
+        this.reader.subscribe(downReader);
+        this.writer.subscribe(downWriter);
+    }
+
+   /**
+    * Returns a CompletableFuture<String> which completes after
+    * the initial handshake completes, and which contains the negotiated
+    * alpn.
+    */
+    public CompletableFuture<String> alpn() {
+        return alpnCF;
+    }
+
+    private void setALPN() {
+        // Handshake is finished. So, can retrieve the ALPN now
+        if (alpnCF.isDone())
+            return;
+        String alpn = engine.getApplicationProtocol();
+        debug.log(Level.DEBUG, "setALPN = %s", alpn);
+        alpnCF.complete(alpn);
+    }
+
+    public String monitor() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SSL: HS state: " + states(handshakeState));
+        sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
+        sb.append(" LL : ");
+        synchronized(stateList) {
+            for (String s: stateList) {
+                sb.append(s).append(" ");
+            }
+        }
+        sb.append("\r\n");
+        sb.append("Reader:: ").append(reader.toString());
+        sb.append("\r\n");
+        sb.append("Writer:: ").append(writer.toString());
+        sb.append("\r\n===================================");
+        return sb.toString();
+    }
+
+    protected SchedulingAction enterReadScheduling() {
+        return SchedulingAction.CONTINUE;
+    }
+
+
+    /**
+     * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
+     * Any decrypted buffers returned to be passed downstream.
+     * Status codes:
+     *     NEED_UNWRAP: do nothing. Following incoming data will contain
+     *                  any required handshake data
+     *     NEED_WRAP: call writer.addData() with empty buffer
+     *     NEED_TASK: delegate task to executor
+     *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
+     *     BUFFER_UNDERFLOW: keep buffer and wait for more data
+     *     OK: return generated buffers.
+     *
+     * Upstream subscription strategy is to try and keep no more than
+     * TARGET_BUFSIZE bytes in readBuf
+     */
+    class Reader extends SubscriberWrapper {
+        final SequentialScheduler scheduler;
+        static final int TARGET_BUFSIZE = 16 * 1024;
+        volatile ByteBuffer readBuf;
+        volatile boolean completing = false;
+        final Object readBufferLock = new Object();
+        final System.Logger debugr =
+            Utils.getDebugLogger(this::dbgString, DEBUG);
+
+        class ReaderDownstreamPusher implements Runnable {
+            @Override public void run() { processData(); }
+        }
+
+        Reader() {
+            super();
+            scheduler = SequentialScheduler.synchronizedScheduler(
+                                                new ReaderDownstreamPusher());
+            this.readBuf = ByteBuffer.allocate(1024);
+            readBuf.limit(0); // keep in read mode
+        }
+
+        protected SchedulingAction enterScheduling() {
+            return enterReadScheduling();
+        }
+
+        public final String dbgString() {
+            return "SSL Reader(" + tubeName + ")";
+        }
+
+        /**
+         * entry point for buffers delivered from upstream Subscriber
+         */
+        @Override
+        public void incoming(List<ByteBuffer> buffers, boolean complete) {
+            debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
+                        + " bytes to read buffer");
+            addToReadBuf(buffers, complete);
+            scheduler.runOrSchedule();
+        }
+
+        @Override
+        public String toString() {
+            return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
+                    + " count: " + count.toString();
+        }
+
+        private void reallocReadBuf() {
+            int sz = readBuf.capacity();
+            ByteBuffer newb = ByteBuffer.allocate(sz*2);
+            readBuf.flip();
+            Utils.copy(readBuf, newb);
+            readBuf = newb;
+        }
+
+        @Override
+        protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
+            if (readBuf.remaining() > TARGET_BUFSIZE) {
+                return 0;
+            } else {
+                return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
+            }
+        }
+
+        // readBuf is kept ready for reading outside of this method
+        private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
+            synchronized (readBufferLock) {
+                for (ByteBuffer buf : buffers) {
+                    readBuf.compact();
+                    while (readBuf.remaining() < buf.remaining())
+                        reallocReadBuf();
+                    readBuf.put(buf);
+                    readBuf.flip();
+                }
+                if (complete) {
+                    this.completing = complete;
+                }
+            }
+        }
+
+        void schedule() {
+            scheduler.runOrSchedule();
+        }
+
+        void stop() {
+            debugr.log(Level.DEBUG, "stop");
+            scheduler.stop();
+        }
+
+        AtomicInteger count = new AtomicInteger(0);
+
+        // work function where it all happens
+        void processData() {
+            try {
+                debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
+                           + " bytes to unwrap "
+                           + states(handshakeState)
+                           + ", " + engine.getHandshakeStatus());
+                int len;
+                boolean complete = false;
+                while ((len = readBuf.remaining()) > 0) {
+                    boolean handshaking = false;
+                    try {
+                        EngineResult result;
+                        synchronized (readBufferLock) {
+                            complete = this.completing;
+                            result = unwrapBuffer(readBuf);
+                            debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
+                        }
+                        if (result.bytesProduced() > 0) {
+                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
+                            count.addAndGet(result.bytesProduced());
+                            outgoing(result.destBuffer, false);
+                        }
+                        if (result.status() == Status.BUFFER_UNDERFLOW) {
+                            debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
+                            // not enough data in the read buffer...
+                            requestMore();
+                            synchronized (readBufferLock) {
+                                // check if we have received some data
+                                if (readBuf.remaining() > len) continue;
+                                return;
+                            }
+                        }
+                        if (complete && result.status() == Status.CLOSED) {
+                            debugr.log(Level.DEBUG, "Closed: completing");
+                            outgoing(Utils.EMPTY_BB_LIST, true);
+                            return;
+                        }
+                        if (result.handshaking() && !complete) {
+                            debugr.log(Level.DEBUG, "handshaking");
+                            doHandshake(result, READER);
+                            resumeActivity();
+                            handshaking = true;
+                        } else {
+                            if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+                                setALPN();
+                                handshaking = false;
+                                resumeActivity();
+                            }
+                        }
+                    } catch (IOException ex) {
+                        errorCommon(ex);
+                        handleError(ex);
+                    }
+                    if (handshaking && !complete)
+                        return;
+                }
+                if (!complete) {
+                    synchronized (readBufferLock) {
+                        complete = this.completing && !readBuf.hasRemaining();
+                    }
+                }
+                if (complete) {
+                    debugr.log(Level.DEBUG, "completing");
+                    // Complete the alpnCF, if not already complete, regardless of
+                    // whether or not the ALPN is available, there will be no more
+                    // activity.
+                    setALPN();
+                    outgoing(Utils.EMPTY_BB_LIST, true);
+                }
+            } catch (Throwable ex) {
+                errorCommon(ex);
+                handleError(ex);
+            }
+        }
+    }
+
+    /**
+     * Returns a CompletableFuture which completes after all activity
+     * in the delegate is terminated (whether normally or exceptionally).
+     *
+     * @return
+     */
+    public CompletableFuture<Void> completion() {
+        return cf;
+    }
+
+    public interface Monitorable {
+        public String getInfo();
+    }
+
+    public static class Monitor extends Thread {
+        final List<Monitorable> list;
+        static Monitor themon;
+
+        static {
+            themon = new Monitor();
+            themon.start(); // uncomment to enable Monitor
+        }
+
+        Monitor() {
+            super("Monitor");
+            setDaemon(true);
+            list = Collections.synchronizedList(new LinkedList<>());
+        }
+
+        void addTarget(Monitorable o) {
+            list.add(o);
+        }
+
+        public static void add(Monitorable o) {
+            themon.addTarget(o);
+        }
+
+        @Override
+        public void run() {
+            System.out.println("Monitor starting");
+            while (true) {
+                try {Thread.sleep(20*1000); } catch (Exception e) {}
+                synchronized (list) {
+                    for (Monitorable o : list) {
+                        System.out.println(o.getInfo());
+                        System.out.println("-------------------------");
+                    }
+                }
+                System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
+
+            }
+        }
+    }
+
+    /**
+     * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
+     * Any encrypted buffers generated are passed downstream to be written.
+     * Status codes:
+     *     NEED_UNWRAP: call reader.addData() with empty buffer
+     *     NEED_WRAP: call addData() with empty buffer
+     *     NEED_TASK: delegate task to executor
+     *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
+     *     BUFFER_UNDERFLOW: shouldn't happen on writing side
+     *     OK: return generated buffers
+     */
+    class Writer extends SubscriberWrapper {
+        final SequentialScheduler scheduler;
+        // queues of buffers received from upstream waiting
+        // to be processed by the SSLEngine
+        final List<ByteBuffer> writeList;
+        final System.Logger debugw =
+            Utils.getDebugLogger(this::dbgString, DEBUG);
+        volatile boolean completing;
+        boolean completed; // only accessed in processData
+
+        class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
+            @Override public void run() { processData(); }
+        }
+
+        Writer() {
+            super();
+            writeList = Collections.synchronizedList(new LinkedList<>());
+            scheduler = new SequentialScheduler(new WriterDownstreamPusher());
+        }
+
+        @Override
+        protected void incoming(List<ByteBuffer> buffers, boolean complete) {
+            assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
+            assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
+            if (complete) {
+                debugw.log(Level.DEBUG, "adding SENTINEL");
+                completing = true;
+                writeList.add(SENTINEL);
+            } else {
+                writeList.addAll(buffers);
+            }
+            debugw.log(Level.DEBUG, () -> "added " + buffers.size()
+                        + " (" + Utils.remaining(buffers)
+                        + " bytes) to the writeList");
+            scheduler.runOrSchedule();
+        }
+
+        public final String dbgString() {
+            return "SSL Writer(" + tubeName + ")";
+        }
+
+        protected void onSubscribe() {
+            doHandshake(EngineResult.INIT, INIT);
+            resumeActivity();
+        }
+
+        void schedule() {
+            scheduler.runOrSchedule();
+        }
+
+        void stop() {
+            debugw.log(Level.DEBUG, "stop");
+            scheduler.stop();
+        }
+
+        @Override
+        public boolean closing() {
+            return closeNotifyReceived();
+        }
+
+        private boolean isCompleting() {
+            return completing;
+        }
+
+        @Override
+        protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
+            if (writeList.size() > 10)
+                return 0;
+            else
+                return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
+        }
+
+        private boolean hsTriggered() {
+            synchronized(writeList) {
+                for (ByteBuffer b : writeList)
+                    if (b == HS_TRIGGER)
+                        return true;
+                return false;
+            }
+        }
+
+        private void processData() {
+            boolean completing = isCompleting();
+
+            try {
+                debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
+                while (Utils.remaining(writeList) > 0 || hsTriggered()
+                        || needWrap()) {
+                    ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
+                    EngineResult result = wrapBuffers(outbufs);
+                    debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
+
+                    if (result.status() == Status.CLOSED) {
+                        if (result.bytesProduced() <= 0)
+                            return;
+
+                        if (!completing && !completed) {
+                            completing = this.completing = true;
+                            // There could still be some outgoing data in outbufs.
+                            writeList.add(SENTINEL);
+                        }
+                    }
+
+                    boolean handshaking = false;
+                    if (result.handshaking()) {
+                        debugw.log(Level.DEBUG, "handshaking");
+                        doHandshake(result, WRITER);
+                        handshaking = true;
+                    } else {
+                        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+                            setALPN();
+                            resumeActivity();
+                        }
+                    }
+                    cleanList(writeList); // tidy up the source list
+                    sendResultBytes(result);
+                    if (handshaking && !completing) {
+                        if (writeList.isEmpty() && !result.needUnwrap()) {
+                            writer.addData(HS_TRIGGER);
+                        }
+                        if (needWrap()) continue;
+                        return;
+                    }
+                }
+                if (completing && Utils.remaining(writeList) == 0) {
+                    /*
+                    System.out.println("WRITER DOO 3");
+                    engine.closeOutbound();
+                    EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
+                    sendResultBytes(result);
+                    */
+                    if (!completed) {
+                        completed = true;
+                        writeList.clear();
+                        outgoing(Utils.EMPTY_BB_LIST, true);
+                    }
+                    return;
+                }
+                if (writeList.isEmpty() && needWrap()) {
+                    writer.addData(HS_TRIGGER);
+                }
+            } catch (Throwable ex) {
+                errorCommon(ex);
+                handleError(ex);
+            }
+        }
+
+        private boolean needWrap() {
+            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
+        }
+
+        private void sendResultBytes(EngineResult result) {
+            if (result.bytesProduced() > 0) {
+                debugw.log(Level.DEBUG, "Sending %d bytes downstream",
+                           result.bytesProduced());
+                outgoing(result.destBuffer, false);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "WRITER: " + super.toString() +
+                    " writeList size " + Integer.toString(writeList.size());
+                    //" writeList: " + writeList.toString();
+        }
+    }
+
+    private void handleError(Throwable t) {
+        debug.log(Level.DEBUG, "handleError", t);
+        cf.completeExceptionally(t);
+        // no-op if already completed
+        alpnCF.completeExceptionally(t);
+        reader.stop();
+        writer.stop();
+    }
+
+    private void normalStop() {
+        reader.stop();
+        writer.stop();
+    }
+
+    private void cleanList(List<ByteBuffer> l) {
+        synchronized (l) {
+            Iterator<ByteBuffer> iter = l.iterator();
+            while (iter.hasNext()) {
+                ByteBuffer b = iter.next();
+                if (!b.hasRemaining() && b != SENTINEL) {
+                    iter.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * States for handshake. We avoid races when accessing/updating the AtomicInt
+     * because updates always schedule an additional call to both the read()
+     * and write() functions.
+     */
+    private static final int NOT_HANDSHAKING = 0;
+    private static final int HANDSHAKING = 1;
+    private static final int INIT = 2;
+    private static final int DOING_TASKS = 4; // bit added to above state
+    private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
+
+    private static final int READER = 1;
+    private static final int WRITER = 2;
+
+    private static String states(AtomicInteger state) {
+        int s = state.get();
+        StringBuilder sb = new StringBuilder();
+        int x = s & ~DOING_TASKS;
+        switch (x) {
+            case NOT_HANDSHAKING:
+                sb.append(" NOT_HANDSHAKING ");
+                break;
+            case HANDSHAKING:
+                sb.append(" HANDSHAKING ");
+                break;
+            case INIT:
+                sb.append(" INIT ");
+                break;
+            default:
+                throw new InternalError();
+        }
+        if ((s & DOING_TASKS) > 0)
+            sb.append("|DOING_TASKS");
+        return sb.toString();
+    }
+
+    private void resumeActivity() {
+        reader.schedule();
+        writer.schedule();
+    }
+
+    final AtomicInteger handshakeState;
+    final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
+
+    private void doHandshake(EngineResult r, int caller) {
+        int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
+        stateList.add(r.handshakeStatus().toString());
+        stateList.add(Integer.toString(caller));
+        switch (r.handshakeStatus()) {
+            case NEED_TASK:
+                if ((s & DOING_TASKS) > 0) // someone else was doing tasks
+                    return;
+                List<Runnable> tasks = obtainTasks();
+                executeTasks(tasks);
+                break;
+            case NEED_WRAP:
+                writer.addData(HS_TRIGGER);
+                break;
+            case NEED_UNWRAP:
+            case NEED_UNWRAP_AGAIN:
+                // do nothing else
+                break;
+            default:
+                throw new InternalError("Unexpected handshake status:"
+                                        + r.handshakeStatus());
+        }
+    }
+
+    private List<Runnable> obtainTasks() {
+        List<Runnable> l = new ArrayList<>();
+        Runnable r;
+        while ((r = engine.getDelegatedTask()) != null) {
+            l.add(r);
+        }
+        return l;
+    }
+
+    private void executeTasks(List<Runnable> tasks) {
+        exec.execute(() -> {
+            try {
+                handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
+                List<Runnable> nextTasks = tasks;
+                do {
+                    nextTasks.forEach(Runnable::run);
+                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+                        nextTasks = obtainTasks();
+                    } else {
+                        break;
+                    }
+                } while (true);
+                handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
+                writer.addData(HS_TRIGGER);
+                resumeActivity();
+            } catch (Throwable t) {
+                handleError(t);
+            }
+        });
+    }
+
+
+    EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
+        ByteBuffer dst = getAppBuffer();
+        while (true) {
+            SSLEngineResult sslResult = engine.unwrap(src, dst);
+            switch (sslResult.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    // may happen only if app size buffer was changed.
+                    // get it again if app buffer size changed
+                    int appSize = engine.getSession().getApplicationBufferSize();
+                    ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
+                    dst.flip();
+                    b.put(dst);
+                    dst = b;
+                    break;
+                case CLOSED:
+                    return doClosure(new EngineResult(sslResult));
+                case BUFFER_UNDERFLOW:
+                    // handled implicitly by compaction/reallocation of readBuf
+                    return new EngineResult(sslResult);
+                case OK:
+                     dst.flip();
+                     return new EngineResult(sslResult, dst);
+            }
+        }
+    }
+
+    // FIXME: acknowledge a received CLOSE request from peer
+    EngineResult doClosure(EngineResult r) throws IOException {
+        debug.log(Level.DEBUG,
+                "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
+                r.result, engine.getHandshakeStatus(),
+                engine.isOutboundDone(), engine.isInboundDone());
+        if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
+            // we have received TLS close_notify and need to send
+            // an acknowledgement back. We're calling doHandshake
+            // to finish the close handshake.
+            if (engine.isInboundDone() && !engine.isOutboundDone()) {
+                debug.log(Level.DEBUG, "doClosure: close_notify received");
+                close_notify_received = true;
+                doHandshake(r, READER);
+            }
+        }
+        return r;
+    }
+
+    /**
+     * Returns the upstream Flow.Subscriber of the reading (incoming) side.
+     * This flow must be given the encrypted data read from upstream (eg socket)
+     * before it is decrypted.
+     */
+    public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
+        return reader;
+    }
+
+    /**
+     * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
+     * This flow contains the plaintext data before it is encrypted.
+     */
+    public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
+        return writer;
+    }
+
+    public boolean resumeReader() {
+        return reader.signalScheduling();
+    }
+
+    public void resetReaderDemand() {
+        reader.resetDownstreamDemand();
+    }
+
+    static class EngineResult {
+        final SSLEngineResult result;
+        final ByteBuffer destBuffer;
+
+        // normal result
+        EngineResult(SSLEngineResult result) {
+            this(result, null);
+        }
+
+        EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
+            this.result = result;
+            this.destBuffer = destBuffer;
+        }
+
+        // Special result used to trigger handshaking in constructor
+        static EngineResult INIT =
+            new EngineResult(
+                new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
+
+        boolean handshaking() {
+            HandshakeStatus s = result.getHandshakeStatus();
+            return s != HandshakeStatus.FINISHED
+                   && s != HandshakeStatus.NOT_HANDSHAKING
+                   && result.getStatus() != Status.CLOSED;
+        }
+
+        boolean needUnwrap() {
+            HandshakeStatus s = result.getHandshakeStatus();
+            return s == HandshakeStatus.NEED_UNWRAP;
+        }
+
+
+        int bytesConsumed() {
+            return result.bytesConsumed();
+        }
+
+        int bytesProduced() {
+            return result.bytesProduced();
+        }
+
+        SSLEngineResult.HandshakeStatus handshakeStatus() {
+            return result.getHandshakeStatus();
+        }
+
+        SSLEngineResult.Status status() {
+            return result.getStatus();
+        }
+    }
+
+    public ByteBuffer getNetBuffer() {
+        return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
+    }
+
+    private ByteBuffer getAppBuffer() {
+        return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
+    }
+
+    final String dbgString() {
+        return "SSLFlowDelegate(" + tubeName + ")";
+    }
+
+    @SuppressWarnings("fallthrough")
+    EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
+        debug.log(Level.DEBUG, () -> "wrapping "
+                    + Utils.remaining(src) + " bytes");
+        ByteBuffer dst = getNetBuffer();
+        while (true) {
+            SSLEngineResult sslResult = engine.wrap(src, dst);
+            debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult);
+            switch (sslResult.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    // Shouldn't happen. We allocated buffer with packet size
+                    // get it again if net buffer size was changed
+                    debug.log(Level.DEBUG, "BUFFER_OVERFLOW");
+                    int appSize = engine.getSession().getApplicationBufferSize();
+                    ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
+                    dst.flip();
+                    b.put(dst);
+                    dst = b;
+                    break; // try again
+                case CLOSED:
+                    debug.log(Level.DEBUG, "CLOSED");
+                    // fallthrough. There could be some remaining data in dst.
+                    // CLOSED will be handled by the caller.
+                case OK:
+                    dst.flip();
+                    final ByteBuffer dest = dst;
+                    debug.log(Level.DEBUG, () -> "OK => produced: "
+                                           + dest.remaining()
+                                           + " not wrapped: "
+                                           + Utils.remaining(src));
+                    return new EngineResult(sslResult, dest);
+                case BUFFER_UNDERFLOW:
+                    // Shouldn't happen.  Doesn't returns when wrap()
+                    // underflow handled externally
+                    // assert false : "Buffer Underflow";
+                    debug.log(Level.DEBUG, "BUFFER_UNDERFLOW");
+                    return new EngineResult(sslResult);
+                default:
+                    debug.log(Level.DEBUG, "ASSERT");
+                    assert false;
+            }
+        }
+    }
+}