src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
author chegar
Fri, 09 Feb 2018 18:50:31 +0000
branchhttp-client-branch
changeset 56101 983e338eeb50
parent 56092 fd85b2bf2b0d
child 56184 1c7b4d7140e2
permissions -rw-r--r--
http-client-branch: minor bug fixes

/*
 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http.common;

import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Implements SSL using two SubscriberWrappers.
 *
 * <p> Constructor takes two Flow.Subscribers: one that receives the network
 * data (after it has been encrypted by SSLFlowDelegate) data, and one that
 * receives the application data (before it has been encrypted by SSLFlowDelegate).
 *
 * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
 * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
 * See diagram below.
 *
 * <p> How Flow.Subscribers are used in this class, and where they come from:
 * <pre>
 * {@code
 *
 *
 *
 * --------->  data flow direction
 *
 *
 *                         +------------------+
 *        upstreamWriter   |                  | downWriter
 *        ---------------> |                  | ------------>
 *  obtained from this     |                  | supplied to constructor
 *                         | SSLFlowDelegate  |
 *        downReader       |                  | upstreamReader
 *        <--------------- |                  | <--------------
 * supplied to constructor |                  | obtained from this
 *                         +------------------+
 * }
 * </pre>
 */
public class SSLFlowDelegate {

    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    final System.Logger debug =
            Utils.getDebugLogger(this::dbgString, DEBUG);

    final Executor exec;
    final Reader reader;
    final Writer writer;
    final SSLEngine engine;
    final String tubeName; // hack
    private final CompletableFuture<Void> cf;
    final CompletableFuture<String> alpnCF; // completes on initial handshake
    final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    volatile boolean close_notify_received;

    /**
     * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
     * Flow.Subscriber requires an associated {@link CompletableFuture}
     * for errors that need to be signaled from downstream to upstream.
     */
    public SSLFlowDelegate(SSLEngine engine,
                           Executor exec,
                           Subscriber<? super List<ByteBuffer>> downReader,
                           Subscriber<? super List<ByteBuffer>> downWriter)
    {
        this.tubeName = String.valueOf(downWriter);
        this.reader = new Reader();
        this.writer = new Writer();
        this.engine = engine;
        this.exec = exec;
        this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
        CompletableFuture<Void> cs = CompletableFuture.allOf(
                reader.completion(), writer.completion()).thenRun(this::normalStop);
        this.cf = MinimalFuture.of(cs);
        this.alpnCF = new MinimalFuture<>();

        // connect the Reader to the downReader and the
        // Writer to the downWriter.
        connect(downReader, downWriter);

        //Monitor.add(this::monitor);
    }

    /**
     * Returns true if the SSLFlowDelegate has detected a TLS
     * close_notify from the server.
     * @return true, if a close_notify was detected.
     */
    public boolean closeNotifyReceived() {
        return close_notify_received;
    }

    /**
     * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
     * and the write sink (downWriter) to the SSLFlowDelegate Writer.
     * Called from within the constructor. Overwritten by SSLTube.
     *
     * @param downReader  The left hand side read sink (typically, the
     *                    HttpConnection read subscriber).
     * @param downWriter  The right hand side write sink (typically
     *                    the SocketTube write subscriber).
     */
    void connect(Subscriber<? super List<ByteBuffer>> downReader,
                 Subscriber<? super List<ByteBuffer>> downWriter) {
        this.reader.subscribe(downReader);
        this.writer.subscribe(downWriter);
    }

   /**
    * Returns a CompletableFuture<String> which completes after
    * the initial handshake completes, and which contains the negotiated
    * alpn.
    */
    public CompletableFuture<String> alpn() {
        return alpnCF;
    }

    private void setALPN() {
        // Handshake is finished. So, can retrieve the ALPN now
        if (alpnCF.isDone())
            return;
        String alpn = engine.getApplicationProtocol();
        debug.log(Level.DEBUG, "setALPN = %s", alpn);
        alpnCF.complete(alpn);
    }

