src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 49765 ee6f7a61f3a5
parent 48083 b1c1b4ef4be2
child 49944 4690a2871b44
child 56451 9585061fdb04
equal deleted inserted replaced
49707:f7fd051519ac 49765:ee6f7a61f3a5
       
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http.common;
       
    27 
       
    28 import jdk.internal.net.http.common.SubscriberWrapper.SchedulingAction;
       
    29 
       
    30 import javax.net.ssl.SSLEngine;
       
    31 import javax.net.ssl.SSLEngineResult;
       
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
       
    33 import javax.net.ssl.SSLEngineResult.Status;
       
    34 import javax.net.ssl.SSLException;
       
    35 import java.io.IOException;
       
    36 import java.lang.System.Logger.Level;
       
    37 import java.nio.ByteBuffer;
       
    38 import java.util.ArrayList;
       
    39 import java.util.Collections;
       
    40 import java.util.Iterator;
       
    41 import java.util.LinkedList;
       
    42 import java.util.List;
       
    43 import java.util.concurrent.CompletableFuture;
       
    44 import java.util.concurrent.ConcurrentLinkedQueue;
       
    45 import java.util.concurrent.Executor;
       
    46 import java.util.concurrent.Flow;
       
    47 import java.util.concurrent.Flow.Subscriber;
       
    48 import java.util.concurrent.atomic.AtomicInteger;
       
    49 
       
    50 /**
       
    51  * Implements SSL using two SubscriberWrappers.
       
    52  *
       
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
       
    54  * data (after it has been encrypted by SSLFlowDelegate) data, and one that
       
    55  * receives the application data (before it has been encrypted by SSLFlowDelegate).
       
    56  *
       
    57  * <p> Methods upstreamReader() and upstreamWriter() return the corresponding
       
    58  * Flow.Subscribers containing Flows for the encrypted/decrypted upstream data.
       
    59  * See diagram below.
       
    60  *
       
    61  * <p> How Flow.Subscribers are used in this class, and where they come from:
       
    62  * <pre>
       
    63  * {@code
       
    64  *
       
    65  *
       
    66  *
       
    67  * --------->  data flow direction
       
    68  *
       
    69  *
       
    70  *                         +------------------+
       
    71  *        upstreamWriter   |                  | downWriter
       
    72  *        ---------------> |                  | ------------>
       
    73  *  obtained from this     |                  | supplied to constructor
       
    74  *                         | SSLFlowDelegate  |
       
    75  *        downReader       |                  | upstreamReader
       
    76  *        <--------------- |                  | <--------------
       
    77  * supplied to constructor |                  | obtained from this
       
    78  *                         +------------------+
       
    79  *
       
    80  * Errors are reported to the downReader Flow.Subscriber
       
    81  *
       
    82  * }
       
    83  * </pre>
       
    84  */
       
    85 public class SSLFlowDelegate {
       
    86 
       
    87     final Logger debug =
       
    88             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
       
    89 
       
    90     final Executor exec;
       
    91     final Reader reader;
       
    92     final Writer writer;
       
    93     final SSLEngine engine;
       
    94     final String tubeName; // hack
       
    95     final CompletableFuture<String> alpnCF; // completes on initial handshake
       
    96     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
       
    97     volatile boolean close_notify_received;
       
    98     final CompletableFuture<Void> readerCF;
       
    99     final CompletableFuture<Void> writerCF;
       
   100     static AtomicInteger scount = new AtomicInteger(1);
       
   101     final int id;
       
   102 
       
   103     /**
       
   104      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
       
   105      * Flow.Subscriber requires an associated {@link CompletableFuture}
       
   106      * for errors that need to be signaled from downstream to upstream.
       
   107      */
       
   108     public SSLFlowDelegate(SSLEngine engine,
       
   109                            Executor exec,
       
   110                            Subscriber<? super List<ByteBuffer>> downReader,
       
   111                            Subscriber<? super List<ByteBuffer>> downWriter)
       
   112     {
       
   113         this.id = scount.getAndIncrement();
       
   114         this.tubeName = String.valueOf(downWriter);
       
   115         this.reader = new Reader();
       
   116         this.writer = new Writer();
       
   117         this.engine = engine;
       
   118         this.exec = exec;
       
   119         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
       
   120         this.readerCF = reader.completion();
       
   121         this.writerCF = reader.completion();
       
   122         readerCF.exceptionally(this::stopOnError);
       
   123         writerCF.exceptionally(this::stopOnError);
       
   124 
       
   125         CompletableFuture.allOf(reader.completion(), writer.completion())
       
   126             .thenRun(this::normalStop);
       
   127         this.alpnCF = new MinimalFuture<>();
       
   128 
       
   129         // connect the Reader to the downReader and the
       
   130         // Writer to the downWriter.
       
   131         connect(downReader, downWriter);
       
   132 
       
   133         //Monitor.add(this::monitor);
       
   134     }
       
   135 
       
   136     /**
       
   137      * Returns true if the SSLFlowDelegate has detected a TLS
       
   138      * close_notify from the server.
       
   139      * @return true, if a close_notify was detected.
       
   140      */
       
   141     public boolean closeNotifyReceived() {
       
   142         return close_notify_received;
       
   143     }
       
   144 
       
   145     /**
       
   146      * Connects the read sink (downReader) to the SSLFlowDelegate Reader,
       
   147      * and the write sink (downWriter) to the SSLFlowDelegate Writer.
       
   148      * Called from within the constructor. Overwritten by SSLTube.
       
   149      *
       
   150      * @param downReader  The left hand side read sink (typically, the
       
   151      *                    HttpConnection read subscriber).
       
   152      * @param downWriter  The right hand side write sink (typically
       
   153      *                    the SocketTube write subscriber).
       
   154      */
       
   155     void connect(Subscriber<? super List<ByteBuffer>> downReader,
       
   156                  Subscriber<? super List<ByteBuffer>> downWriter) {
       
   157         this.reader.subscribe(downReader);
       
   158         this.writer.subscribe(downWriter);
       
   159     }
       
   160 
       
   161    /**
       
   162     * Returns a CompletableFuture<String> which completes after
       
   163     * the initial handshake completes, and which contains the negotiated
       
   164     * alpn.
       
   165     */
       
   166     public CompletableFuture<String> alpn() {
       
   167         return alpnCF;
       
   168     }
       
   169 
       
   170     private void setALPN() {
       
   171         // Handshake is finished. So, can retrieve the ALPN now
       
   172         if (alpnCF.isDone())
       
   173             return;
       
   174         String alpn = engine.getApplicationProtocol();
       
   175         if (debug.on()) debug.log("setALPN = %s", alpn);
       
   176         alpnCF.complete(alpn);
       
   177     }
       
   178 
       
   179     public String monitor() {
       
   180         StringBuilder sb = new StringBuilder();
       
   181         sb.append("SSL: id ").append(id);
       
   182         sb.append(" HS state: " + states(handshakeState));
       
   183         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
       
   184         sb.append(" LL : ");
       
   185         for (String s: stateList) {
       
   186             sb.append(s).append(" ");
       
   187         }
       
   188         sb.append("\r\n");
       
   189         sb.append("Reader:: ").append(reader.toString());
       
   190         sb.append("\r\n");
       
   191         sb.append("Writer:: ").append(writer.toString());
       
   192         sb.append("\r\n===================================");
       
   193         return sb.toString();
       
   194     }
       
   195 
       
   196     protected SchedulingAction enterReadScheduling() {
       
   197         return SchedulingAction.CONTINUE;
       
   198     }
       
   199 
       
   200 
       
   201     /**
       
   202      * Processing function for incoming data. Pass it thru SSLEngine.unwrap().
       
   203      * Any decrypted buffers returned to be passed downstream.
       
   204      * Status codes:
       
   205      *     NEED_UNWRAP: do nothing. Following incoming data will contain
       
   206      *                  any required handshake data
       
   207      *     NEED_WRAP: call writer.addData() with empty buffer
       
   208      *     NEED_TASK: delegate task to executor
       
   209      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat unwrap
       
   210      *     BUFFER_UNDERFLOW: keep buffer and wait for more data
       
   211      *     OK: return generated buffers.
       
   212      *
       
   213      * Upstream subscription strategy is to try and keep no more than
       
   214      * TARGET_BUFSIZE bytes in readBuf
       
   215      */
       
   216     class Reader extends SubscriberWrapper {
       
   217         final SequentialScheduler scheduler;
       
   218         static final int TARGET_BUFSIZE = 16 * 1024;
       
   219         volatile ByteBuffer readBuf;
       
   220         volatile boolean completing;
       
   221         final Object readBufferLock = new Object();
       
   222         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
       
   223 
       
   224         class ReaderDownstreamPusher implements Runnable {
       
   225             @Override public void run() { processData(); }
       
   226         }
       
   227 
       
   228         Reader() {
       
   229             super();
       
   230             scheduler = SequentialScheduler.synchronizedScheduler(
       
   231                                                 new ReaderDownstreamPusher());
       
   232             this.readBuf = ByteBuffer.allocate(1024);
       
   233             readBuf.limit(0); // keep in read mode
       
   234         }
       
   235 
       
   236         protected SchedulingAction enterScheduling() {
       
   237             return enterReadScheduling();
       
   238         }
       
   239 
       
   240         public final String dbgString() {
       
   241             return "SSL Reader(" + tubeName + ")";
       
   242         }
       
   243 
       
   244         /**
       
   245          * entry point for buffers delivered from upstream Subscriber
       
   246          */
       
   247         @Override
       
   248         public void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   249             if (debugr.on())
       
   250                 debugr.log("Adding %d bytes to read buffer",
       
   251                            Utils.remaining(buffers));
       
   252             addToReadBuf(buffers, complete);
       
   253             scheduler.runOrSchedule();
       
   254         }
       
   255 
       
   256         @Override
       
   257         public String toString() {
       
   258             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
       
   259                     + " count: " + count.toString();
       
   260         }
       
   261 
       
   262         private void reallocReadBuf() {
       
   263             int sz = readBuf.capacity();
       
   264             ByteBuffer newb = ByteBuffer.allocate(sz*2);
       
   265             readBuf.flip();
       
   266             Utils.copy(readBuf, newb);
       
   267             readBuf = newb;
       
   268         }
       
   269 
       
   270         @Override
       
   271         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   272             if (readBuf.remaining() > TARGET_BUFSIZE) {
       
   273                 return 0;
       
   274             } else {
       
   275                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   276             }
       
   277         }
       
   278 
       
   279         // readBuf is kept ready for reading outside of this method
       
   280         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
       
   281             synchronized (readBufferLock) {
       
   282                 for (ByteBuffer buf : buffers) {
       
   283                     readBuf.compact();
       
   284                     while (readBuf.remaining() < buf.remaining())
       
   285                         reallocReadBuf();
       
   286                     readBuf.put(buf);
       
   287                     readBuf.flip();
       
   288                 }
       
   289                 if (complete) {
       
   290                     this.completing = complete;
       
   291                 }
       
   292             }
       
   293         }
       
   294 
       
   295         void schedule() {
       
   296             scheduler.runOrSchedule();
       
   297         }
       
   298 
       
   299         void stop() {
       
   300             if (debugr.on()) debugr.log("stop");
       
   301             scheduler.stop();
       
   302         }
       
   303 
       
   304         AtomicInteger count = new AtomicInteger(0);
       
   305 
       
   306         // work function where it all happens
       
   307         void processData() {
       
   308             try {
       
   309                 if (debugr.on())
       
   310                     debugr.log("processData:"
       
   311                            + " readBuf remaining:" + readBuf.remaining()
       
   312                            + ", state:" + states(handshakeState)
       
   313                            + ", engine handshake status:" + engine.getHandshakeStatus());
       
   314                 int len;
       
   315                 boolean complete = false;
       
   316                 while ((len = readBuf.remaining()) > 0) {
       
   317                     boolean handshaking = false;
       
   318                     try {
       
   319                         EngineResult result;
       
   320                         synchronized (readBufferLock) {
       
   321                             complete = this.completing;
       
   322                             result = unwrapBuffer(readBuf);
       
   323                             if (debugr.on())
       
   324                                 debugr.log("Unwrapped: %s", result.result);
       
   325                         }
       
   326                         if (result.bytesProduced() > 0) {
       
   327                             if (debugr.on())
       
   328                                 debugr.log("sending %d", result.bytesProduced());
       
   329                             count.addAndGet(result.bytesProduced());
       
   330                             outgoing(result.destBuffer, false);
       
   331                         }
       
   332                         if (result.status() == Status.BUFFER_UNDERFLOW) {
       
   333                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
       
   334                             // not enough data in the read buffer...
       
   335                             requestMore();
       
   336                             synchronized (readBufferLock) {
       
   337                                 // check if we have received some data
       
   338                                 if (readBuf.remaining() > len) continue;
       
   339                                 return;
       
   340                             }
       
   341                         }
       
   342                         if (complete && result.status() == Status.CLOSED) {
       
   343                             if (debugr.on()) debugr.log("Closed: completing");
       
   344                             outgoing(Utils.EMPTY_BB_LIST, true);
       
   345                             return;
       
   346                         }
       
   347                         if (result.handshaking() && !complete) {
       
   348                             if (debugr.on()) debugr.log("handshaking");
       
   349                             if (doHandshake(result, READER)) {
       
   350                                 resumeActivity();
       
   351                             }
       
   352                             handshaking = true;
       
   353                         } else {
       
   354                             if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
       
   355                                 setALPN();
       
   356                                 handshaking = false;
       
   357                                 resumeActivity();
       
   358                             }
       
   359                         }
       
   360                     } catch (IOException ex) {
       
   361                         errorCommon(ex);
       
   362                         handleError(ex);
       
   363                     }
       
   364                     if (handshaking && !complete)
       
   365                         return;
       
   366                 }
       
   367                 if (!complete) {
       
   368                     synchronized (readBufferLock) {
       
   369                         complete = this.completing && !readBuf.hasRemaining();
       
   370                     }
       
   371                 }
       
   372                 if (complete) {
       
   373                     if (debugr.on()) debugr.log("completing");
       
   374                     // Complete the alpnCF, if not already complete, regardless of
       
   375                     // whether or not the ALPN is available, there will be no more
       
   376                     // activity.
       
   377                     setALPN();
       
   378                     outgoing(Utils.EMPTY_BB_LIST, true);
       
   379                 }
       
   380             } catch (Throwable ex) {
       
   381                 errorCommon(ex);
       
   382                 handleError(ex);
       
   383             }
       
   384         }
       
   385 
       
   386         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
       
   387             ByteBuffer dst = getAppBuffer();
       
   388             while (true) {
       
   389                 SSLEngineResult sslResult = engine.unwrap(src, dst);
       
   390                 switch (sslResult.getStatus()) {
       
   391                     case BUFFER_OVERFLOW:
       
   392                         // may happen only if app size buffer was changed.
       
   393                         // get it again if app buffer size changed
       
   394                         int appSize = engine.getSession().getApplicationBufferSize();
       
   395                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   396                         dst.flip();
       
   397                         b.put(dst);
       
   398                         dst = b;
       
   399                         break;
       
   400                     case CLOSED:
       
   401                         return doClosure(new EngineResult(sslResult));
       
   402                     case BUFFER_UNDERFLOW:
       
   403                         // handled implicitly by compaction/reallocation of readBuf
       
   404                         return new EngineResult(sslResult);
       
   405                     case OK:
       
   406                         dst.flip();
       
   407                         return new EngineResult(sslResult, dst);
       
   408                 }
       
   409             }
       
   410         }
       
   411     }
       
   412 
       
   413     public interface Monitorable {
       
   414         public String getInfo();
       
   415     }
       
   416 
       
   417     public static class Monitor extends Thread {
       
   418         final List<Monitorable> list;
       
   419         static Monitor themon;
       
   420 
       
   421         static {
       
   422             themon = new Monitor();
       
   423             themon.start(); // uncomment to enable Monitor
       
   424         }
       
   425 
       
   426         Monitor() {
       
   427             super("Monitor");
       
   428             setDaemon(true);
       
   429             list = Collections.synchronizedList(new LinkedList<>());
       
   430         }
       
   431 
       
   432         void addTarget(Monitorable o) {
       
   433             list.add(o);
       
   434         }
       
   435 
       
   436         public static void add(Monitorable o) {
       
   437             themon.addTarget(o);
       
   438         }
       
   439 
       
   440         @Override
       
   441         public void run() {
       
   442             System.out.println("Monitor starting");
       
   443             try {
       
   444                 while (true) {
       
   445                     Thread.sleep(20 * 1000);
       
   446                     synchronized (list) {
       
   447                         for (Monitorable o : list) {
       
   448                             System.out.println(o.getInfo());
       
   449                             System.out.println("-------------------------");
       
   450                         }
       
   451                     }
       
   452                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
       
   453                 }
       
   454             } catch (InterruptedException e) {
       
   455                 System.out.println("Monitor exiting with " + e);
       
   456             }
       
   457         }
       
   458     }
       
   459 
       
   460     /**
       
   461      * Processing function for outgoing data. Pass it thru SSLEngine.wrap()
       
   462      * Any encrypted buffers generated are passed downstream to be written.
       
   463      * Status codes:
       
   464      *     NEED_UNWRAP: call reader.addData() with empty buffer
       
   465      *     NEED_WRAP: call addData() with empty buffer
       
   466      *     NEED_TASK: delegate task to executor
       
   467      *     BUFFER_OVERFLOW: allocate larger output buffer. Repeat wrap
       
   468      *     BUFFER_UNDERFLOW: shouldn't happen on writing side
       
   469      *     OK: return generated buffers
       
   470      */
       
   471     class Writer extends SubscriberWrapper {
       
   472         final SequentialScheduler scheduler;
       
   473         // queues of buffers received from upstream waiting
       
   474         // to be processed by the SSLEngine
       
   475         final List<ByteBuffer> writeList;
       
   476         final Logger debugw =  Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
       
   477         volatile boolean completing;
       
   478         boolean completed; // only accessed in processData
       
   479 
       
   480         class WriterDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
       
   481             @Override public void run() { processData(); }
       
   482         }
       
   483 
       
   484         Writer() {
       
   485             super();
       
   486             writeList = Collections.synchronizedList(new LinkedList<>());
       
   487             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
       
   488         }
       
   489 
       
   490         @Override
       
   491         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
       
   492             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
       
   493             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
       
   494             if (complete) {
       
   495                 if (debugw.on()) debugw.log("adding SENTINEL");
       
   496                 completing = true;
       
   497                 writeList.add(SENTINEL);
       
   498             } else {
       
   499                 writeList.addAll(buffers);
       
   500             }
       
   501             if (debugw.on())
       
   502                 debugw.log("added " + buffers.size()
       
   503                            + " (" + Utils.remaining(buffers)
       
   504                            + " bytes) to the writeList");
       
   505             scheduler.runOrSchedule();
       
   506         }
       
   507 
       
   508         public final String dbgString() {
       
   509             return "SSL Writer(" + tubeName + ")";
       
   510         }
       
   511 
       
   512         protected void onSubscribe() {
       
   513             if (debugw.on()) debugw.log("onSubscribe initiating handshaking");
       
   514             addData(HS_TRIGGER);  // initiates handshaking
       
   515         }
       
   516 
       
   517         void schedule() {
       
   518             scheduler.runOrSchedule();
       
   519         }
       
   520 
       
   521         void stop() {
       
   522             if (debugw.on()) debugw.log("stop");
       
   523             scheduler.stop();
       
   524         }
       
   525 
       
   526         @Override
       
   527         public boolean closing() {
       
   528             return closeNotifyReceived();
       
   529         }
       
   530 
       
   531         private boolean isCompleting() {
       
   532             return completing;
       
   533         }
       
   534 
       
   535         @Override
       
   536         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   537             if (writeList.size() > 10)
       
   538                 return 0;
       
   539             else
       
   540                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
       
   541         }
       
   542 
       
   543         private boolean hsTriggered() {
       
   544             synchronized(writeList) {
       
   545                 for (ByteBuffer b : writeList)
       
   546                     if (b == HS_TRIGGER)
       
   547                         return true;
       
   548                 return false;
       
   549             }
       
   550         }
       
   551 
       
   552         private void processData() {
       
   553             boolean completing = isCompleting();
       
   554 
       
   555             try {
       
   556                 if (debugw.on())
       
   557                     debugw.log("processData, writeList remaining:"
       
   558                                 + Utils.remaining(writeList) + ", hsTriggered:"
       
   559                                 + hsTriggered() + ", needWrap:" + needWrap());
       
   560 
       
   561                 while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
       
   562                     ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
       
   563                     EngineResult result = wrapBuffers(outbufs);
       
   564                     if (debugw.on())
       
   565                         debugw.log("wrapBuffer returned %s", result.result);
       
   566 
       
   567                     if (result.status() == Status.CLOSED) {
       
   568                         if (!upstreamCompleted) {
       
   569                             upstreamCompleted = true;
       
   570                             upstreamSubscription.cancel();
       
   571                         }
       
   572                         if (result.bytesProduced() <= 0)
       
   573                             return;
       
   574 
       
   575                         if (!completing && !completed) {
       
   576                             completing = this.completing = true;
       
   577                             // There could still be some outgoing data in outbufs.
       
   578                             writeList.add(SENTINEL);
       
   579                         }
       
   580                     }
       
   581 
       
   582                     boolean handshaking = false;
       
   583                     if (result.handshaking()) {
       
   584                         if (debugw.on()) debugw.log("handshaking");
       
   585                         doHandshake(result, WRITER);  // ok to ignore return
       
   586                         handshaking = true;
       
   587                     } else {
       
   588                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   589                             setALPN();
       
   590                             resumeActivity();
       
   591                         }
       
   592                     }
       
   593                     cleanList(writeList); // tidy up the source list
       
   594                     sendResultBytes(result);
       
   595                     if (handshaking && !completing) {
       
   596                         if (needWrap()) {
       
   597                             continue;
       
   598                         } else {
       
   599                             return;
       
   600                         }
       
   601                     }
       
   602                 }
       
   603                 if (completing && Utils.remaining(writeList) == 0) {
       
   604                     if (!completed) {
       
   605                         completed = true;
       
   606                         writeList.clear();
       
   607                         outgoing(Utils.EMPTY_BB_LIST, true);
       
   608                     }
       
   609                     return;
       
   610                 }
       
   611                 if (writeList.isEmpty() && needWrap()) {
       
   612                     writer.addData(HS_TRIGGER);
       
   613                 }
       
   614             } catch (Throwable ex) {
       
   615                 errorCommon(ex);
       
   616                 handleError(ex);
       
   617             }
       
   618         }
       
   619 
       
   620         @SuppressWarnings("fallthrough")
       
   621         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
       
   622             if (debugw.on())
       
   623                 debugw.log("wrapping " + Utils.remaining(src) + " bytes");
       
   624             ByteBuffer dst = getNetBuffer();
       
   625             while (true) {
       
   626                 SSLEngineResult sslResult = engine.wrap(src, dst);
       
   627                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
       
   628                 switch (sslResult.getStatus()) {
       
   629                     case BUFFER_OVERFLOW:
       
   630                         // Shouldn't happen. We allocated buffer with packet size
       
   631                         // get it again if net buffer size was changed
       
   632                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
       
   633                         int appSize = engine.getSession().getApplicationBufferSize();
       
   634                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
       
   635                         dst.flip();
       
   636                         b.put(dst);
       
   637                         dst = b;
       
   638                         break; // try again
       
   639                     case CLOSED:
       
   640                         if (debugw.on()) debugw.log("CLOSED");
       
   641                         // fallthrough. There could be some remaining data in dst.
       
   642                         // CLOSED will be handled by the caller.
       
   643                     case OK:
       
   644                         dst.flip();
       
   645                         final ByteBuffer dest = dst;
       
   646                         if (debugw.on())
       
   647                             debugw.log("OK => produced: %d, not wrapped: %d",
       
   648                                        dest.remaining(),  Utils.remaining(src));
       
   649                         return new EngineResult(sslResult, dest);
       
   650                     case BUFFER_UNDERFLOW:
       
   651                         // Shouldn't happen.  Doesn't returns when wrap()
       
   652                         // underflow handled externally
       
   653                         // assert false : "Buffer Underflow";
       
   654                         if (debug.on()) debug.log("BUFFER_UNDERFLOW");
       
   655                         return new EngineResult(sslResult);
       
   656                     default:
       
   657                         if (debugw.on())
       
   658                             debugw.log("result: %s", sslResult.getStatus());
       
   659                         assert false : "result:" + sslResult.getStatus();
       
   660                 }
       
   661             }
       
   662         }
       
   663 
       
   664         private boolean needWrap() {
       
   665             return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
       
   666         }
       
   667 
       
   668         private void sendResultBytes(EngineResult result) {
       
   669             if (result.bytesProduced() > 0) {
       
   670                 if (debugw.on())
       
   671                     debugw.log("Sending %d bytes downstream",
       
   672                                result.bytesProduced());
       
   673                 outgoing(result.destBuffer, false);
       
   674             }
       
   675         }
       
   676 
       
   677         @Override
       
   678         public String toString() {
       
   679             return "WRITER: " + super.toString() +
       
   680                     " writeList size " + Integer.toString(writeList.size());
       
   681                     //" writeList: " + writeList.toString();
       
   682         }
       
   683     }
       
   684 
       
   685     private void handleError(Throwable t) {
       
   686         if (debug.on()) debug.log("handleError", t);
       
   687         readerCF.completeExceptionally(t);
       
   688         writerCF.completeExceptionally(t);
       
   689         // no-op if already completed
       
   690         alpnCF.completeExceptionally(t);
       
   691         reader.stop();
       
   692         writer.stop();
       
   693     }
       
   694 
       
   695     boolean stopped;
       
   696 
       
   697     private synchronized void normalStop() {
       
   698         if (stopped)
       
   699             return;
       
   700         stopped = true;
       
   701         reader.stop();
       
   702         writer.stop();
       
   703     }
       
   704 
       
   705     private Void stopOnError(Throwable currentlyUnused) {
       
   706         // maybe log, etc
       
   707         normalStop();
       
   708         return null;
       
   709     }
       
   710 
       
   711     private void cleanList(List<ByteBuffer> l) {
       
   712         synchronized (l) {
       
   713             Iterator<ByteBuffer> iter = l.iterator();
       
   714             while (iter.hasNext()) {
       
   715                 ByteBuffer b = iter.next();
       
   716                 if (!b.hasRemaining() && b != SENTINEL) {
       
   717                     iter.remove();
       
   718                 }
       
   719             }
       
   720         }
       
   721     }
       
   722 
       
   723     /**
       
   724      * States for handshake. We avoid races when accessing/updating the AtomicInt
       
   725      * because updates always schedule an additional call to both the read()
       
   726      * and write() functions.
       
   727      */
       
   728     private static final int NOT_HANDSHAKING = 0;
       
   729     private static final int HANDSHAKING = 1;
       
   730 
       
   731     private static final int DOING_TASKS = 4; // bit added to above state
       
   732     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
       
   733 
       
   734     private static final int READER = 1;
       
   735     private static final int WRITER = 2;
       
   736 
       
   737     private static String states(AtomicInteger state) {
       
   738         int s = state.get();
       
   739         StringBuilder sb = new StringBuilder();
       
   740         int x = s & ~DOING_TASKS;
       
   741         switch (x) {
       
   742             case NOT_HANDSHAKING:
       
   743                 sb.append(" NOT_HANDSHAKING ");
       
   744                 break;
       
   745             case HANDSHAKING:
       
   746                 sb.append(" HANDSHAKING ");
       
   747                 break;
       
   748             default:
       
   749                 throw new InternalError();
       
   750         }
       
   751         if ((s & DOING_TASKS) > 0)
       
   752             sb.append("|DOING_TASKS");
       
   753         return sb.toString();
       
   754     }
       
   755 
       
   756     private void resumeActivity() {
       
   757         reader.schedule();
       
   758         writer.schedule();
       
   759     }
       
   760 
       
   761     final AtomicInteger handshakeState;
       
   762     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
       
   763 
       
   764     private boolean doHandshake(EngineResult r, int caller) {
       
   765         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
       
   766         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
       
   767         stateList.add(r.handshakeStatus().toString());
       
   768         stateList.add(Integer.toString(caller));
       
   769         switch (r.handshakeStatus()) {
       
   770             case NEED_TASK:
       
   771                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
       
   772                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
       
   773                     return false;
       
   774 
       
   775                 if (debug.on()) debug.log("obtaining and initiating task execution");
       
   776                 List<Runnable> tasks = obtainTasks();
       
   777                 executeTasks(tasks);
       
   778                 return false;  // executeTasks will resume activity
       
   779             case NEED_WRAP:
       
   780                 if (caller == READER) {
       
   781                     writer.addData(HS_TRIGGER);
       
   782                     return false;
       
   783                 }
       
   784                 break;
       
   785             case NEED_UNWRAP:
       
   786             case NEED_UNWRAP_AGAIN:
       
   787                 // do nothing else
       
   788                 // receiving-side data will trigger unwrap
       
   789                 break;
       
   790             default:
       
   791                 throw new InternalError("Unexpected handshake status:"
       
   792                                         + r.handshakeStatus());
       
   793         }
       
   794         return true;
       
   795     }
       
   796 
       
   797     private List<Runnable> obtainTasks() {
       
   798         List<Runnable> l = new ArrayList<>();
       
   799         Runnable r;
       
   800         while ((r = engine.getDelegatedTask()) != null) {
       
   801             l.add(r);
       
   802         }
       
   803         return l;
       
   804     }
       
   805 
       
   806     private void executeTasks(List<Runnable> tasks) {
       
   807         if (tasks.isEmpty())
       
   808             return;
       
   809         exec.execute(() -> {
       
   810             try {
       
   811                 List<Runnable> nextTasks = tasks;
       
   812                 do {
       
   813                     nextTasks.forEach(Runnable::run);
       
   814                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
       
   815                         nextTasks = obtainTasks();
       
   816                     } else {
       
   817                         break;
       
   818                     }
       
   819                 } while (true);
       
   820                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
       
   821                 //writer.addData(HS_TRIGGER);
       
   822                 resumeActivity();
       
   823             } catch (Throwable t) {
       
   824                 handleError(t);
       
   825             }
       
   826         });
       
   827     }
       
   828 
       
   829     // FIXME: acknowledge a received CLOSE request from peer
       
   830     EngineResult doClosure(EngineResult r) throws IOException {
       
   831         if (debug.on())
       
   832             debug.log("doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
       
   833                       r.result, engine.getHandshakeStatus(),
       
   834                       engine.isOutboundDone(), engine.isInboundDone());
       
   835         if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
       
   836             // we have received TLS close_notify and need to send
       
   837             // an acknowledgement back. We're calling doHandshake
       
   838             // to finish the close handshake.
       
   839             if (engine.isInboundDone() && !engine.isOutboundDone()) {
       
   840                 if (debug.on()) debug.log("doClosure: close_notify received");
       
   841                 close_notify_received = true;
       
   842                 doHandshake(r, READER);
       
   843             }
       
   844         }
       
   845         return r;
       
   846     }
       
   847 
       
   848     /**
       
   849      * Returns the upstream Flow.Subscriber of the reading (incoming) side.
       
   850      * This flow must be given the encrypted data read from upstream (eg socket)
       
   851      * before it is decrypted.
       
   852      */
       
   853     public Flow.Subscriber<List<ByteBuffer>> upstreamReader() {
       
   854         return reader;
       
   855     }
       
   856 
       
   857     /**
       
   858      * Returns the upstream Flow.Subscriber of the writing (outgoing) side.
       
   859      * This flow contains the plaintext data before it is encrypted.
       
   860      */
       
   861     public Flow.Subscriber<List<ByteBuffer>> upstreamWriter() {
       
   862         return writer;
       
   863     }
       
   864 
       
   865     public boolean resumeReader() {
       
   866         return reader.signalScheduling();
       
   867     }
       
   868 
       
   869     public void resetReaderDemand() {
       
   870         reader.resetDownstreamDemand();
       
   871     }
       
   872 
       
   873     static class EngineResult {
       
   874         final SSLEngineResult result;
       
   875         final ByteBuffer destBuffer;
       
   876 
       
   877         // normal result
       
   878         EngineResult(SSLEngineResult result) {
       
   879             this(result, null);
       
   880         }
       
   881 
       
   882         EngineResult(SSLEngineResult result, ByteBuffer destBuffer) {
       
   883             this.result = result;
       
   884             this.destBuffer = destBuffer;
       
   885         }
       
   886 
       
   887         boolean handshaking() {
       
   888             HandshakeStatus s = result.getHandshakeStatus();
       
   889             return s != HandshakeStatus.FINISHED
       
   890                    && s != HandshakeStatus.NOT_HANDSHAKING
       
   891                    && result.getStatus() != Status.CLOSED;
       
   892         }
       
   893 
       
   894         boolean needUnwrap() {
       
   895             HandshakeStatus s = result.getHandshakeStatus();
       
   896             return s == HandshakeStatus.NEED_UNWRAP;
       
   897         }
       
   898 
       
   899 
       
   900         int bytesConsumed() {
       
   901             return result.bytesConsumed();
       
   902         }
       
   903 
       
   904         int bytesProduced() {
       
   905             return result.bytesProduced();
       
   906         }
       
   907 
       
   908         SSLEngineResult.HandshakeStatus handshakeStatus() {
       
   909             return result.getHandshakeStatus();
       
   910         }
       
   911 
       
   912         SSLEngineResult.Status status() {
       
   913             return result.getStatus();
       
   914         }
       
   915     }
       
   916 
       
   917     public ByteBuffer getNetBuffer() {
       
   918         return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
       
   919     }
       
   920 
       
   921     private ByteBuffer getAppBuffer() {
       
   922         return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
       
   923     }
       
   924 
       
   925     final String dbgString() {
       
   926         return "SSLFlowDelegate(" + tubeName + ")";
       
   927     }
       
   928 }