src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 50681 4254bed3c09d
parent 49944 4690a2871b44
child 50985 cd41f34e548c
child 56795 03ece2518428
equal deleted inserted replaced
50678:818a23db260c 50681:4254bed3c09d
    44 import java.util.concurrent.Executor;
    44 import java.util.concurrent.Executor;
    45 import java.util.concurrent.Flow;
    45 import java.util.concurrent.Flow;
    46 import java.util.concurrent.Flow.Subscriber;
    46 import java.util.concurrent.Flow.Subscriber;
    47 import java.util.concurrent.atomic.AtomicInteger;
    47 import java.util.concurrent.atomic.AtomicInteger;
    48 import java.util.function.Consumer;
    48 import java.util.function.Consumer;
       
    49 import java.util.function.IntBinaryOperator;
    49 
    50 
    50 /**
    51 /**
    51  * Implements SSL using two SubscriberWrappers.
    52  * Implements SSL using two SubscriberWrappers.
    52  *
    53  *
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    54  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    85 public class SSLFlowDelegate {
    86 public class SSLFlowDelegate {
    86 
    87 
    87     final Logger debug =
    88     final Logger debug =
    88             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    89             Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
    89 
    90 
       
    91     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
       
    92     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
       
    93     // When handshake is in progress trying to wrap may produce no bytes.
       
    94     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
       
    95     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
       
    96 
    90     final Executor exec;
    97     final Executor exec;
    91     final Reader reader;
    98     final Reader reader;
    92     final Writer writer;
    99     final Writer writer;
    93     final SSLEngine engine;
   100     final SSLEngine engine;
    94     final String tubeName; // hack
   101     final String tubeName; // hack
    95     final CompletableFuture<String> alpnCF; // completes on initial handshake
   102     final CompletableFuture<String> alpnCF; // completes on initial handshake
    96     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
       
    97     volatile boolean close_notify_received;
   103     volatile boolean close_notify_received;
    98     final CompletableFuture<Void> readerCF;
   104     final CompletableFuture<Void> readerCF;
    99     final CompletableFuture<Void> writerCF;
   105     final CompletableFuture<Void> writerCF;
   100     final Consumer<ByteBuffer> recycler;
   106     final Consumer<ByteBuffer> recycler;
   101     static AtomicInteger scount = new AtomicInteger(1);
   107     static AtomicInteger scount = new AtomicInteger(1);
   144 
   150 
   145         // connect the Reader to the downReader and the
   151         // connect the Reader to the downReader and the
   146         // Writer to the downWriter.
   152         // Writer to the downWriter.
   147         connect(downReader, downWriter);
   153         connect(downReader, downWriter);
   148 
   154 
   149         //Monitor.add(this::monitor);
   155         if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
       
   156             Monitor.add(this::monitor);
   150     }
   157     }
   151 
   158 
   152     /**
   159     /**
   153      * Returns true if the SSLFlowDelegate has detected a TLS
   160      * Returns true if the SSLFlowDelegate has detected a TLS
   154      * close_notify from the server.
   161      * close_notify from the server.
   243         volatile boolean completing;
   250         volatile boolean completing;
   244         final Object readBufferLock = new Object();
   251         final Object readBufferLock = new Object();
   245         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
   252         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
   246 
   253 
   247         private final class ReaderDownstreamPusher implements Runnable {
   254         private final class ReaderDownstreamPusher implements Runnable {
   248             @Override public void run() { processData(); }
   255             @Override
       
   256             public void run() {
       
   257                 processData();
       
   258             }
   249         }
   259         }
   250 
   260 
   251         Reader() {
   261         Reader() {
   252             super();
   262             super();
   253             scheduler = SequentialScheduler.synchronizedScheduler(
   263             scheduler = SequentialScheduler.synchronizedScheduler(
   254                                                 new ReaderDownstreamPusher());
   264                     new ReaderDownstreamPusher());
   255             this.readBuf = ByteBuffer.allocate(1024);
   265             this.readBuf = ByteBuffer.allocate(1024);
   256             readBuf.limit(0); // keep in read mode
   266             readBuf.limit(0); // keep in read mode
   257         }
   267         }
   258 
   268 
   259         @Override
   269         @Override
   274          */
   284          */
   275         @Override
   285         @Override
   276         public void incoming(List<ByteBuffer> buffers, boolean complete) {
   286         public void incoming(List<ByteBuffer> buffers, boolean complete) {
   277             if (debugr.on())
   287             if (debugr.on())
   278                 debugr.log("Adding %d bytes to read buffer",
   288                 debugr.log("Adding %d bytes to read buffer",
   279                            Utils.remaining(buffers));
   289                         Utils.remaining(buffers));
   280             addToReadBuf(buffers, complete);
   290             addToReadBuf(buffers, complete);
   281             scheduler.runOrSchedule(exec);
   291             scheduler.runOrSchedule(exec);
   282         }
   292         }
   283 
   293 
   284         @Override
   294         @Override
   287                     + " count: " + count.toString();
   297                     + " count: " + count.toString();
   288         }
   298         }
   289 
   299 
   290         private void reallocReadBuf() {
   300         private void reallocReadBuf() {
   291             int sz = readBuf.capacity();
   301             int sz = readBuf.capacity();
   292             ByteBuffer newb = ByteBuffer.allocate(sz*2);
   302             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
   293             readBuf.flip();
   303             readBuf.flip();
   294             Utils.copy(readBuf, newb);
   304             Utils.copy(readBuf, newb);
   295             readBuf = newb;
   305             readBuf = newb;
   296         }
   306         }
   297 
   307 
   298         @Override
   308         @Override
   299         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
   309         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
   300             if (readBuf.remaining() > TARGET_BUFSIZE) {
   310             if (readBuf.remaining() > TARGET_BUFSIZE) {
   301                 if (debugr.on())
   311                 if (debugr.on())
   302                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
   312                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
   303                                readBuf.remaining());
   313                             readBuf.remaining());
   304                 return 0;
   314                 return 0;
   305             } else {
   315             } else {
   306                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
   316                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
   307             }
   317             }
   308         }
   318         }
   309 
   319 
   310         // readBuf is kept ready for reading outside of this method
   320         // readBuf is kept ready for reading outside of this method
   311         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
   321         private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
       
   322             assert Utils.remaining(buffers) > 0 || buffers.isEmpty();
   312             synchronized (readBufferLock) {
   323             synchronized (readBufferLock) {
   313                 for (ByteBuffer buf : buffers) {
   324                 for (ByteBuffer buf : buffers) {
   314                     readBuf.compact();
   325                     readBuf.compact();
   315                     while (readBuf.remaining() < buf.remaining())
   326                     while (readBuf.remaining() < buf.remaining())
   316                         reallocReadBuf();
   327                         reallocReadBuf();
   342         // minimum number of bytes required to call unwrap.
   353         // minimum number of bytes required to call unwrap.
   343         // Usually this is 0, unless there was a buffer underflow.
   354         // Usually this is 0, unless there was a buffer underflow.
   344         // In this case we need to wait for more bytes than what
   355         // In this case we need to wait for more bytes than what
   345         // we had before calling unwrap() again.
   356         // we had before calling unwrap() again.
   346         volatile int minBytesRequired;
   357         volatile int minBytesRequired;
       
   358 
   347         // work function where it all happens
   359         // work function where it all happens
   348         final void processData() {
   360         final void processData() {
   349             try {
   361             try {
   350                 if (debugr.on())
   362                 if (debugr.on())
   351                     debugr.log("processData:"
   363                     debugr.log("processData:"
   352                            + " readBuf remaining:" + readBuf.remaining()
   364                             + " readBuf remaining:" + readBuf.remaining()
   353                            + ", state:" + states(handshakeState)
   365                             + ", state:" + states(handshakeState)
   354                            + ", engine handshake status:" + engine.getHandshakeStatus());
   366                             + ", engine handshake status:" + engine.getHandshakeStatus());
   355                 int len;
   367                 int len;
   356                 boolean complete = false;
   368                 boolean complete = false;
   357                 while (readBuf.remaining() > (len = minBytesRequired)) {
   369                 while (readBuf.remaining() > (len = minBytesRequired)) {
   358                     boolean handshaking = false;
   370                     boolean handshaking = false;
   359                     try {
   371                     try {
   398                         if (complete && result.status() == Status.CLOSED) {
   410                         if (complete && result.status() == Status.CLOSED) {
   399                             if (debugr.on()) debugr.log("Closed: completing");
   411                             if (debugr.on()) debugr.log("Closed: completing");
   400                             outgoing(Utils.EMPTY_BB_LIST, true);
   412                             outgoing(Utils.EMPTY_BB_LIST, true);
   401                             return;
   413                             return;
   402                         }
   414                         }
   403                         if (result.handshaking() && !complete) {
   415                         if (result.handshaking()) {
       
   416                             handshaking = true;
   404                             if (debugr.on()) debugr.log("handshaking");
   417                             if (debugr.on()) debugr.log("handshaking");
   405                             if (doHandshake(result, READER)) {
   418                             if (doHandshake(result, READER)) continue; // need unwrap
   406                                 resumeActivity();
   419                             else break; // doHandshake will have triggered the write scheduler if necessary
   407                             }
       
   408                             handshaking = true;
       
   409                         } else {
   420                         } else {
   410                             if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
   421                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
   411                                 handshaking = false;
   422                                 handshaking = false;
   412                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
   423                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
   413                                 packetBufferSize = engine.getSession().getPacketBufferSize();
   424                                 packetBufferSize = engine.getSession().getPacketBufferSize();
   414                                 setALPN();
   425                                 setALPN();
   415                                 resumeActivity();
   426                                 resumeActivity();
   441             }
   452             }
   442         }
   453         }
   443 
   454 
   444         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
   455         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
   445             ByteBuffer dst = getAppBuffer();
   456             ByteBuffer dst = getAppBuffer();
       
   457             int len = src.remaining();
   446             while (true) {
   458             while (true) {
   447                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   459                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   448                 switch (sslResult.getStatus()) {
   460                 switch (sslResult.getStatus()) {
   449                     case BUFFER_OVERFLOW:
   461                     case BUFFER_OVERFLOW:
   450                         // may happen only if app size buffer was changed.
   462                         // may happen if app size buffer was changed, or if
   451                         // get it again if app buffer size changed
   463                         // our 'adaptiveAppBufferSize' guess was too small for
       
   464                         // the current payload. In that case, update the
       
   465                         // value of applicationBufferSize, and allocate a
       
   466                         // buffer of that size, which we are sure will be
       
   467                         // big enough to decode whatever needs to be
       
   468                         // decoded. We will later update adaptiveAppBufferSize
       
   469                         // in OK: below.
   452                         int appSize = applicationBufferSize =
   470                         int appSize = applicationBufferSize =
   453                                 engine.getSession().getApplicationBufferSize();
   471                                 engine.getSession().getApplicationBufferSize();
   454                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
   472                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
   455                         dst.flip();
   473                         dst.flip();
   456                         b.put(dst);
   474                         b.put(dst);
   457                         dst = b;
   475                         dst = b;
   458                         break;
   476                         break;
   459                     case CLOSED:
   477                     case CLOSED:
       
   478                         assert dst.position() == 0;
   460                         return doClosure(new EngineResult(sslResult));
   479                         return doClosure(new EngineResult(sslResult));
   461                     case BUFFER_UNDERFLOW:
   480                     case BUFFER_UNDERFLOW:
   462                         // handled implicitly by compaction/reallocation of readBuf
   481                         // handled implicitly by compaction/reallocation of readBuf
       
   482                         assert dst.position() == 0;
   463                         return new EngineResult(sslResult);
   483                         return new EngineResult(sslResult);
   464                     case OK:
   484                     case OK:
       
   485                         int size = dst.position();
       
   486                         if (debug.on()) {
       
   487                             debugr.log("Decoded " + size + " bytes out of " + len
       
   488                                     + " into buffer of " + dst.capacity()
       
   489                                     + " remaining to decode: " + src.remaining());
       
   490                         }
       
   491                         // if the record payload was bigger than what was originally
       
   492                         // allocated, then set adaptiveAppBufferSize to size (rounded
       
   493                         // up to a multiple of 8); that new size will then be used as
       
   494                         // a guess for the next app buffer.
       
   495                         if (size > adaptiveAppBufferSize) {
       
   496                             adaptiveAppBufferSize = ((size + 7) >>> 3) << 3;
       
   497                         }
   465                         dst.flip();
   498                         dst.flip();
   466                         return new EngineResult(sslResult, dst);
   499                         return new EngineResult(sslResult, dst);
   467                 }
   500                 }
   468             }
   501             }
   469         }
   502         }
   660                             resumeActivity();
   693                             resumeActivity();
   661                         }
   694                         }
   662                     }
   695                     }
   663                     cleanList(writeList); // tidy up the source list
   696                     cleanList(writeList); // tidy up the source list
   664                     sendResultBytes(result);
   697                     sendResultBytes(result);
   665                     if (handshaking && !completing) {
   698                     if (handshaking) {
   666                         if (needWrap()) {
   699                         if (!completing && needWrap()) {
   667                             continue;
   700                             continue;
   668                         } else {
   701                         } else {
   669                             return;
   702                             return;
   670                         }
   703                         }
   671                     }
   704                     }
   685                 errorCommon(ex);
   718                 errorCommon(ex);
   686                 handleError(ex);
   719                 handleError(ex);
   687             }
   720             }
   688         }
   721         }
   689 
   722 
       
   723         // The SSLEngine insists on being given a buffer that is at least
       
   724         // SSLSession.getPacketBufferSize() long (usually 16K). If given
       
   725         // a smaller buffer it will go in BUFFER_OVERFLOW, even if it only
       
   726         // has 6 bytes to wrap. Typical usage shows that for GET we
       
   727         // usually produce an average of ~ 100 bytes.
       
   728         // To avoid wasting space, and because allocating and zeroing
       
   729         // 16K buffers for encoding 6 bytes is costly, we are reusing the
       
   730         // same writeBuffer to interact with SSLEngine.wrap().
       
   731         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
       
   732         // then we copy off the bytes to a smaller buffer that we send
       
   733         // downstream. Otherwise, we send the writeBuffer downstream
       
   734         // and will allocate a new one next time.
       
   735         volatile ByteBuffer writeBuffer;
   690         @SuppressWarnings("fallthrough")
   736         @SuppressWarnings("fallthrough")
   691         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
   737         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
       
   738             long len = Utils.remaining(src);
   692             if (debugw.on())
   739             if (debugw.on())
   693                 debugw.log("wrapping " + Utils.remaining(src) + " bytes");
   740                 debugw.log("wrapping " + len + " bytes");
   694             ByteBuffer dst = getNetBuffer();
   741 
       
   742             ByteBuffer dst = writeBuffer;
       
   743             if (dst == null) dst = writeBuffer = getNetBuffer();
       
   744             assert dst.position() == 0 : "buffer position is " + dst.position();
       
   745             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
       
   746 
   695             while (true) {
   747             while (true) {
   696                 SSLEngineResult sslResult = engine.wrap(src, dst);
   748                 SSLEngineResult sslResult = engine.wrap(src, dst);
   697                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
   749                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
   698                 switch (sslResult.getStatus()) {
   750                 switch (sslResult.getStatus()) {
   699                     case BUFFER_OVERFLOW:
   751                     case BUFFER_OVERFLOW:
   700                         // Shouldn't happen. We allocated buffer with packet size
   752                         // Shouldn't happen. We allocated buffer with packet size
   701                         // get it again if net buffer size was changed
   753                         // get it again if net buffer size was changed
   702                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   754                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   703                         int netSize = packetBufferSize
   755                         int netSize = packetBufferSize
   704                                 = engine.getSession().getPacketBufferSize();
   756                                 = engine.getSession().getPacketBufferSize();
   705                         ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
   757                         ByteBuffer b = writeBuffer = ByteBuffer.allocate(netSize + dst.position());
   706                         dst.flip();
   758                         dst.flip();
   707                         b.put(dst);
   759                         b.put(dst);
   708                         dst = b;
   760                         dst = b;
   709                         break; // try again
   761                         break; // try again
   710                     case CLOSED:
   762                     case CLOSED:
   711                         if (debugw.on()) debugw.log("CLOSED");
   763                         if (debugw.on()) debugw.log("CLOSED");
   712                         // fallthrough. There could be some remaining data in dst.
   764                         // fallthrough. There could be some remaining data in dst.
   713                         // CLOSED will be handled by the caller.
   765                         // CLOSED will be handled by the caller.
   714                     case OK:
   766                     case OK:
   715                         dst.flip();
   767                         final ByteBuffer dest;
   716                         final ByteBuffer dest = dst;
   768                         if (dst.position() == 0) {
       
   769                             dest = NOTHING; // can happen if handshake is in progress
       
   770                         } else if (dst.position() < dst.capacity() / 2) {
       
   771                             // less than half the buffer was used.
       
   772                             // copy off the bytes to a smaller buffer, and keep
       
   773                             // the writeBuffer for next time.
       
   774                             dst.flip();
       
   775                             dest = Utils.copyAligned(dst);
       
   776                             dst.clear();
       
   777                         } else {
       
   778                             // more than half the buffer was used.
       
   779                             // just send that buffer downstream, and we will
       
   780                             // get a new writeBuffer next time it is needed.
       
   781                             dst.flip();
       
   782                             dest = dst;
       
   783                             writeBuffer = null;
       
   784                         }
   717                         if (debugw.on())
   785                         if (debugw.on())
   718                             debugw.log("OK => produced: %d, not wrapped: %d",
   786                             debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
   719                                        dest.remaining(),  Utils.remaining(src));
   787                                        dest.remaining(),  dest.capacity(), Utils.remaining(src));
   720                         return new EngineResult(sslResult, dest);
   788                         return new EngineResult(sslResult, dest);
   721                     case BUFFER_UNDERFLOW:
   789                     case BUFFER_UNDERFLOW:
   722                         // Shouldn't happen.  wrap() doesn't return BUFFER_UNDERFLOW;
   790                         // Shouldn't happen.  wrap() doesn't return BUFFER_UNDERFLOW;
   723                         // underflow is handled externally
   791                         // underflow is handled externally
   724                         // assert false : "Buffer Underflow";
   792                         // assert false : "Buffer Underflow";
   797      * and write() functions.
   865      * and write() functions.
   798      */
   866      */
   799     private static final int NOT_HANDSHAKING = 0;
   867     private static final int NOT_HANDSHAKING = 0;
   800     private static final int HANDSHAKING = 1;
   868     private static final int HANDSHAKING = 1;
   801 
   869 
   802     private static final int DOING_TASKS = 4; // bit added to above state
   870     // Bit flags
   803     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
   871     // a thread is currently executing tasks
       
   872     private static final int DOING_TASKS = 4;
       
   873     // a thread wants to execute tasks, while another thread is executing
       
   874     private static final int REQUESTING_TASKS = 8;
       
   875     private static final int TASK_BITS = 12; // Both bits
   804 
   876 
   805     private static final int READER = 1;
   877     private static final int READER = 1;
   806     private static final int WRITER = 2;
   878     private static final int WRITER = 2;
   807 
   879 
   808     private static String states(AtomicInteger state) {
   880     private static String states(AtomicInteger state) {
   809         int s = state.get();
   881         int s = state.get();
   810         StringBuilder sb = new StringBuilder();
   882         StringBuilder sb = new StringBuilder();
   811         int x = s & ~DOING_TASKS;
   883         int x = s & ~TASK_BITS;
   812         switch (x) {
   884         switch (x) {
   813             case NOT_HANDSHAKING:
   885             case NOT_HANDSHAKING:
   814                 sb.append(" NOT_HANDSHAKING ");
   886                 sb.append(" NOT_HANDSHAKING ");
   815                 break;
   887                 break;
   816             case HANDSHAKING:
   888             case HANDSHAKING:
   819             default:
   891             default:
   820                 throw new InternalError();
   892                 throw new InternalError();
   821         }
   893         }
   822         if ((s & DOING_TASKS) > 0)
   894         if ((s & DOING_TASKS) > 0)
   823             sb.append("|DOING_TASKS");
   895             sb.append("|DOING_TASKS");
       
   896         if ((s & REQUESTING_TASKS) > 0)
       
   897             sb.append("|REQUESTING_TASKS");
   824         return sb.toString();
   898         return sb.toString();
   825     }
   899     }
   826 
   900 
   827     private void resumeActivity() {
   901     private void resumeActivity() {
   828         reader.schedule();
   902         reader.schedule();
   831 
   905 
   832     final AtomicInteger handshakeState;
   906     final AtomicInteger handshakeState;
   833     final ConcurrentLinkedQueue<String> stateList =
   907     final ConcurrentLinkedQueue<String> stateList =
   834             debug.on() ? new ConcurrentLinkedQueue<>() : null;
   908             debug.on() ? new ConcurrentLinkedQueue<>() : null;
   835 
   909 
       
   910     // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS
       
   911     // depending on previous value
       
   912     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
       
   913         if ((current & DOING_TASKS) == 0)
       
   914             return DOING_TASKS | (current & HANDSHAKING);
       
   915         else
       
   916             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
       
   917     };
       
   918 
       
   919     // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING_TASKS
       
   920     // was set; clears both task bits if not.
       
   921     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
       
   922         if ((current & REQUESTING_TASKS) != 0)
       
   923             return DOING_TASKS | (current & HANDSHAKING);
       
   924         // clear both bits
       
   925         return (current & HANDSHAKING);
       
   926     };
       
   927 
   836     private boolean doHandshake(EngineResult r, int caller) {
   928     private boolean doHandshake(EngineResult r, int caller) {
   837         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
   929         // unconditionally sets the HANDSHAKING bit, while preserving task bits
   838         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
   930         handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
   839         if (stateList != null && debug.on()) {
   931         if (stateList != null && debug.on()) {
   840             stateList.add(r.handshakeStatus().toString());
   932             stateList.add(r.handshakeStatus().toString());
   841             stateList.add(Integer.toString(caller));
   933             stateList.add(Integer.toString(caller));
   842         }
   934         }
   843         switch (r.handshakeStatus()) {
   935         switch (r.handshakeStatus()) {
   844             case NEED_TASK:
   936             case NEED_TASK:
   845                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
   937                 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
   846                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
   938                 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
   847                     return false;
   939                     return false;
       
   940                 }
   848 
   941 
   849                 if (debug.on()) debug.log("obtaining and initiating task execution");
   942                 if (debug.on()) debug.log("obtaining and initiating task execution");
   850                 List<Runnable> tasks = obtainTasks();
   943                 List<Runnable> tasks = obtainTasks();
   851                 executeTasks(tasks);
   944                 executeTasks(tasks);
   852                 return false;  // executeTasks will resume activity
   945                 return false;  // executeTasks will resume activity
   876         }
   969         }
   877         return l;
   970         return l;
   878     }
   971     }
   879 
   972 
   880     private void executeTasks(List<Runnable> tasks) {
   973     private void executeTasks(List<Runnable> tasks) {
   881         if (tasks.isEmpty())
       
   882             return;
       
   883         exec.execute(() -> {
   974         exec.execute(() -> {
   884             try {
   975             try {
   885                 List<Runnable> nextTasks = tasks;
   976                 List<Runnable> nextTasks = tasks;
       
   977                 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
   886                 do {
   978                 do {
   887                     nextTasks.forEach(Runnable::run);
   979                     nextTasks.forEach(Runnable::run);
   888                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
   980                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
   889                         nextTasks = obtainTasks();
   981                         nextTasks = obtainTasks();
   890                     } else {
   982                     } else {
       
   983                         int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
       
   984                         if ((s & DOING_TASKS) != 0) {
       
   985                             if (debug.on()) debug.log("re-running tasks (B)");
       
   986                             nextTasks = obtainTasks();
       
   987                             continue;
       
   988                         }
   891                         break;
   989                         break;
   892                     }
   990                     }
   893                 } while (true);
   991                 } while (true);
   894                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
   992                 if (debug.on()) debug.log("finished task execution");
   895                 resumeActivity();
   993                 resumeActivity();
   896             } catch (Throwable t) {
   994             } catch (Throwable t) {
   897                 handleError(t);
   995                 handleError(t);
   898             }
   996             }
   899         });
   997         });
   995         SSLEngineResult.Status status() {
  1093         SSLEngineResult.Status status() {
   996             return result.getStatus();
  1094             return result.getStatus();
   997         }
  1095         }
   998     }
  1096     }
   999 
  1097 
       
  1098     // The maximum network buffer size negotiated during
       
  1099     // the handshake. Usually 16K.
  1000     volatile int packetBufferSize;
  1100     volatile int packetBufferSize;
  1001     final ByteBuffer getNetBuffer() {
  1101     final ByteBuffer getNetBuffer() {
  1002         int netSize = packetBufferSize;
  1102         int netSize = packetBufferSize;
  1003         if (netSize <= 0) {
  1103         if (netSize <= 0) {
  1004             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
  1104             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
  1005         }
  1105         }
  1006         return ByteBuffer.allocate(netSize);
  1106         return ByteBuffer.allocate(netSize);
  1007     }
  1107     }
  1008 
  1108 
       
  1109     // The maximum application buffer size negotiated during
       
  1110     // the handshake. Usually close to 16K.
  1009     volatile int applicationBufferSize;
  1111     volatile int applicationBufferSize;
       
   1112     // Despite the maximum applicationBufferSize negotiated
       
  1113     // above, TLS records usually have a much smaller payload.
       
   1114     // The adaptiveAppBufferSize records the max payload
       
  1115     // ever decoded, and we use that as a guess for how big
       
  1116     // a buffer we will need for the next payload.
       
  1117     // This avoids allocating and zeroing a 16K buffer for
       
  1118     // nothing...
       
  1119     volatile int adaptiveAppBufferSize;
  1010     final ByteBuffer getAppBuffer() {
  1120     final ByteBuffer getAppBuffer() {
  1011         int appSize = applicationBufferSize;
  1121         int appSize = applicationBufferSize;
  1012         if (appSize <= 0) {
  1122         if (appSize <= 0) {
  1013             applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
  1123             applicationBufferSize = appSize
  1014         }
  1124                     = engine.getSession().getApplicationBufferSize();
  1015         return ByteBuffer.allocate(appSize);
  1125         }
       
  1126         int size = adaptiveAppBufferSize;
       
  1127         if (size <= 0) {
       
  1128             size = 512; // start with 512 this is usually enough for handshaking / headers
       
  1129         } else if (size > appSize) {
       
  1130             size = appSize;
       
  1131         }
       
  1132         // will cause a BUFFER_OVERFLOW if not big enough, but
       
  1133         // that's OK.
       
  1134         return ByteBuffer.allocate(size);
  1016     }
  1135     }
  1017 
  1136 
  1018     final String dbgString() {
  1137     final String dbgString() {
  1019         return "SSLFlowDelegate(" + tubeName + ")";
  1138         return "SSLFlowDelegate(" + tubeName + ")";
  1020     }
  1139     }