    public String monitor() {
        StringBuilder sb = new StringBuilder();
        sb.append("SSL: HS state: " + states(handshakeState));
        sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
        sb.append(" LL : ");
        for (String s: stateList) {
            sb.append(s).append(" ");
        }
        sb.append("\r\n");
        sb.append("Reader:: ").append(reader.toString());
        sb.append("\r\n");
        sb.append("Writer:: ").append(writer.toString());
        sb.append("\r\n===================================");
        return sb.toString();
    }

    protected SchedulingAction enterReadScheduling() {
        return SchedulingAction.CONTINUE;
    }


    /**
     * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
     * Any decrypted buffers returned to be passed downstream.
     * Status codes:
     *     NEED_UNWRAP: do nothing. Following incoming data will contain
     *                  any required handshake data
     *     NEED_WRAP: call writer.addData() with empty buffer
     *     NEED_TASK: delegate task to executor
     *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
     *     BUFFER_UNDERFLOW: keep buffer and wait for more data
     *     OK: return generated buffers.
     *
     * Upstream subscription strategy is to try and keep no more than
     * TARGET_BUFSIZE bytes in readBuf
     */
    class Reader extends SubscriberWrapper {
        final SequentialScheduler scheduler;
        static final int TARGET_BUFSIZE = 16 * 1024;
        volatile ByteBuffer readBuf;
        volatile boolean completing = false;
        final Object readBufferLock = new Object();
        final System.Logger debugr =
            Utils.getDebugLogger(this::dbgString, DEBUG);

        class ReaderDownstreamPusher implements Runnable {
            @Override public void run() { processData(); }
        }

        Reader() {
            super();
            scheduler = SequentialScheduler.synchronizedScheduler(
                                                new ReaderDownstreamPusher());
            this.readBuf = ByteBuffer.allocate(1024);
            readBuf.limit(0); // keep in read mode
        }

        protected SchedulingAction enterScheduling() {
            return enterReadScheduling();
        }

        public final String dbgString() {
            return "SSL Reader(" + tubeName + ")";
        }

        /**
         * entry point for buffers delivered from upstream Subscriber
         */
        @Override
        public void incoming(List<ByteBuffer> buffers, boolean complete) {
            debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
                        + " bytes to read buffer");
            addToReadBuf(buffers, complete);
            scheduler.runOrSchedule();
        }

        @Override
        public String toString() {
            return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
                    + " count: " + count.toString();
        }

        private void reallocReadBuf() {
            int sz = readBuf.capacity();
            ByteBuffer newb = ByteBuffer.allocate(sz*2);
            readBuf.flip();
            Utils.copy(readBuf, newb);
            readBuf = newb;
        }

