src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56101 983e338eeb50
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http.common;
       
    27 
       
    28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
       
    29 
       
    30 import javax.net.ssl.SSLEngine;
       
    31 import javax.net.ssl.SSLEngineResult;
       
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
       
    33 import javax.net.ssl.SSLEngineResult.Status;
       
    34 import javax.net.ssl.SSLException;
       
    35 import java.io.IOException;
       
    36 import java.lang.System.Logger.Level;
       
    37 import java.nio.ByteBuffer;
       
    38 import java.util.ArrayList;
       
    39 import java.util.Collections;
       
    40 import java.util.Iterator;
       
    41 import java.util.LinkedList;
       
    42 import java.util.List;
       
    43 import java.util.concurrent.CompletableFuture;
       
    44 import java.util.concurrent.ConcurrentLinkedQueue;
       
    45 import java.util.concurrent.Executor;
       
    46 import java.util.concurrent.Flow;
       
    47 import java.util.concurrent.Flow.Subscriber;
       
    48 import java.util.concurrent.atomic.AtomicInteger;
       
    49 
       
    50 /**
       
    51  * Implements SSL using two SubscriberWrappers.
       
    52  *
       
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
       
     54  * data (after it has been encrypted by SSLFlowDelegate), and one that
       
     55  * receives the application data (after it has been decrypted by SSLFlowDelegate).
       
    56  *
       
    57  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
       
    58  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
       
    59  * See diagram below.
       
    60  *
       
    61  * <p> How Flow.Subscribers are used in this class, and where they come from:
       
    62  * <pre>
       
    63  * {@code
       
    64  *
       
    65  *
       
    66  *
       
    67  * --------->  data flow direction
       
    68  *
       
    69  *
       
    70  *                         +------------------+
       
    71  *        upstreamWriter   |                  | downWriter
       
    72  *        ---------------> |                  | ------------>
       
    73  *  obtained from this     |                  | supplied to constructor
       
    74  *                         | SSLFlowDelegate  |
       
    75  *        downReader       |                  | upstreamReader
       
    76  *        <--------------- |                  | <--------------
       
    77  * supplied to constructor |                  | obtained from this
       
    78  *                         +------------------+
       
    79  * }
       
    80  * </pre>
       
    81  */
       
    82 public class SSLFlowDelegate {
       
    83 
       
    84     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    85     final System.Logger debug =
       
    86             Utils.getDebugLogger(this::dbgString, DEBUG);
       
    87 
       
    88     final Executor exec;
       
    89     final Reader reader;
       
    90     final Writer writer;
       
    91     final SSLEngine engine;
       
    92     final String tubeName; // hack
       
    93     private final CompletableFuture<Void> cf;
       
    94     final CompletableFuture<String> alpnCF; // completes on initial handshake
       
    95     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
       
    96     volatile boolean close_notify_received;
       
    97 
       
    98     /**
       
    99      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
       
   100      * Flow.Subscriber requires an associated {@link CompletableFuture}
       
   101      * for errors that need to be signaled from downstream to upstream.
       
   102      */
       
   103     public SSLFlowDelegate(SSLEngine engine,
       
   104                            Executor exec,
       
   105                            Subscriber<? super List<ByteBuffer>> downReader,
       
   106                            Subscriber<? super List<ByteBuffer>> downWriter)
       
   107     {
       
   108         this.tubeName = String.valueOf(downWriter);
       
   109         this.reader = new Reader();
       
   110         this.writer = new Writer();
       
   111         this.engine = engine;
       
   112         this.exec = exec;
       
   113         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
       
   114         CompletableFuture<Void> cs = CompletableFuture.allOf(
       
   115                 reader.completion(), writer.completion()).thenRun(this::normalStop);
       
   116         this.cf = MinimalFuture.of(cs);
       
   117         this.alpnCF = new MinimalFuture<>();
       
   118 
       
   119         // connect the Reader to the downReader and the
       
   120         // Writer to the downWriter.
       
   121         connect(downReader, downWriter);
       
   122 
       
   123         //Monitor.add(this::monitor);
       
   124     }
       
   125 
       
   126     /**
       
   127      * Returns true if the SSLFlowDelegate has detected a TLS
       
   128      * close_notify from the server.
       
   129      * @return true, if a close_notify was detected.
       
   130      */
       
   131     public boolean closeNotifyReceived() {
       
   132         return close_notify_received;
       
   133     }
       
   134 
       
   135     /**
       
   136      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
       
   137      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
       
   138      * Called from within the constructor. Overwritten by SSLTube.
       
   139      *
       
   140      * @param downReader  The left hand side read sink (typically, the
       
   141      *                    HttpConnection read subscriber).
       
   142      * @param downWriter  The right hand side write sink (typically
       
   143      *                    the SocketTube write subscriber).
       
   144      */
       
   145     void connect(Subscriber<? super List<ByteBuffer>> downReader,
       
   146                  Subscriber<? super List<ByteBuffer>> downWriter) {
       
   147         this.reader.subscribe(downReader);
       
   148         this.writer.subscribe(downWriter);
       
   149     }
       
   150 
       
   151    /**
       
   152     * Returns a CompletableFuture<String> which completes after
       
   153     * the initial handshake completes, and which contains the negotiated
       
   154     * alpn.
       
   155     */
       
   156     public CompletableFuture<String> alpn() {
       
   157         return alpnCF;
       
   158     }
       
   159 
       
   160     private void setALPN() {
       
   161         // Handshake is finished. So, can retrieve the ALPN now
       
   162         if (alpnCF.isDone())
       
   163             return;
       
   164         String alpn = engine.getApplicationProtocol();
       
   165         debug.log(Level.DEBUG, "setALPN = %s", alpn);
       
   166         alpnCF.complete(alpn);
       
   167     }
       
   168 
       
   169     public String monitor() {
       
   170         StringBuilder sb = new StringBuilder();
       
   171         sb.append("SSL: HS state: " + states(handshakeState));
       
   172         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
       
   173         sb.append(" LL : ");
       
   174         synchronized(stateList) {
       
   175             for (String s: stateList) {
       
   176                 sb.append(s).append(" ");
       
   177             }
       
   178         }
       
   179         sb.append("\r\n");
       
   180         sb.append("Reader:: ").append(reader.toString());
       
   181         sb.append("\r\n");
       
   182         sb.append("Writer:: ").append(writer.toString());
       
   183         sb.append("\r\n===================================");
       
   184         return sb.toString();
       
   185     }
       
   186 
       
   187     protected SchedulingAction enterReadScheduling() {
       
   188         return SchedulingAction.CONTINUE;
       
   189     }
       
   190 
       
   191 
       
   192     /**
       
   193      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
       
   194      * Any decrypted buffers returned to be passed downstream.
       
   195      * Status codes:
       
   196      *     NEED_UNWRAP: do nothing. Following incoming data will contain
       
   197      *                  any required handshake data
       
   198      *     NEED_WRAP: call writer.addData() with empty buffer
       
   199      *     NEED_TASK: delegate task to executor
       
   200      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
       
   201      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
       
   202      *     OK: return generated buffers.
       
   203      *
       
   204      * Upstream subscription strategy is to try and keep no more than
       
   205      * TARGET_BUFSIZE bytes in readBuf
       
   206      */
       
   207     class Reader extends SubscriberWrapper {
       
   208         final SequentialScheduler scheduler;
       
   209         static final int TARGET_BUFSIZE = 16 * 1024;
       
   210         volatile ByteBuffer readBuf;
       
   211         volatile boolean completing = false;
       
   212         final Object readBufferLock = new Object();
       
   213         final System.Logger debugr =
       
   214             Utils.getDebugLogger(this::dbgString, DEBUG);
       
   215 
       
   216         class ReaderDownstreamPusher implements Runnable {
       
   217             @Override public void run() { processData(); }
       
   218         }
       
   219 
       
   220         Reader() {
       
   221             super();
       
   222             scheduler = SequentialScheduler.synchronizedScheduler(
       
   223                                                 new ReaderDownstreamPusher());
       
   224             this.readBuf = ByteBuffer.allocate(1024);
       
   225             readBuf.limit(0); // keep in read mode
       
   226         }
       
   227 
       
   228         protected SchedulingAction enterScheduling() {
       
   229             return enterReadScheduling();
       
   230         }
       
   231 
       
   232         public final String dbgString() {
       
   233             return "SSL Reader(" + tubeName + ")";
       
   234         }
       
   235 
       
   236         /**
       
   237          * entry point for buffers delivered from upstream Subscriber
       
   238          */
       
   239         @Override
       
   240         public void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   241             debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
       
   242                         + " bytes to read buffer");
       
   243             addToReadBuf(buffers, complete);
       
   244             scheduler.runOrSchedule();
       
   245         }
       
   246 
       
   247         @Override
       
   248         public String toString() {
       
   249             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
       
   250                     + " count: " + count.toString();
       
   251         }
       
   252 
       
   253         private void reallocReadBuf() {
       
   254             int sz = readBuf.capacity();
       
   255             ByteBuffer newb = ByteBuffer.allocate(sz*2);
       
   256             readBuf.flip();
       
   257             Utils.copy(readBuf, newb);
       
   258             readBuf = newb;
       
   259         }
       
   260 
       
   261         @Override
       
   262         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   263             if (readBuf.remaining() > TARGET_BUFSIZE) {
       
   264                 return 0;
       
   265             } else {
       
   266                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   267             }
       
   268         }
       
   269 
       
   270         // readBuf is kept ready for reading outside of this method
       
   271         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
       
   272             synchronized (readBufferLock) {
       
   273                 for (ByteBuffer buf : buffers) {
       
   274                     readBuf.compact();
       
   275                     while (readBuf.remaining() < buf.remaining())
       
   276                         reallocReadBuf();
       
   277                     readBuf.put(buf);
       
   278                     readBuf.flip();
       
   279                 }
       
   280                 if (complete) {
       
   281                     this.completing = complete;
       
   282                 }
       
   283             }
       
   284         }
       
   285 
       
   286         void schedule() {
       
   287             scheduler.runOrSchedule();
       
   288         }
       
   289 
       
   290         void stop() {
       
   291             debugr.log(Level.DEBUG, "stop");
       
   292             scheduler.stop();
       
   293         }
       
   294 
       
   295         AtomicInteger count = new AtomicInteger(0);
       
   296 
       
   297         // work function where it all happens
       
   298         void processData() {
       
   299             try {
       
   300                 debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
       
   301                            + " bytes to unwrap "
       
   302                            + states(handshakeState)
       
   303                            + ", " + engine.getHandshakeStatus());
       
   304                 int len;
       
   305                 boolean complete = false;
       
   306                 while ((len = readBuf.remaining()) > 0) {
       
   307                     boolean handshaking = false;
       
   308                     try {
       
   309                         EngineResult result;
       
   310                         synchronized (readBufferLock) {
       
   311                             complete = this.completing;
       
   312                             result = unwrapBuffer(readBuf);
       
   313                             debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
       
   314                         }
       
   315                         if (result.bytesProduced() > 0) {
       
   316                             debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
       
   317                             count.addAndGet(result.bytesProduced());
       
   318                             outgoing(result.destBuffer, false);
       
   319                         }
       
   320                         if (result.status() == Status.BUFFER_UNDERFLOW) {
       
   321                             debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
       
   322                             // not enough data in the read buffer...
       
   323                             requestMore();
       
   324                             synchronized (readBufferLock) {
       
   325                                 // check if we have received some data
       
   326                                 if (readBuf.remaining() > len) continue;
       
   327                                 return;
       
   328                             }
       
   329                         }
       
   330                         if (complete && result.status() == Status.CLOSED) {
       
   331                             debugr.log(Level.DEBUG, "Closed: completing");
       
   332                             outgoing(Utils.EMPTY_BB_LIST, true);
       
   333                             return;
       
   334                         }
       
   335                         if (result.handshaking() && !complete) {
       
   336                             debugr.log(Level.DEBUG, "handshaking");
       
   337                             doHandshake(result, READER);
       
   338                             resumeActivity();
       
   339                             handshaking = true;
       
   340                         } else {
       
   341                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   342                                 setALPN();
       
   343                                 handshaking = false;
       
   344                                 resumeActivity();
       
   345                             }
       
   346                         }
       
   347                     } catch (IOException ex) {
       
   348                         errorCommon(ex);
       
   349                         handleError(ex);
       
   350                     }
       
   351                     if (handshaking && !complete)
       
   352                         return;
       
   353                 }
       
   354                 if (!complete) {
       
   355                     synchronized (readBufferLock) {
       
   356                         complete = this.completing && !readBuf.hasRemaining();
       
   357                     }
       
   358                 }
       
   359                 if (complete) {
       
   360                     debugr.log(Level.DEBUG, "completing");
       
   361                     // Complete the alpnCF, if not already complete, regardless of
       
   362                     // whether or not the ALPN is available, there will be no more
       
   363                     // activity.
       
   364                     setALPN();
       
   365                     outgoing(Utils.EMPTY_BB_LIST, true);
       
   366                 }
       
   367             } catch (Throwable ex) {
       
   368                 errorCommon(ex);
       
   369                 handleError(ex);
       
   370             }
       
   371         }
       
   372     }
       
   373 
       
   374     /**
       
   375      * Returns a CompletableFuture which completes after all activity
       
   376      * in the delegate is terminated (whether normally or exceptionally).
       
   377      *
       
   378      * @return
       
   379      */
       
   380     public CompletableFuture<Void> completion() {
       
   381         return cf;
       
   382     }
       
   383 
       
   384     public interface Monitorable {
       
   385         public String getInfo();
       
   386     }
       
   387 
       
   388     public static class Monitor extends Thread {
       
   389         final List<Monitorable> list;
       
   390         static Monitor themon;
       
   391 
       
   392         static {
       
   393             themon = new Monitor();
       
   394             themon.start(); // uncomment to enable Monitor
       
   395         }
       
   396 
       
   397         Monitor() {
       
   398             super("Monitor");
       
   399             setDaemon(true);
       
   400             list = Collections.synchronizedList(new LinkedList<>());
       
   401         }
       
   402 
       
   403         void addTarget(Monitorable o) {
       
   404             list.add(o);
       
   405         }
       
   406 
       
   407         public static void add(Monitorable o) {
       
   408             themon.addTarget(o);
       
   409         }
       
   410 
       
   411         @Override
       
   412         public void run() {
       
   413             System.out.println("Monitor starting");
       
   414             while (true) {
       
   415                 try {Thread.sleep(20*1000); } catch (Exception e) {}
       
   416                 synchronized (list) {
       
   417                     for (Monitorable o : list) {
       
   418                         System.out.println(o.getInfo());
       
   419                         System.out.println("-------------------------");
       
   420                     }
       
   421                 }
       
   422                 System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
       
   423 
       
   424             }
       
   425         }
       
   426     }
       
   427 
       
   428     /**
       
   429      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
       
   430      * Any encrypted buffers generated are passed downstream to be written.
       
   431      * Status codes:
       
   432      *     NEED_UNWRAP: call reader.addData() with empty buffer
       
   433      *     NEED_WRAP: call addData() with empty buffer
       
   434      *     NEED_TASK: delegate task to executor
       
   435      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
       
   436      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
       
   437      *     OK: return generated buffers
       
   438      */
       
   439     class Writer extends SubscriberWrapper {
       
   440         final SequentialScheduler scheduler;
       
   441         // queues of buffers received from upstream waiting
       
   442         // to be processed by the SSLEngine
       
   443         final List<ByteBuffer> writeList;
       
   444         final System.Logger debugw =
       
   445             Utils.getDebugLogger(this::dbgString, DEBUG);
       
   446         volatile boolean completing;
       
   447         boolean completed; // only accessed in processData
       
   448 
       
   449         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
       
   450             @Override public void run() { processData(); }
       
   451         }
       
   452 
       
   453         Writer() {
       
   454             super();
       
   455             writeList = Collections.synchronizedList(new LinkedList<>());
       
   456             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
       
   457         }
       
   458 
       
   459         @Override
       
   460         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   461             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
       
   462             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
       
   463             if (complete) {
       
   464                 debugw.log(Level.DEBUG, "adding SENTINEL");
       
   465                 completing = true;
       
   466                 writeList.add(SENTINEL);
       
   467             } else {
       
   468                 writeList.addAll(buffers);
       
   469             }
       
   470             debugw.log(Level.DEBUG, () -> "added " + buffers.size()
       
   471                         + " (" + Utils.remaining(buffers)
       
   472                         + " bytes) to the writeList");
       
   473             scheduler.runOrSchedule();
       
   474         }
       
   475 
       
   476         public final String dbgString() {
       
   477             return "SSL Writer(" + tubeName + ")";
       
   478         }
       
   479 
       
   480         protected void onSubscribe() {
       
   481             doHandshake(EngineResult.INIT, INIT);
       
   482             resumeActivity();
       
   483         }
       
   484 
       
   485         void schedule() {
       
   486             scheduler.runOrSchedule();
       
   487         }
       
   488 
       
   489         void stop() {
       
   490             debugw.log(Level.DEBUG, "stop");
       
   491             scheduler.stop();
       
   492         }
       
   493 
       
   494         @Override
       
   495         public boolean closing() {
       
   496             return closeNotifyReceived();
       
   497         }
       
   498 
       
   499         private boolean isCompleting() {
       
   500             return completing;
       
   501         }
       
   502 
       
   503         @Override
       
   504         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   505             if (writeList.size() > 10)
       
   506                 return 0;
       
   507             else
       
   508                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   509         }
       
   510 
       
   511         private boolean hsTriggered() {
       
   512             synchronized(writeList) {
       
   513                 for (ByteBuffer b : writeList)
       
   514                     if (b == HS_TRIGGER)
       
   515                         return true;
       
   516                 return false;
       
   517             }
       
   518         }
       
   519 
       
   520         private void processData() {
       
   521             boolean completing = isCompleting();
       
   522 
       
   523             try {
       
   524                 debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
       
   525                 while (Utils.remaining(writeList) > 0 || hsTriggered()
       
   526                         || needWrap()) {
       
   527                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
       
   528                     EngineResult result = wrapBuffers(outbufs);
       
   529                     debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
       
   530 
       
   531                     if (result.status() == Status.CLOSED) {
       
   532                         if (result.bytesProduced() <= 0)
       
   533                             return;
       
   534 
       
   535                         if (!completing && !completed) {
       
   536                             completing = this.completing = true;
       
   537                             // There could still be some outgoing data in outbufs.
       
   538                             writeList.add(SENTINEL);
       
   539                         }
       
   540                     }
       
   541 
       
   542                     boolean handshaking = false;
       
   543                     if (result.handshaking()) {
       
   544                         debugw.log(Level.DEBUG, "handshaking");
       
   545                         doHandshake(result, WRITER);
       
   546                         handshaking = true;
       
   547                     } else {
       
   548                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   549                             setALPN();
       
   550                             resumeActivity();
       
   551                         }
       
   552                     }
       
   553                     cleanList(writeList); // tidy up the source list
       
   554                     sendResultBytes(result);
       
   555                     if (handshaking && !completing) {
       
   556                         if (writeList.isEmpty() && !result.needUnwrap()) {
       
   557                             writer.addData(HS_TRIGGER);
       
   558                         }
       
   559                         if (needWrap()) continue;
       
   560                         return;
       
   561                     }
       
   562                 }
       
   563                 if (completing && Utils.remaining(writeList) == 0) {
       
   564                     /*
       
   565                     System.out.println("WRITER DOO 3");
       
   566                     engine.closeOutbound();
       
   567                     EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY);
       
   568                     sendResultBytes(result);
       
   569                     */
       
   570                     if (!completed) {
       
   571                         completed = true;
       
   572                         writeList.clear();
       
   573                         outgoing(Utils.EMPTY_BB_LIST, true);
       
   574                     }
       
   575                     return;
       
   576                 }
       
   577                 if (writeList.isEmpty() && needWrap()) {
       
   578                     writer.addData(HS_TRIGGER);
       
   579                 }
       
   580             } catch (Throwable ex) {
       
   581                 errorCommon(ex);
       
   582                 handleError(ex);
       
   583             }
       
   584         }
       
   585 
       
   586         private boolean needWrap() {
       
   587             return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
       
   588         }
       
   589 
       
   590         private void sendResultBytes(EngineResult result) {
       
   591             if (result.bytesProduced() > 0) {
       
   592                 debugw.log(Level.DEBUG, "Sending %d bytes downstream",
       
   593                            result.bytesProduced());
       
   594                 outgoing(result.destBuffer, false);
       
   595             }
       
   596         }
       
   597 
       
   598         @Override
       
   599         public String toString() {
       
   600             return "WRITER: " + super.toString() +
       
   601                     " writeList size " + Integer.toString(writeList.size());
       
   602                     //" writeList: " + writeList.toString();
       
   603         }
       
   604     }
       
   605 
       
   606     private void handleError(Throwable t) {
       
   607         debug.log(Level.DEBUG, "handleError", t);
       
   608         cf.completeExceptionally(t);
       
   609         // no-op if already completed
       
   610         alpnCF.completeExceptionally(t);
       
   611         reader.stop();
       
   612         writer.stop();
       
   613     }
       
   614 
       
   615     private void normalStop() {
       
   616         reader.stop();
       
   617         writer.stop();
       
   618     }
       
   619 
       
   620     private void cleanList(List<ByteBuffer> l) {
       
   621         synchronized (l) {
       
   622             Iterator<ByteBuffer> iter = l.iterator();
       
   623             while (iter.hasNext()) {
       
   624                 ByteBuffer b = iter.next();
       
   625                 if (!b.hasRemaining() && b != SENTINEL) {
       
   626                     iter.remove();
       
   627                 }
       
   628             }
       
   629         }
       
   630     }
       
   631 
       
   632     /**
       
   633      * States for handshake. We avoid races when accessing/updating the AtomicInt
       
   634      * because updates always schedule an additional call to both the read()
       
   635      * and write() functions.
       
   636      */
       
   637     private static final int NOT_HANDSHAKING = 0;
       
   638     private static final int HANDSHAKING = 1;
       
   639     private static final int INIT = 2;
       
   640     private static final int DOING_TASKS = 4; // bit added to above state
       
   641     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
       
   642 
       
   643     private static final int READER = 1;
       
   644     private static final int WRITER = 2;
       
   645 
       
   646     private static String states(AtomicInteger state) {
       
   647         int s = state.get();
       
   648         StringBuilder sb = new StringBuilder();
       
   649         int x = s & ~DOING_TASKS;
       
   650         switch (x) {
       
   651             case NOT_HANDSHAKING:
       
   652                 sb.append(" NOT_HANDSHAKING ");
       
   653                 break;
       
   654             case HANDSHAKING:
       
   655                 sb.append(" HANDSHAKING ");
       
   656                 break;
       
   657             case INIT:
       
   658                 sb.append(" INIT ");
       
   659                 break;
       
   660             default:
       
   661                 throw new InternalError();
       
   662         }
       
   663         if ((s & DOING_TASKS) > 0)
       
   664             sb.append("|DOING_TASKS");
       
   665         return sb.toString();
       
   666     }
       
   667 
       
   668     private void resumeActivity() {
       
   669         reader.schedule();
       
   670         writer.schedule();
       
   671     }
       
   672 
       
   673     final AtomicInteger handshakeState;
       
   674     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
       
   675 
       
   676     private void doHandshake(EngineResult r, int caller) {
       
   677         int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
       
   678         stateList.add(r.handshakeStatus().toString());
       
   679         stateList.add(Integer.toString(caller));
       
   680         switch (r.handshakeStatus()) {
       
   681             case NEED_TASK:
       
   682                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
       
   683                     return;
       
   684                 List<Runnable> tasks = obtainTasks();
       
   685                 executeTasks(tasks);
       
   686                 break;
       
   687             case NEED_WRAP:
       
   688                 writer.addData(HS_TRIGGER);
       
   689                 break;
       
   690             case NEED_UNWRAP:
       
   691             case NEED_UNWRAP_AGAIN:
       
   692                 // do nothing else
       
   693                 break;
       
   694             default:
       
   695                 throw new InternalError("Unexpected handshake status:"
       
   696                                         + r.handshakeStatus());
       
   697         }
       
   698     }
       
   699 
       
   700     private List<Runnable> obtainTasks() {
       
   701         List<Runnable> l = new ArrayList<>();
       
   702         Runnable r;
       
   703         while ((r = engine.getDelegatedTask()) != null) {
       
   704             l.add(r);
       
   705         }
       
   706         return l;
       
   707     }
       
   708 
       
   709     private void executeTasks(List<Runnable> tasks) {
       
   710         exec.execute(() -> {
       
   711             try {
       
   712                 handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
       
   713                 List<Runnable> nextTasks = tasks;
       
   714                 do {
       
   715                     nextTasks.forEach(Runnable::run);
       
   716                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
       
   717                         nextTasks = obtainTasks();
       
   718                     } else {
       
   719                         break;
       
   720                     }
       
   721                 } while (true);
       
   722                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
       
   723                 writer.addData(HS_TRIGGER);
       
   724                 resumeActivity();
       
   725             } catch (Throwable t) {
       
   726                 handleError(t);
       
   727             }
       
   728         });
       
   729     }
       
   730 
       
   731 
       
   732     EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
       
   733         ByteBuffer dst = getAppBuffer();
       
   734         while (true) {
       
   735             SSLEngineResult sslResult = engine.unwrap(src, dst);
       
   736             switch (sslResult.getStatus()) {
       
   737                 case BUFFER_OVERFLOW:
       
   738                     // may happen only if app size buffer was changed.
       
   739                     // get it again if app buffer size changed
       
   740                     int appSize = engine.getSession().getApplicationBufferSize();
       
   741                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   742                     dst.flip();
       
   743                     b.put(dst);
       
   744                     dst = b;
       
   745                     break;
       
   746                 case CLOSED:
       
   747                     return doClosure(new EngineResult(sslResult));
       
   748                 case BUFFER_UNDERFLOW:
       
   749                     // handled implicitly by compaction/reallocation of readBuf
       
   750                     return new EngineResult(sslResult);
       
   751                 case OK:
       
   752                      dst.flip();
       
   753                      return new EngineResult(sslResult, dst);
       
   754             }
       
   755         }
       
   756     }
       
   757 
       
   758     // FIXME: acknowledge a received CLOSE request from peer
       
   759     EngineResult doClosure(EngineResult r) throws IOException {
       
   760         debug.log(Level.DEBUG,
       
   761                 "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
       
   762                 r.result, engine.getHandshakeStatus(),
       
   763                 engine.isOutboundDone(), engine.isInboundDone());
       
   764         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
       
   765             // we have received TLS close_notify and need to send
       
   766             // an acknowledgement back. We're calling doHandshake
       
   767             // to finish the close handshake.
       
   768             if (engine.isInboundDone() && !engine.isOutboundDone()) {
       
   769                 debug.log(Level.DEBUG, "doClosure: close_notify received");
       
   770                 close_notify_received = true;
       
   771                 doHandshake(r, READER);
       
   772             }
       
   773         }
       
   774         return r;
       
   775     }
       
   776 
       
   777     /**
       
   778      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
       
   779      * This flow must be given the encrypted data read from upstream (eg socket)
       
   780      * before it is decrypted.
       
   781      */
       
   782     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
       
   783         return reader;
       
   784     }
       
   785 
       
   786     /**
       
   787      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
       
   788      * This flow contains the plaintext data before it is encrypted.
       
   789      */
       
   790     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
       
   791         return writer;
       
   792     }
       
   793 
       
   794     public boolean resumeReader() {
       
   795         return reader.signalScheduling();
       
   796     }
       
   797 
       
   798     public void resetReaderDemand() {
       
   799         reader.resetDownstreamDemand();
       
   800     }
       
   801 
       
   802     static class EngineResult {
       
   803         final SSLEngineResult result;
       
   804         final ByteBuffer destBuffer;
       
   805 
       
   806         // normal result
       
   807         EngineResult(SSLEngineResult result) {
       
   808             this(result, null);
       
   809         }
       
   810 
       
   811         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
       
   812             this.result = result;
       
   813             this.destBuffer = destBuffer;
       
   814         }
       
   815 
       
   816         // Special result used to trigger handshaking in constructor
       
   817         static EngineResult INIT =
       
   818             new EngineResult(
       
   819                 new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0));
       
   820 
       
   821         boolean handshaking() {
       
   822             HandshakeStatus s = result.getHandshakeStatus();
       
   823             return s != HandshakeStatus.FINISHED
       
   824                    && s != HandshakeStatus.NOT_HANDSHAKING
       
   825                    && result.getStatus() != Status.CLOSED;
       
   826         }
       
   827 
       
   828         boolean needUnwrap() {
       
   829             HandshakeStatus s = result.getHandshakeStatus();
       
   830             return s == HandshakeStatus.NEED_UNWRAP;
       
   831         }
       
   832 
       
   833 
       
   834         int bytesConsumed() {
       
   835             return result.bytesConsumed();
       
   836         }
       
   837 
       
   838         int bytesProduced() {
       
   839             return result.bytesProduced();
       
   840         }
       
   841 
       
   842         SSLEngineResult.HandshakeStatus handshakeStatus() {
       
   843             return result.getHandshakeStatus();
       
   844         }
       
   845 
       
   846         SSLEngineResult.Status status() {
       
   847             return result.getStatus();
       
   848         }
       
   849     }
       
   850 
       
   851     public ByteBuffer getNetBuffer() {
       
   852         return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
       
   853     }
       
   854 
       
   855     private ByteBuffer getAppBuffer() {
       
   856         return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
       
   857     }
       
   858 
       
   859     final String dbgString() {
       
   860         return "SSLFlowDelegate(" + tubeName + ")";
       
   861     }
       
   862 
       
   863     @SuppressWarnings("fallthrough")
       
   864     EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
       
   865         debug.log(Level.DEBUG, () -> "wrapping "
       
   866                     + Utils.remaining(src) + " bytes");
       
   867         ByteBuffer dst = getNetBuffer();
       
   868         while (true) {
       
   869             SSLEngineResult sslResult = engine.wrap(src, dst);
       
   870             debug.log(Level.DEBUG, () -> "SSLResult: " + sslResult);
       
   871             switch (sslResult.getStatus()) {
       
   872                 case BUFFER_OVERFLOW:
       
   873                     // Shouldn't happen. We allocated buffer with packet size
       
   874                     // get it again if net buffer size was changed
       
   875                     debug.log(Level.DEBUG, "BUFFER_OVERFLOW");
       
   876                     int appSize = engine.getSession().getApplicationBufferSize();
       
   877                     ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   878                     dst.flip();
       
   879                     b.put(dst);
       
   880                     dst = b;
       
   881                     break; // try again
       
   882                 case CLOSED:
       
   883                     debug.log(Level.DEBUG, "CLOSED");
       
   884                     // fallthrough. There could be some remaining data in dst.
       
   885                     // CLOSED will be handled by the caller.
       
   886                 case OK:
       
   887                     dst.flip();
       
   888                     final ByteBuffer dest = dst;
       
   889                     debug.log(Level.DEBUG, () -> "OK => produced: "
       
   890                                            + dest.remaining()
       
   891                                            + " not wrapped: "
       
   892                                            + Utils.remaining(src));
       
   893                     return new EngineResult(sslResult, dest);
       
   894                 case BUFFER_UNDERFLOW:
       
   895                     // Shouldn't happen.  Doesn't returns when wrap()
       
   896                     // underflow handled externally
       
   897                     // assert false : "Buffer Underflow";
       
   898                     debug.log(Level.DEBUG, "BUFFER_UNDERFLOW");
       
   899                     return new EngineResult(sslResult);
       
   900                 default:
       
   901                     debug.log(Level.DEBUG, "ASSERT");
       
   902                     assert false;
       
   903             }
       
   904         }
       
   905     }
       
   906 }