src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 55763 634d8e14c172
child 55768 8674257c75ce
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal.common;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.concurrent.Executor;
       
    32 import java.util.concurrent.Flow;
       
    33 import java.util.concurrent.Flow.Subscriber;
       
    34 import java.util.List;
       
    35 import java.util.ArrayList;
       
    36 import java.util.Collections;
       
    37 import java.util.Iterator;
       
    38 import java.util.LinkedList;
       
    39 import java.util.concurrent.CompletableFuture;
       
    40 import java.util.concurrent.ConcurrentLinkedQueue;
       
    41 import java.util.concurrent.atomic.AtomicInteger;
       
    42 import javax.net.ssl.SSLEngine;
       
    43 import javax.net.ssl.SSLEngineResult;
       
    44 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
       
    45 import javax.net.ssl.SSLEngineResult.Status;
       
    46 import javax.net.ssl.SSLException;
       
    47 
       
    48 /**
       
    49  * Implements SSL using two SubscriberWrappers.
       
    50  *
       
    51  * <p> Constructor takes two Flow.Subscribers: one that receives the network
       
    52  * data (after it has been encrypted by SSLFlowDelegate) data, and one that
       
    53  * receives the application data (before it has been encrypted by SSLFlowDelegate).
       
    54  *
       
    55  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
       
    56  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
       
    57  * See diagram below.
       
    58  *
       
    59  * <p> How Flow.Subscribers are used in this class, and where they come from:
       
    60  * <pre>
       
    61  * {@code
       
    62  *
       
    63  *
       
    64  *
       
    65  * --------->  data flow direction
       
    66  *
       
    67  *
       
    68  *                         +------------------+
       
    69  *        upstreamWriter   |                  | downWriter
       
    70  *        ---------------> |                  | ------------>
       
    71  *  obtained from this     |                  | supplied to constructor
       
    72  *                         | SSLFlowDelegate  |
       
    73  *        downReader       |                  | upstreamReader
       
    74  *        <--------------- |                  | <--------------
       
    75  * supplied to constructor |                  | obtained from this
       
    76  *                         +------------------+
       
    77  * }
       
    78  * </pre>
       
    79  */
       
    80 public class SSLFlowDelegate {
       
    81 
       
    82     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    83     final System.Logger debug =
       
    84             Utils.getDebugLogger(this::dbgString, DEBUG);
       
    85 
       
    86     final Executor exec;
       
    87     final Reader reader;
       
    88     final Writer writer;
       
    89     final SSLEngine engine;
       
    90     final String tubeName; // hack
       
    91     final CompletableFuture<Void> cf;
       
    92     final CompletableFuture<String> alpnCF; // completes on initial handshake
       
    93     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
       
    94 
       
    95     /**
       
    96      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
       
    97      * Flow.Subscriber requires an associated {@link CompletableFuture}
       
    98      * for errors that need to be signaled from downstream to upstream.
       
    99      */
       
   100     public SSLFlowDelegate(SSLEngine engine,
       
   101                            Executor exec,
       
   102                            Subscriber<? super List<ByteBuffer>> downReader,
       
   103                            Subscriber<? super List<ByteBuffer>> downWriter)
       
   104     {
       
   105         this.tubeName = String.valueOf(downWriter);
       
   106         this.reader = new Reader();
       
   107         this.reader.subscribe(downReader);
       
   108         this.writer = new Writer();
       
   109         this.writer.subscribe(downWriter);
       
   110         this.engine = engine;
       
   111         this.exec = exec;
       
   112         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
       
   113         this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
       
   114                                    .thenRun(this::normalStop);
       
   115         this.alpnCF = new CompletableFuture<>();
       
   116         //Monitor.add(this::monitor);
       
   117     }
       
   118 
       
   119    /**
       
   120     * Returns a CompletableFuture<String> which completes after
       
   121     * the initial handshake completes, and which contains the negotiated
       
   122     * alpn.
       
   123     */
       
   124     public CompletableFuture<String> alpn() {
       
   125         return alpnCF;
       
   126     }
       
   127 
       
   128     private void setALPN() {
       
   129         // Handshake is finished. So, can retrieve the ALPN now
       
   130         if (alpnCF.isDone())
       
   131             return;
       
   132         String alpn = engine.getApplicationProtocol();
       
   133         debug.log(Level.DEBUG, "setALPN = %s", alpn);
       
   134         alpnCF.complete(alpn);
       
   135     }
       
   136 
       
   137     public String monitor() {
       
   138         StringBuilder sb = new StringBuilder();
       
   139         sb.append("SSL: HS state: " + states(handshakeState));
       
   140         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
       
   141         sb.append(" LL : ");
       
   142         synchronized(stateList) {
       
   143             for (String s: stateList) {
       
   144                 sb.append(s).append(" ");
       
   145             }
       
   146         }
       
   147         sb.append("\r\n");
       
   148         sb.append("Reader:: ").append(reader.toString());
       
   149         sb.append("\r\n");
       
   150         sb.append("Writer:: ").append(writer.toString());
       
   151         sb.append("\r\n===================================");
       
   152         return sb.toString();
       
   153     }
       
   154 
       
   155     /**
       
   156      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
       
   157      * Any decrypted buffers returned to be passed downstream.
       
   158      * Status codes:
       
   159      *     NEED_UNWRAP: do nothing. Following incoming data will contain
       
   160      *                  any required handshake data
       
   161      *     NEED_WRAP: call writer.addData() with empty buffer
       
   162      *     NEED_TASK: delegate task to executor
       
   163      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
       
   164      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
       
   165      *     OK: return generated buffers.
       
   166      *
       
   167      * Upstream subscription strategy is to try and keep no more than
       
   168      * TARGET_BUFSIZE bytes in readBuf
       
   169      */
       
   170     class Reader extends SubscriberWrapper {
       
   171         final SequentialScheduler scheduler;
       
   172         static final int TARGET_BUFSIZE = 16 * 1024;
       
   173         volatile ByteBuffer readBuf;
       
   174         volatile boolean completing = false;
       
   175         final Object readLock = new Object();
       
   176         final System.Logger debugr =
       
   177             Utils.getDebugLogger(this::dbgString, DEBUG);
       
   178 
       
   179         class ReaderDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
       
   180             @Override public void run() { processData(); }
       
   181         }
       
   182 
       
   183         Reader() {
       
   184             super();
       
   185             scheduler = new SequentialScheduler(new ReaderDownstreamPusher());
       
   186             this.readBuf = ByteBuffer.allocate(1024);
       
   187             readBuf.limit(0); // keep in read mode
       
   188         }
       
   189 
       
   190         public final String dbgString() {
       
   191             return "SSL Reader(" + tubeName + ")";
       
   192         }
       
   193 
       
   194         /**
       
   195          * entry point for buffers delivered from upstream Subscriber
       
   196          */
       
   197         @Override
       
   198         public void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   199             debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
       
   200                         + " bytes to read buffer");
       
   201             addToReadBuf(buffers);
       
   202             if (complete) {
       
   203                 this.completing = true;
       
   204             }
       
   205             scheduler.runOrSchedule();
       
   206         }
       
   207 
       
   208         @Override
       
   209         public String toString() {
       
   210             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
       
   211                     + " count: " + count.toString();
       
   212         }
       
   213 
       
   214         private void reallocReadBuf() {
       
   215             int sz = readBuf.capacity();
       
   216             ByteBuffer newb = ByteBuffer.allocate(sz*2);
       
   217             readBuf.flip();
       
   218             Utils.copy(readBuf, newb);
       
   219             readBuf = newb;
       
   220         }
       
   221 
       
   222         @Override
       
   223         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   224             if (readBuf.remaining() > TARGET_BUFSIZE) {
       
   225                 return 0;
       
   226             } else {
       
   227                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   228             }
       
   229         }
       
   230 
       
   231         // readBuf is kept ready for reading outside of this method
       
   232         private void addToReadBuf(List<ByteBuffer> buffers) {
       
   233             synchronized (readLock) {
       
   234                 for (ByteBuffer buf : buffers) {
       
   235                     readBuf.compact();
       
   236                     while (readBuf.remaining() < buf.remaining())
       
   237                         reallocReadBuf();
       
   238                     readBuf.put(buf);
       
   239                     readBuf.flip();
       
   240                 }
       
   241             }
       
   242         }
       
   243 
       
   244         void schedule() {
       
   245             scheduler.runOrSchedule();
       
   246         }
       
   247 
       
   248         void stop() {
       
   249             debugr.log(Level.DEBUG, "stop");
       
   250             scheduler.stop();
       
   251         }
       
   252 
       
   253         AtomicInteger count = new AtomicInteger(0);
       
   254 
       
   255         // work function where it all happens
       
   256         void processData() {
       
   257             try {
       
   258                 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
       
   259                            + " bytes to unwrap "
       
   260                            + states(handshakeState));
       
   261 
       
   262                 while (readBuf.hasRemaining()) {
       
   263                     boolean handshaking = false;
       
   264                     try {
       
   265                         EngineResult result;
       
   266                         synchronized (readLock) {
       
   267                             result = unwrapBuffer(readBuf);
       
   268                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
       
   269                         }
       
   270                         if (result.status() == Status.BUFFER_UNDERFLOW) {
       
   271                             debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
       
   272                             return;
       
   273                         }
       
   274                         if (completing && result.status() == Status.CLOSED) {
       
   275                             debugr.log(Level.DEBUG, "Closed: completing");
       
   276                             outgoing(Utils.EMPTY_BB_LIST, true);
       
   277                             return;
       
   278                         }
       
   279                         if (result.handshaking() && !completing) {
       
   280                             debugr.log(Level.DEBUG, "handshaking");
       
   281                             doHandshake(result, READER);
       
   282                             resumeActivity();
       
   283                             handshaking = true;
       
   284                         } else {
       
   285                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   286                                 setALPN();
       
   287                                 handshaking = false;
       
   288                                 resumeActivity();
       
   289                             }
       
   290                         }
       
   291                         if (result.bytesProduced() > 0) {
       
   292                             debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
       
   293                             count.addAndGet(result.bytesProduced());
       
   294                             outgoing(result.destBuffer, false);
       
   295                         }
       
   296                     } catch (IOException ex) {
       
   297                         errorCommon(ex);
       
   298                         handleError(ex);
       
   299                     }
       
   300                     if (handshaking && !completing)
       
   301                         return;
       
   302                 }
       
   303                 if (completing) {
       
   304                     debugr.log(Level.DEBUG, "completing");
       
   305                     // Complete the alpnCF, if not already complete, regardless of
       
   306                     // whether or not the ALPN is available, there will be no more
       
   307                     // activity.
       
   308                     setALPN();
       
   309                     outgoing(Utils.EMPTY_BB_LIST, true);
       
   310                 }
       
   311             } catch (Throwable ex) {
       
   312                 errorCommon(ex);
       
   313                 handleError(ex);
       
   314             }
       
   315         }
       
   316     }
       
   317     /**
       
   318      * Returns a CompletableFuture which completes after all activity
       
   319      * in the delegate is terminated (whether normally or exceptionally).
       
   320      *
       
   321      * @return
       
   322      */
       
   323     public CompletableFuture<Void> completion() {
       
   324         return cf;
       
   325     }
       
   326 
       
   327     private String xxx(List<ByteBuffer> i) {
       
   328         StringBuilder sb = new StringBuilder();
       
   329         sb.append("xxx size=" + i.size());
       
   330         int x = 0;
       
   331         for (ByteBuffer b : i)
       
   332             x += b.remaining();
       
   333         sb.append(" total " + x);
       
   334         return sb.toString();
       
   335     }
       
   336 
       
   337     public interface Monitorable {
       
   338         public String getInfo();
       
   339     }
       
   340 
       
   341     public static class Monitor extends Thread {
       
   342         final List<Monitorable> list;
       
   343         static Monitor themon;
       
   344 
       
   345         static {
       
   346             themon = new Monitor();
       
   347             themon.start(); // uncomment to enable Monitor
       
   348         }
       
   349 
       
   350         Monitor() {
       
   351             super("Monitor");
       
   352             setDaemon(true);
       
   353             list = Collections.synchronizedList(new LinkedList<>());
       
   354         }
       
   355 
       
   356         void addTarget(Monitorable o) {
       
   357             list.add(o);
       
   358         }
       
   359 
       
   360         public static void add(Monitorable o) {
       
   361             themon.addTarget(o);
       
   362         }
       
   363 
       
   364         @Override
       
   365         public void run() {
       
   366             System.out.println("Monitor starting");
       
   367             while (true) {
       
   368                 try {Thread.sleep(20*1000); } catch (Exception e) {}
       
   369                 synchronized (list) {
       
   370                     for (Monitorable o : list) {
       
   371                         System.out.println(o.getInfo());
       
   372                         System.out.println("-------------------------");
       
   373                     }
       
   374                 }
       
   375                 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
       
   376 
       
   377             }
       
   378         }
       
   379     }
       
   380 
       
   381     /**
       
   382      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
       
   383      * Any encrypted buffers generated are passed downstream to be written.
       
   384      * Status codes:
       
   385      *     NEED_UNWRAP: call reader.addData() with empty buffer
       
   386      *     NEED_WRAP: call addData() with empty buffer
       
   387      *     NEED_TASK: delegate task to executor
       
   388      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
       
   389      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
       
   390      *     OK: return generated buffers
       
   391      */
       
   392     class Writer extends SubscriberWrapper {
       
   393         final SequentialScheduler scheduler;
       
   394         // queues of buffers received from upstream waiting
       
   395         // to be processed by the SSLEngine
       
   396         final List<ByteBuffer> writeList;
       
   397         final System.Logger debugw =
       
   398             Utils.getDebugLogger(this::dbgString, DEBUG);
       
   399 
       
   400         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
       
   401             @Override public void run() { processData(); }
       
   402         }
       
   403 
       
   404         Writer() {
       
   405             super();
       
   406             writeList = Collections.synchronizedList(new LinkedList<>());
       
   407             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
       
   408         }
       
   409 
       
   410         @Override
       
   411         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   412             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
       
   413             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
       
   414             if (complete) {
       
   415                 writeList.add(SENTINEL);
       
   416             } else {
       
   417                 writeList.addAll(buffers);
       
   418             }
       
   419             debugw.log(Level.DEBUG, () -> "added " + buffers.size()
       
   420                         + " (" + Utils.remaining(buffers)
       
   421                         + " bytes) to the writeList");
       
   422             scheduler.runOrSchedule();
       
   423         }
       
   424 
       
   425         public final String dbgString() {
       
   426             return "SSL Writer(" + tubeName + ")";
       
   427         }
       
   428 
       
   429         protected void onSubscribe() {
       
   430             doHandshake(EngineResult.INIT, INIT);
       
   431             resumeActivity();
       
   432         }
       
   433 
       
   434         void schedule() {
       
   435             scheduler.runOrSchedule();
       
   436         }
       
   437 
       
   438         void stop() {
       
   439             debugw.log(Level.DEBUG, "stop");
       
   440             scheduler.stop();
       
   441         }
       
   442 
       
   443         private boolean isCompleting() {
       
   444             synchronized(writeList) {
       
   445                 int lastIndex = writeList.size() - 1;
       
   446                 if (lastIndex < 0)
       
   447                     return false;
       
   448                 return writeList.get(lastIndex) == SENTINEL;
       
   449             }
       
   450         }
       
   451 
       
   452         @Override
       
   453         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   454             if (writeList.size() > 10)
       
   455                 return 0;
       
   456             else
       
   457                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   458         }
       
   459 
       
   460         private boolean hsTriggered() {
       
   461             synchronized(writeList) {
       
   462                 for (ByteBuffer b : writeList)
       
   463                     if (b == HS_TRIGGER)
       
   464                         return true;
       
   465                 return false;
       
   466             }
       
   467         }
       
   468 
       
   469         private void processData() {
       
   470             boolean completing = isCompleting();
       
   471 
       
   472             try {
       
   473                 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
       
   474                 while (Utils.remaining(writeList) > 0 || hsTriggered()) {
       
   475                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
       
   476                     EngineResult result = wrapBuffers(outbufs);
       
   477                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
       
   478 
       
   479                     if (result.status() == Status.CLOSED) {
       
   480                         if (result.bytesProduced() <= 0)
       
   481                             return;
       
   482 
       
   483                         completing = true;
       
   484                         // There could still be some outgoing data in outbufs.
       
   485                         writeList.add(SENTINEL);
       
   486                     }
       
   487 
       
   488                     boolean handshaking = false;
       
   489                     if (result.handshaking()) {
       
   490                         debugw.log(Level.DEBUG, "handshaking");
       
   491                         doHandshake(result, WRITER);
       
   492                         handshaking = true;
       
   493                     } else {
       
   494                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   495                             setALPN();
       
   496                             resumeActivity();
       
   497                         }
       
   498                     }
       
   499                     cleanList(writeList); // tidy up the source list
       
   500                     sendResultBytes(result);
       
   501                     if (handshaking && !completing) {
       
   502                         if (writeList.isEmpty() && !result.needUnwrap()) {
       
   503                             writer.addData(HS_TRIGGER);
       
   504                         }
       
   505                         return;
       
   506                     }
       
   507                 }
       
   508                 if (completing && Utils.remaining(writeList) == 0) {
       
   509                     /*
       
   510                     System.out.println("WRITER DOO 3");
       
   511                     engine.closeOutbound();
       
   512                     EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
       
   513                     sendResultBytes(result);
       
   514                     */
       
   515                     outgoing(Utils.EMPTY_BB_LIST, true);
       
   516                     return;
       
   517                 }
       
   518             } catch (Throwable ex) {
       
   519                 handleError(ex);
       
   520             }
       
   521         }
       
   522 
       
   523         private void sendResultBytes(EngineResult result) {
       
   524             if (result.bytesProduced() > 0) {
       
   525                 debugw.log(Level.DEBUG, "Sending %d bytes downstream",
       
   526                            result.bytesProduced());
       
   527                 outgoing(result.destBuffer, false);
       
   528             }
       
   529         }
       
   530 
       
   531         @Override
       
   532         public String toString() {
       
   533             return "WRITER: " + super.toString() +
       
   534                     " writeList size " + Integer.toString(writeList.size());
       
   535                     //" writeList: " + writeList.toString();
       
   536         }
       
   537     }
       
   538 
       
   539     private void handleError(Throwable t) {
       
   540         debug.log(Level.DEBUG, "handleError", t);
       
   541         cf.completeExceptionally(t);
       
   542         // no-op if already completed
       
   543         alpnCF.completeExceptionally(t);
       
   544         reader.stop();
       
   545         writer.stop();
       
   546     }
       
   547 
       
   548     private void normalStop() {
       
   549         reader.stop();
       
   550         writer.stop();
       
   551     }
       
   552 
       
   553     private void cleanList(List<ByteBuffer> l) {
       
   554         synchronized (l) {
       
   555             Iterator<ByteBuffer> iter = l.iterator();
       
   556             while (iter.hasNext()) {
       
   557                 ByteBuffer b = iter.next();
       
   558                 if (!b.hasRemaining()) {
       
   559                     iter.remove();
       
   560                 }
       
   561             }
       
   562         }
       
   563     }
       
   564 
       
   565     /**
       
   566      * States for handshake. We avoid races when accessing/updating the AtomicInt
       
   567      * because updates always schedule an additional call to both the read()
       
   568      * and write() functions.
       
   569      */
       
   570     private static final int NOT_HANDSHAKING = 0;
       
   571     private static final int HANDSHAKING = 1;
       
   572     private static final int INIT = 2;
       
   573     private static final int DOING_TASKS = 4; // bit added to above state
       
   574     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
       
   575 
       
   576     private static final int READER = 1;
       
   577     private static final int WRITER = 2;
       
   578 
       
   579     private static String states(AtomicInteger state) {
       
   580         int s = state.get();
       
   581         StringBuilder sb = new StringBuilder();
       
   582         int x = s & ~DOING_TASKS;
       
   583         switch (x) {
       
   584             case NOT_HANDSHAKING:
       
   585                 sb.append(" NOT_HANDSHAKING ");
       
   586                 break;
       
   587             case HANDSHAKING:
       
   588                 sb.append(" HANDSHAKING ");
       
   589                 break;
       
   590             case INIT:
       
   591                 sb.append(" INIT ");
       
   592                 break;
       
   593             default:
       
   594                 throw new InternalError();
       
   595         }
       
   596         if ((s & DOING_TASKS) > 0)
       
   597             sb.append("|DOING_TASKS");
       
   598         return sb.toString();
       
   599     }
       
   600 
       
   601     private void resumeActivity() {
       
   602         reader.schedule();
       
   603         writer.schedule();
       
   604     }
       
   605 
       
   606     final AtomicInteger handshakeState;
       
   607     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
       
   608 
       
   609     private void doHandshake(EngineResult r, int caller) {
       
   610         int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
       
   611         stateList.add(r.handshakeStatus().toString());
       
   612         stateList.add(Integer.toString(caller));
       
   613         switch (r.handshakeStatus()) {
       
   614             case NEED_TASK:
       
   615                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
       
   616                     return;
       
   617                 List<Runnable> tasks = obtainTasks();
       
   618                 executeTasks(tasks);
       
   619                 break;
       
   620             case NEED_WRAP:
       
   621                 writer.addData(HS_TRIGGER);
       
   622                 break;
       
   623             case NEED_UNWRAP:
       
   624             case NEED_UNWRAP_AGAIN:
       
   625                 // do nothing else
       
   626                 break;
       
   627             default:
       
   628                 throw new InternalError("Unexpected handshake status:"
       
   629                                         + r.handshakeStatus());
       
   630         }
       
   631     }
       
   632 
       
   633     private List<Runnable> obtainTasks() {
       
   634         List<Runnable> l = new ArrayList<>();
       
   635         Runnable r;
       
   636         while ((r = engine.getDelegatedTask()) != null) {
       
   637             l.add(r);
       
   638         }
       
   639         return l;
       
   640     }
       
   641 
       
   642     private void executeTasks(List<Runnable> tasks) {
       
   643         exec.execute(() -> {
       
   644             handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
       
   645             try {
       
   646                 tasks.forEach((r) -> {
       
   647                     r.run();
       
   648                 });
       
   649             } catch (Throwable t) {
       
   650                 handleError(t);
       
   651             }
       
   652             handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
       
   653             writer.addData(HS_TRIGGER);
       
   654             resumeActivity();
       
   655         });
       
   656     }
       
   657 
       
   658 
       
   659     EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
       
   660         ByteBuffer dst = getAppBuffer();
       
   661         while (true) {
       
   662             SSLEngineResult sslResult = engine.unwrap(src, dst);
       
   663             switch (sslResult.getStatus()) {
       
   664                 case BUFFER_OVERFLOW:
       
   665                     // may happen only if app size buffer was changed.
       
   666                     // get it again if app buffer size changed
       
   667                     int appSize = engine.getSession().getApplicationBufferSize();
       
   668                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   669                     dst.flip();
       
   670                     b.put(dst);
       
   671                     dst = b;
       
   672                     break;
       
   673                 case CLOSED:
       
   674                     doClosure();
       
   675                     return new EngineResult(sslResult);
       
   676                 case BUFFER_UNDERFLOW:
       
   677                     // handled implicitly by compaction/reallocation of readBuf
       
   678                     return new EngineResult(sslResult);
       
   679                 case OK:
       
   680                      dst.flip();
       
   681                      return new EngineResult(sslResult, dst);
       
   682             }
       
   683         }
       
   684     }
       
   685 
       
   686     // TODO: acknowledge a received CLOSE request from peer
       
   687     void doClosure() throws IOException {
       
   688         //while (!wrapAndSend(emptyArray))
       
   689             //;
       
   690     }
       
   691 
       
   692     /**
       
   693      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
       
   694      * This flow must be given the encrypted data read from upstream (eg socket)
       
   695      * before it is decrypted.
       
   696      */
       
   697     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
       
   698         return reader;
       
   699     }
       
   700 
       
   701     /**
       
   702      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
       
   703      * This flow contains the plaintext data before it is encrypted.
       
   704      */
       
   705     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
       
   706         return writer;
       
   707     }
       
   708 
       
   709     public void resumeReader() {
       
   710         reader.schedule();
       
   711     }
       
   712 
       
   713     public void resetReaderDemand() {
       
   714         reader.resetDownstreamDemand();
       
   715     }
       
   716 
       
   717     static class EngineResult {
       
   718         final SSLEngineResult result;
       
   719         final ByteBuffer destBuffer;
       
   720 
       
   721         // normal result
       
   722         EngineResult(SSLEngineResult result) {
       
   723             this(result, null);
       
   724         }
       
   725 
       
   726         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
       
   727             this.result = result;
       
   728             this.destBuffer = destBuffer;
       
   729         }
       
   730 
       
   731         // Special result used to trigger handshaking in constructor
       
   732         static EngineResult INIT =
       
   733             new EngineResult(
       
   734                 new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
       
   735 
       
   736         boolean handshaking() {
       
   737             HandshakeStatus s = result.getHandshakeStatus();
       
   738             return s != HandshakeStatus.FINISHED
       
   739                    && s != HandshakeStatus.NOT_HANDSHAKING
       
   740                    && result.getStatus() != Status.CLOSED;
       
   741         }
       
   742 
       
   743         boolean needUnwrap() {
       
   744             HandshakeStatus s = result.getHandshakeStatus();
       
   745             return s == HandshakeStatus.NEED_UNWRAP;
       
   746         }
       
   747 
       
   748 
       
   749         int bytesConsumed() {
       
   750             return result.bytesConsumed();
       
   751         }
       
   752 
       
   753         int bytesProduced() {
       
   754             return result.bytesProduced();
       
   755         }
       
   756 
       
   757         SSLEngineResult.HandshakeStatus handshakeStatus() {
       
   758             return result.getHandshakeStatus();
       
   759         }
       
   760 
       
   761         SSLEngineResult.Status status() {
       
   762             return result.getStatus();
       
   763         }
       
   764     }
       
   765 
       
   766     public ByteBuffer getNetBuffer() {
       
   767         return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
       
   768     }
       
   769 
       
   770     private ByteBuffer getAppBuffer() {
       
   771         return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
       
   772     }
       
   773 
       
   774     final String dbgString() {
       
   775         return "SSLFlowDelegate(" + tubeName + ")";
       
   776     }
       
   777 
       
   778     @SuppressWarnings("fallthrough")
       
   779     EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
       
   780         debug.log(Level.DEBUG, () -> "wrapping "
       
   781                     + Utils.remaining(src) + " bytes");
       
   782         ByteBuffer dst = getNetBuffer();
       
   783         while (true) {
       
   784             SSLEngineResult sslResult = engine.wrap(src, dst);
       
   785             debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult);
       
   786             switch (sslResult.getStatus()) {
       
   787                 case BUFFER_OVERFLOW:
       
   788                     // Shouldn't happen. We allocated buffer with packet size
       
   789                     // get it again if net buffer size was changed
       
   790                     debug.log(Level.DEBUG, "BUFFER_OVERFLOW");
       
   791                     int appSize = engine.getSession().getApplicationBufferSize();
       
   792                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   793                     dst.flip();
       
   794                     b.put(dst);
       
   795                     dst = b;
       
   796                     break; // try again
       
   797                 case CLOSED:
       
   798                     debug.log(Level.DEBUG, "CLOSED");
       
   799                     // fallthrough. There could be some remaining data in dst.
       
   800                     // CLOSED will be handled by the caller.
       
   801                 case OK:
       
   802                     dst.flip();
       
   803                     final ByteBuffer dest = dst;
       
   804                     debug.log(Level.DEBUG, () -> "OK => produced: "
       
   805                                            + dest.remaining()
       
   806                                            + " not wrapped: "
       
   807                                            + Utils.remaining(src));
       
   808                     return new EngineResult(sslResult, dest);
       
   809                 case BUFFER_UNDERFLOW:
       
   810                     // Shouldn't happen.  Doesn't returns when wrap()
       
   811                     // underflow handled externally
       
   812                     // assert false : "Buffer Underflow";
       
   813                     debug.log(Level.DEBUG, "BUFFER_UNDERFLOW");
       
   814                     return new EngineResult(sslResult);
       
   815                 default:
       
   816                     debug.log(Level.DEBUG, "ASSERT");
       
   817                     assert false;
       
   818             }
       
   819         }
       
   820     }
       
   821 }