        @Override
        protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
            if (readBuf.remaining() > TARGET_BUFSIZE) {
                return 0;
            } else {
                return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
            }
        }

        // readBuf is kept ready for reading outside of this method
        private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
            synchronized (readBufferLock) {
                for (ByteBuffer buf : buffers) {
                    readBuf.compact();
                    while (readBuf.remaining() < buf.remaining())
                        reallocReadBuf();
                    readBuf.put(buf);
                    readBuf.flip();
                }
                if (complete) {
                    this.completing = complete;
                }
            }
        }

        void schedule() {
            scheduler.runOrSchedule();
        }

        void stop() {
            debugr.log(Level.DEBUG, "stop");
            scheduler.stop();
        }

        AtomicInteger count = new AtomicInteger(0);

        // work function where it all happens
        void processData() {
            try {
                debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
                           + " bytes to unwrap "
                           + states(handshakeState)
                           + ", " + engine.getHandshakeStatus());
                int len;
                boolean complete = false;
                while ((len = readBuf.remaining()) > 0) {
                    boolean handshaking = false;
                    try {
                        EngineResult result;
                        synchronized (readBufferLock) {
                            complete = this.completing;
                            result = unwrapBuffer(readBuf);
                            debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
                        }
                        if (result.bytesProduced() > 0) {
                            debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
                            count.addAndGet(result.bytesProduced());
                            outgoing(result.destBuffer, false);
                        }
                        if (result.status() == Status.BUFFER_UNDERFLOW) {
                            debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
                            // not enough data in the read buffer...
                            requestMore();
                            synchronized (readBufferLock) {
                                // check if we have received some data
                                if (readBuf.remaining() > len) continue;
                                return;
                            }
                        }
                        if (complete && result.status() == Status.CLOSED) {
                            debugr.log(Level.DEBUG, "Closed: completing");
                            outgoing(Utils.EMPTY_BB_LIST, true);
                            return;
                        }
                        if (result.handshaking() && !complete) {
                            debugr.log(Level.DEBUG, "handshaking");
                            doHandshake(result, READER);
                            resumeActivity();
                            handshaking = true;
                        } else {
                            if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
                                setALPN();
                                handshaking = false;
                                resumeActivity();
                            }
                        }
                    } catch (IOException ex) {
                        errorCommon(ex);
                        handleError(ex);
                    }
                    if (handshaking && !complete)
                        return;
                }
                if (!complete) {
                    synchronized (readBufferLock) {
                        complete = this.completing && !readBuf.hasRemaining();
                    }
                }
                if (complete) {
                    debugr.log(Level.DEBUG, "completing");
                    // Complete the alpnCF, if not already complete, regardless of
                    // whether or not the ALPN is available, there will be no more
                    // activity.
                    setALPN();
                    outgoing(Utils.EMPTY_BB_LIST, true);
                }
            } catch (Throwable ex) {
                errorCommon(ex);
                handleError(ex);
            }
        }
    }

    /**
     * Returns a CompletableFuture which completes after all activity
     * in the delegate is terminated (whether normally or exceptionally).
     *
     * @return
     */
    public CompletableFuture<Void> completion() {
        return cf;
    }

    public interface Monitorable {
        public String getInfo();
    }

    public static class Monitor extends Thread {
        final List<Monitorable> list;
        static Monitor themon;

        static {
            themon = new Monitor();
            themon.start(); // uncomment to enable Monitor
        }

        Monitor() {
            super("Monitor");
            setDaemon(true);
            list = Collections.synchronizedList(new LinkedList<>());
        }

        void addTarget(Monitorable o) {
            list.add(o);
        }

        public static void add(Monitorable o) {
            themon.addTarget(o);
        }

        @Override
        public void run() {
            System.out.println("Monitor starting");
            try {
                while (true) {
                    Thread.sleep(20 * 1000);
                    synchronized (list) {
                        for (Monitorable o : list) {
                            System.out.println(o.getInfo());
                            System.out.println("-------------------------");
                        }
                    }
                    System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
                }
            } catch (InterruptedException e) {
                System.out.println("Monitor exiting with " + e);
            }
        }
    }

    /**
     * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
     * Any encrypted buffers generated are passed downstream to be written.
     * Status codes:
     *     NEED_UNWRAP: call reader.addData() with empty buffer
     *     NEED_WRAP: call addData() with empty buffer
     *     NEED_TASK: delegate task to executor
     *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
     *     BUFFER_UNDERFLOW: shouldn't happen on writing side
     *     OK: return generated buffers
     */
    class Writer extends SubscriberWrapper {
        final SequentialScheduler scheduler;
        // queues of buffers received from upstream waiting
        // to be processed by the SSLEngine
        final List<ByteBuffer> writeList;
        final System.Logger debugw =
            Utils.getDebugLogger(this::dbgString, DEBUG);
        volatile boolean completing;
        boolean completed; // only accessed in processData

        class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
            @Override public void run() { processData(); }
        }

        Writer() {
            super();
            writeList = Collections.synchronizedList(new LinkedList<>());
            scheduler = new SequentialScheduler(new WriterDownstreamPusher());
        }

        @Override
        protected void incoming(List<ByteBuffer> buffers, boolean complete) {
            assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
            assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
            if (complete) {
                debugw.log(Level.DEBUG, "adding SENTINEL");
                completing = true;
                writeList.add(SENTINEL);
            } else {
                writeList.addAll(buffers);
            }
            debugw.log(Level.DEBUG, () -> "added " + buffers.size()
                        + " (" + Utils.remaining(buffers)
                        + " bytes) to the writeList");
            scheduler.runOrSchedule();
        }

        public final String dbgString() {
            return "SSL Writer(" + tubeName + ")";
        }

        protected void onSubscribe() {
            doHandshake(EngineResult.INIT, INIT);
            resumeActivity();
        }

        void schedule() {
            scheduler.runOrSchedule();
        }

        void stop() {
            debugw.log(Level.DEBUG, "stop");
            scheduler.stop();
        }

        @Override
        public boolean closing() {
            return closeNotifyReceived();
        }

        private boolean isCompleting() {
            return completing;
        }

        @Override
        protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
            if (writeList.size() > 10)
                return 0;
            else
                return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
        }

        private boolean hsTriggered() {
            synchronized(writeList) {
                for (ByteBuffer b : writeList)
                    if (b == HS_TRIGGER)
                        return true;
                return false;
            }
        }

        private void processData() {
            boolean completing = isCompleting();

            try {
                debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
                while (Utils.remaining(writeList) > 0 || hsTriggered()
                        || needWrap()) {
                    ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
                    EngineResult result = wrapBuffers(outbufs);
                    debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);

                    if (result.status() == Status.CLOSED) {
                        if (result.bytesProduced() <= 0)
                            return;

                        if (!completing && !completed) {
                            completing = this.completing = true;
                            // There could still be some outgoing data in outbufs.
                            writeList.add(SENTINEL);
                        }
                    }

                    boolean handshaking = false;
                    if (result.handshaking()) {
                        debugw.log(Level.DEBUG, "handshaking");
                        doHandshake(result, WRITER);
                        handshaking = true;
                    } else {
                        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
                            setALPN();
                            resumeActivity();
                        }
                    }
                    cleanList(writeList); // tidy up the source list
                    sendResultBytes(result);
                    if (handshaking && !completing) {
                        if (writeList.isEmpty() && !result.needUnwrap()) {
                            writer.addData(HS_TRIGGER);
                        }
                        if (needWrap()) continue;
                        return;
                    }
                }
                if (completing && Utils.remaining(writeList) == 0) {
                    /*
                    System.out.println("WRITER DOO 3");
                    engine.closeOutbound();
                    EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
                    sendResultBytes(result);
                    */
                    if (!completed) {
                        completed = true;
                        writeList.clear();
                        outgoing(Utils.EMPTY_BB_LIST, true);
                    }
                    return;
                }
                if (writeList.isEmpty() && needWrap()) {
                    writer.addData(HS_TRIGGER);
                }
            } catch (Throwable ex) {
                errorCommon(ex);
                handleError(ex);
            }
        }

        private boolean needWrap() {
            return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
        }

        private void sendResultBytes(EngineResult result) {
            if (result.bytesProduced() > 0) {
                debugw.log(Level.DEBUG, "Sending %d bytes downstream",
                           result.bytesProduced());
                outgoing(result.destBuffer, false);
            }
        }

        @Override
        public String toString() {
            return "WRITER: " + super.toString() +
                    " writeList size " + Integer.toString(writeList.size());
                    //" writeList: " + writeList.toString();
        }
    }

    private void handleError(Throwable t) {
        debug.log(Level.DEBUG, "handleError", t);
        cf.completeExceptionally(t);
        // no-op if already completed
        alpnCF.completeExceptionally(t);
        reader.stop();
        writer.stop();
    }

    private void normalStop() {
        reader.stop();
        writer.stop();
    }

    private void cleanList(List<ByteBuffer> l) {
        synchronized (l) {
            Iterator<ByteBuffer> iter = l.iterator();
            while (iter.hasNext()) {
                ByteBuffer b = iter.next();
                if (!b.hasRemaining() && b != SENTINEL) {
                    iter.remove();
                }
            }
        }
    }

    /**
     * States for handshake. We avoid races when accessing/updating the AtomicInt
     * because updates always schedule an additional call to both the read()
     * and write() functions.
     */
    private static final int NOT_HANDSHAKING = 0;
    private static final int HANDSHAKING = 1;
    private static final int INIT = 2;
    private static final int DOING_TASKS = 4; // bit added to above state
    private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);

    private static final int READER = 1;
    private static final int WRITER = 2;

    private static String states(AtomicInteger state) {
        int s = state.get();
        StringBuilder sb = new StringBuilder();
        int x = s & ~DOING_TASKS;
        switch (x) {
            case NOT_HANDSHAKING:
                sb.append(" NOT_HANDSHAKING ");
                break;
            case HANDSHAKING:
                sb.append(" HANDSHAKING ");
                break;
            case INIT:
                sb.append(" INIT ");
                break;
            default:
                throw new InternalError();
        }
        if ((s & DOING_TASKS) > 0)
            sb.append("|DOING_TASKS");
        return sb.toString();
    }

    private void resumeActivity() {
        reader.schedule();
        writer.schedule();
    }

    final AtomicInteger handshakeState;
    final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();

    private void doHandshake(EngineResult r, int caller) {
        int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
        stateList.add(r.handshakeStatus().toString());
        stateList.add(Integer.toString(caller));
        switch (r.handshakeStatus()) {
            case NEED_TASK:
                if ((s & DOING_TASKS) > 0) // someone else was doing tasks
                    return;
                List<Runnable> tasks = obtainTasks();
                executeTasks(tasks);
                break;
            case NEED_WRAP:
                writer.addData(HS_TRIGGER);
                break;
            case NEED_UNWRAP:
            case NEED_UNWRAP_AGAIN:
                // do nothing else
                break;
            default:
                throw new InternalError("Unexpected handshake status:"
                                        + r.handshakeStatus());
        }
    }

    private List<Runnable> obtainTasks() {
        List<Runnable> l = new ArrayList<>();
        Runnable r;
        while ((r = engine.getDelegatedTask()) != null) {
            l.add(r);
        }
        return l;
    }

    private void executeTasks(List<Runnable> tasks) {
        exec.execute(() -> {
            try {
                handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
                List<Runnable> nextTasks = tasks;
                do {
                    nextTasks.forEach(Runnable::run);
                    if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
                        nextTasks = obtainTasks();
                    } else {
                        break;
                    }
                } while (true);
                handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
                writer.addData(HS_TRIGGER);
                resumeActivity();
            } catch (Throwable t) {
                handleError(t);
            }
        });
    }


    EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
        ByteBuffer dst = getAppBuffer();
        while (true) {
            SSLEngineResult sslResult = engine.unwrap(src, dst);
            switch (sslResult.getStatus()) {
                case BUFFER_OVERFLOW:
                    // may happen only if app size buffer was changed.
                    // get it again if app buffer size changed
                    int appSize = engine.getSession().getApplicationBufferSize();
                    ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
                    dst.flip();
                    b.put(dst);
                    dst = b;
                    break;
                case CLOSED:
                    return doClosure(new EngineResult(sslResult));
                case BUFFER_UNDERFLOW:
                    // handled implicitly by compaction/reallocation of readBuf
                    return new EngineResult(sslResult);
                case OK:
                     dst.flip();
                     return new EngineResult(sslResult, dst);
            }
        }
    }

    // FIXME: acknowledge a received CLOSE request from peer
    EngineResult doClosure(EngineResult r) throws IOException {
        debug.log(Level.DEBUG,
                "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
                r.result, engine.getHandshakeStatus(),
                engine.isOutboundDone(), engine.isInboundDone());
        if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
            // we have received TLS close_notify and need to send
            // an acknowledgement back. We're calling doHandshake
            // to finish the close handshake.
            if (engine.isInboundDone() && !engine.isOutboundDone()) {
                debug.log(Level.DEBUG, "doClosure: close_notify received");
                close_notify_received = true;
                doHandshake(r, READER);
            }
        }
        return r;
    }

    /**
     * Returns the upstream Flow.Subscriber of the reading (incoming) side.
     * This flow must be given the encrypted data read from upstream (eg socket)
     * before it is decrypted.
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
        return reader;
    }

    /**
     * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
     * This flow contains the plaintext data before it is encrypted.
     */
    public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
        return writer;
    }

    public boolean resumeReader() {
        return reader.signalScheduling();
    }

    public void resetReaderDemand() {
        reader.resetDownstreamDemand();
    }

    static class EngineResult {
        final SSLEngineResult result;
        final ByteBuffer destBuffer;

        // normal result
        EngineResult(SSLEngineResult result) {
            this(result, null);
        }

        EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
            this.result = result;
            this.destBuffer = destBuffer;
        }

        // Special result used to trigger handshaking in constructor
        static EngineResult INIT =
            new EngineResult(
                new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));

        boolean handshaking() {
            HandshakeStatus s = result.getHandshakeStatus();
            return s != HandshakeStatus.FINISHED
                   && s != HandshakeStatus.NOT_HANDSHAKING
                   && result.getStatus() != Status.CLOSED;
        }

        boolean needUnwrap() {
            HandshakeStatus s = result.getHandshakeStatus();
            return s == HandshakeStatus.NEED_UNWRAP;
        }


        int bytesConsumed() {
            return result.bytesConsumed();
        }

        int bytesProduced() {
            return result.bytesProduced();
        }

        SSLEngineResult.HandshakeStatus handshakeStatus() {
            return result.getHandshakeStatus();
        }

        SSLEngineResult.Status status() {
            return result.getStatus();
        }
    }

    public ByteBuffer getNetBuffer() {
        return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
    }

    private ByteBuffer getAppBuffer() {
        return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
    }

    final String dbgString() {
        return "SSLFlowDelegate(" + tubeName + ")";
    }

    @SuppressWarnings("fallthrough")
    EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
        debug.log(Level.DEBUG, () -> "wrapping "
                    + Utils.remaining(src) + " bytes");
        ByteBuffer dst = getNetBuffer();
        while (true) {
            SSLEngineResult sslResult = engine.wrap(src, dst);
            debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult);
            switch (sslResult.getStatus()) {
                case BUFFER_OVERFLOW:
                    // Shouldn't happen. We allocated buffer with packet size
                    // get it again if net buffer size was changed
                    debug.log(Level.DEBUG, "BUFFER_OVERFLOW");
                    int appSize = engine.getSession().getApplicationBufferSize();
                    ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
                    dst.flip();
                    b.put(dst);
                    dst = b;
                    break; // try again
                case CLOSED:
                    debug.log(Level.DEBUG, "CLOSED");
                    // fallthrough. There could be some remaining data in dst.
                    // CLOSED will be handled by the caller.
                case OK:
                    dst.flip();
                    final ByteBuffer dest = dst;
                    debug.log(Level.DEBUG, () -> "OK => produced: "
                                           + dest.remaining()
                                           + " not wrapped: "
                                           + Utils.remaining(src));
                    return new EngineResult(sslResult, dest);
                case BUFFER_UNDERFLOW:
                    // Shouldn't happen.  Doesn't returns when wrap()
                    // underflow handled externally
                    // assert false : "Buffer Underflow";
                    debug.log(Level.DEBUG, "BUFFER_UNDERFLOW");
                    return new EngineResult(sslResult);
                default:
                    debug.log(Level.DEBUG, "ASSERT");
                    assert false;
            }
        }
    }
}