src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 49944 4690a2871b44
parent 49765 ee6f7a61f3a5
child 50681 4254bed3c09d
child 56507 2294c51eae30
equal deleted inserted replaced
49943:8e1ed2a15845 49944:4690a2871b44
    31 import javax.net.ssl.SSLEngineResult;
    31 import javax.net.ssl.SSLEngineResult;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    33 import javax.net.ssl.SSLEngineResult.Status;
    33 import javax.net.ssl.SSLEngineResult.Status;
    34 import javax.net.ssl.SSLException;
    34 import javax.net.ssl.SSLException;
    35 import java.io.IOException;
    35 import java.io.IOException;
    36 import java.lang.System.Logger.Level;
       
    37 import java.nio.ByteBuffer;
    36 import java.nio.ByteBuffer;
    38 import java.util.ArrayList;
    37 import java.util.ArrayList;
    39 import java.util.Collections;
    38 import java.util.Collections;
    40 import java.util.Iterator;
    39 import java.util.Iterator;
    41 import java.util.LinkedList;
    40 import java.util.LinkedList;
    44 import java.util.concurrent.ConcurrentLinkedQueue;
    43 import java.util.concurrent.ConcurrentLinkedQueue;
    45 import java.util.concurrent.Executor;
    44 import java.util.concurrent.Executor;
    46 import java.util.concurrent.Flow;
    45 import java.util.concurrent.Flow;
    47 import java.util.concurrent.Flow.Subscriber;
    46 import java.util.concurrent.Flow.Subscriber;
    48 import java.util.concurrent.atomic.AtomicInteger;
    47 import java.util.concurrent.atomic.AtomicInteger;
       
    48 import java.util.function.Consumer;
    49 
    49 
    50 /**
    50 /**
    51  * Implements SSL using two SubscriberWrappers.
    51  * Implements SSL using two SubscriberWrappers.
    52  *
    52  *
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    95     final CompletableFuture<String> alpnCF; // completes on initial handshake
    95     final CompletableFuture<String> alpnCF; // completes on initial handshake
    96     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    96     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    97     volatile boolean close_notify_received;
    97     volatile boolean close_notify_received;
    98     final CompletableFuture<Void> readerCF;
    98     final CompletableFuture<Void> readerCF;
    99     final CompletableFuture<Void> writerCF;
    99     final CompletableFuture<Void> writerCF;
       
   100     final Consumer<ByteBuffer> recycler;
   100     static AtomicInteger scount = new AtomicInteger(1);
   101     static AtomicInteger scount = new AtomicInteger(1);
   101     final int id;
   102     final int id;
   102 
   103 
   103     /**
   104     /**
   104      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   105      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   108     public SSLFlowDelegate(SSLEngine engine,
   109     public SSLFlowDelegate(SSLEngine engine,
   109                            Executor exec,
   110                            Executor exec,
   110                            Subscriber<? super List<ByteBuffer>> downReader,
   111                            Subscriber<? super List<ByteBuffer>> downReader,
   111                            Subscriber<? super List<ByteBuffer>> downWriter)
   112                            Subscriber<? super List<ByteBuffer>> downWriter)
   112     {
   113     {
       
   114         this(engine, exec, null, downReader, downWriter);
       
   115     }
       
   116 
       
   117     /**
       
   118      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
       
   119      * Flow.Subscriber requires an associated {@link CompletableFuture}
       
   120      * for errors that need to be signaled from downstream to upstream.
       
   121      */
       
   122     public SSLFlowDelegate(SSLEngine engine,
       
   123             Executor exec,
       
   124             Consumer<ByteBuffer> recycler,
       
   125             Subscriber<? super List<ByteBuffer>> downReader,
       
   126             Subscriber<? super List<ByteBuffer>> downWriter)
       
   127         {
   113         this.id = scount.getAndIncrement();
   128         this.id = scount.getAndIncrement();
   114         this.tubeName = String.valueOf(downWriter);
   129         this.tubeName = String.valueOf(downWriter);
       
   130         this.recycler = recycler;
   115         this.reader = new Reader();
   131         this.reader = new Reader();
   116         this.writer = new Writer();
   132         this.writer = new Writer();
   117         this.engine = engine;
   133         this.engine = engine;
   118         this.exec = exec;
   134         this.exec = exec;
   119         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   135         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   179     public String monitor() {
   195     public String monitor() {
   180         StringBuilder sb = new StringBuilder();
   196         StringBuilder sb = new StringBuilder();
   181         sb.append("SSL: id ").append(id);
   197         sb.append("SSL: id ").append(id);
   182         sb.append(" HS state: " + states(handshakeState));
   198         sb.append(" HS state: " + states(handshakeState));
   183         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   199         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   184         sb.append(" LL : ");
   200         if (stateList != null) {
   185         for (String s: stateList) {
   201             sb.append(" LL : ");
   186             sb.append(s).append(" ");
   202             for (String s : stateList) {
       
   203                 sb.append(s).append(" ");
       
   204             }
   187         }
   205         }
   188         sb.append("\r\n");
   206         sb.append("\r\n");
   189         sb.append("Reader:: ").append(reader.toString());
   207         sb.append("Reader:: ").append(reader.toString());
   190         sb.append("\r\n");
   208         sb.append("\r\n");
   191         sb.append("Writer:: ").append(writer.toString());
   209         sb.append("Writer:: ").append(writer.toString());
   211      *     OK: return generated buffers.
   229      *     OK: return generated buffers.
   212      *
   230      *
   213      * Upstream subscription strategy is to try and keep no more than
   231      * Upstream subscription strategy is to try and keep no more than
   214      * TARGET_BUFSIZE bytes in readBuf
   232      * TARGET_BUFSIZE bytes in readBuf
   215      */
   233      */
   216     class Reader extends SubscriberWrapper {
   234     final class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
       
   235         // Maximum record size is 16k.
       
   236         // Because SocketTube can feeds us up to 3 16K buffers,
       
   237         // then setting this size to 16K means that the readBuf
       
   238         // can store up to 64K-1 (16K-1 + 3*16K)
       
   239         static final int TARGET_BUFSIZE = 16 * 1024;
       
   240 
   217         final SequentialScheduler scheduler;
   241         final SequentialScheduler scheduler;
   218         static final int TARGET_BUFSIZE = 16 * 1024;
       
   219         volatile ByteBuffer readBuf;
   242         volatile ByteBuffer readBuf;
   220         volatile boolean completing;
   243         volatile boolean completing;
   221         final Object readBufferLock = new Object();
   244         final Object readBufferLock = new Object();
   222         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
   245         final Logger debugr = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
   223 
   246 
   224         class ReaderDownstreamPusher implements Runnable {
   247         private final class ReaderDownstreamPusher implements Runnable {
   225             @Override public void run() { processData(); }
   248             @Override public void run() { processData(); }
   226         }
   249         }
   227 
   250 
   228         Reader() {
   251         Reader() {
   229             super();
   252             super();
   230             scheduler = SequentialScheduler.synchronizedScheduler(
   253             scheduler = SequentialScheduler.synchronizedScheduler(
   231                                                 new ReaderDownstreamPusher());
   254                                                 new ReaderDownstreamPusher());
   232             this.readBuf = ByteBuffer.allocate(1024);
   255             this.readBuf = ByteBuffer.allocate(1024);
   233             readBuf.limit(0); // keep in read mode
   256             readBuf.limit(0); // keep in read mode
       
   257         }
       
   258 
       
   259         @Override
       
   260         public boolean supportsRecycling() {
       
   261             return recycler != null;
   234         }
   262         }
   235 
   263 
   236         protected SchedulingAction enterScheduling() {
   264         protected SchedulingAction enterScheduling() {
   237             return enterReadScheduling();
   265             return enterReadScheduling();
   238         }
   266         }
   248         public void incoming(List<ByteBuffer> buffers, boolean complete) {
   276         public void incoming(List<ByteBuffer> buffers, boolean complete) {
   249             if (debugr.on())
   277             if (debugr.on())
   250                 debugr.log("Adding %d bytes to read buffer",
   278                 debugr.log("Adding %d bytes to read buffer",
   251                            Utils.remaining(buffers));
   279                            Utils.remaining(buffers));
   252             addToReadBuf(buffers, complete);
   280             addToReadBuf(buffers, complete);
   253             scheduler.runOrSchedule();
   281             scheduler.runOrSchedule(exec);
   254         }
   282         }
   255 
   283 
   256         @Override
   284         @Override
   257         public String toString() {
   285         public String toString() {
   258             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
   286             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
   268         }
   296         }
   269 
   297 
   270         @Override
   298         @Override
   271         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
   299         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
   272             if (readBuf.remaining() > TARGET_BUFSIZE) {
   300             if (readBuf.remaining() > TARGET_BUFSIZE) {
       
   301                 if (debugr.on())
       
   302                     debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
       
   303                                readBuf.remaining());
   273                 return 0;
   304                 return 0;
   274             } else {
   305             } else {
   275                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
   306                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
   276             }
   307             }
   277         }
   308         }
   283                     readBuf.compact();
   314                     readBuf.compact();
   284                     while (readBuf.remaining() < buf.remaining())
   315                     while (readBuf.remaining() < buf.remaining())
   285                         reallocReadBuf();
   316                         reallocReadBuf();
   286                     readBuf.put(buf);
   317                     readBuf.put(buf);
   287                     readBuf.flip();
   318                     readBuf.flip();
       
   319                     // should be safe to call inside lock
       
   320                     // since the only implementation
       
   321                     // offers the buffer to an unbounded queue.
       
   322                     // WARNING: do not touch buf after this point!
       
   323                     if (recycler != null) recycler.accept(buf);
   288                 }
   324                 }
   289                 if (complete) {
   325                 if (complete) {
   290                     this.completing = complete;
   326                     this.completing = complete;
   291                 }
   327                 }
   292             }
   328             }
   293         }
   329         }
   294 
   330 
   295         void schedule() {
   331         void schedule() {
   296             scheduler.runOrSchedule();
   332             scheduler.runOrSchedule(exec);
   297         }
   333         }
   298 
   334 
   299         void stop() {
   335         void stop() {
   300             if (debugr.on()) debugr.log("stop");
   336             if (debugr.on()) debugr.log("stop");
   301             scheduler.stop();
   337             scheduler.stop();
   302         }
   338         }
   303 
   339 
   304         AtomicInteger count = new AtomicInteger(0);
   340         AtomicInteger count = new AtomicInteger(0);
   305 
   341 
       
   342         // minimum number of bytes required to call unwrap.
       
   343         // Usually this is 0, unless there was a buffer underflow.
       
   344         // In this case we need to wait for more bytes than what
       
   345         // we had before calling unwrap() again.
       
   346         volatile int minBytesRequired;
   306         // work function where it all happens
   347         // work function where it all happens
   307         void processData() {
   348         final void processData() {
   308             try {
   349             try {
   309                 if (debugr.on())
   350                 if (debugr.on())
   310                     debugr.log("processData:"
   351                     debugr.log("processData:"
   311                            + " readBuf remaining:" + readBuf.remaining()
   352                            + " readBuf remaining:" + readBuf.remaining()
   312                            + ", state:" + states(handshakeState)
   353                            + ", state:" + states(handshakeState)
   313                            + ", engine handshake status:" + engine.getHandshakeStatus());
   354                            + ", engine handshake status:" + engine.getHandshakeStatus());
   314                 int len;
   355                 int len;
   315                 boolean complete = false;
   356                 boolean complete = false;
   316                 while ((len = readBuf.remaining()) > 0) {
   357                 while (readBuf.remaining() > (len = minBytesRequired)) {
   317                     boolean handshaking = false;
   358                     boolean handshaking = false;
   318                     try {
   359                     try {
   319                         EngineResult result;
   360                         EngineResult result;
   320                         synchronized (readBufferLock) {
   361                         synchronized (readBufferLock) {
   321                             complete = this.completing;
   362                             complete = this.completing;
       
   363                             if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
       
   364                             // Unless there is a BUFFER_UNDERFLOW, we should try to
       
   365                             // unwrap any number of bytes. Set minBytesRequired to 0:
       
   366                             // we only need to do that if minBytesRequired is not already 0.
       
   367                             len = len > 0 ? minBytesRequired = 0 : len;
   322                             result = unwrapBuffer(readBuf);
   368                             result = unwrapBuffer(readBuf);
   323                             if (debugr.on())
   369                             len = readBuf.remaining();
   324                                 debugr.log("Unwrapped: %s", result.result);
   370                             if (debugr.on()) {
       
   371                                 debugr.log("Unwrapped: result: %s", result.result);
       
   372                                 debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
       
   373                             }
   325                         }
   374                         }
   326                         if (result.bytesProduced() > 0) {
   375                         if (result.bytesProduced() > 0) {
   327                             if (debugr.on())
   376                             if (debugr.on())
   328                                 debugr.log("sending %d", result.bytesProduced());
   377                                 debugr.log("sending %d", result.bytesProduced());
   329                             count.addAndGet(result.bytesProduced());
   378                             count.addAndGet(result.bytesProduced());
   330                             outgoing(result.destBuffer, false);
   379                             outgoing(result.destBuffer, false);
   331                         }
   380                         }
   332                         if (result.status() == Status.BUFFER_UNDERFLOW) {
   381                         if (result.status() == Status.BUFFER_UNDERFLOW) {
   333                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
   382                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
   334                             // not enough data in the read buffer...
   383                             // not enough data in the read buffer...
       
   384                             // no need to try to unwrap again unless we get more bytes
       
   385                             // than minBytesRequired = len in the read buffer.
       
   386                             minBytesRequired = len;
       
   387                             synchronized (readBufferLock) {
       
   388                                 // more bytes could already have been added...
       
   389                                 assert readBuf.remaining() >= len;
       
   390                                 // check if we have received some data, and if so
       
   391                                 // we can just re-spin the loop
       
   392                                 if (readBuf.remaining() > len) continue;
       
   393                             }
       
   394                             // request more data and return.
   335                             requestMore();
   395                             requestMore();
   336                             synchronized (readBufferLock) {
   396                             return;
   337                                 // check if we have received some data
       
   338                                 if (readBuf.remaining() > len) continue;
       
   339                                 return;
       
   340                             }
       
   341                         }
   397                         }
   342                         if (complete && result.status() == Status.CLOSED) {
   398                         if (complete && result.status() == Status.CLOSED) {
   343                             if (debugr.on()) debugr.log("Closed: completing");
   399                             if (debugr.on()) debugr.log("Closed: completing");
   344                             outgoing(Utils.EMPTY_BB_LIST, true);
   400                             outgoing(Utils.EMPTY_BB_LIST, true);
   345                             return;
   401                             return;
   350                                 resumeActivity();
   406                                 resumeActivity();
   351                             }
   407                             }
   352                             handshaking = true;
   408                             handshaking = true;
   353                         } else {
   409                         } else {
   354                             if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
   410                             if ((handshakeState.getAndSet(NOT_HANDSHAKING)& ~DOING_TASKS) == HANDSHAKING) {
       
   411                                 handshaking = false;
       
   412                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
       
   413                                 packetBufferSize = engine.getSession().getPacketBufferSize();
   355                                 setALPN();
   414                                 setALPN();
   356                                 handshaking = false;
       
   357                                 resumeActivity();
   415                                 resumeActivity();
   358                             }
   416                             }
   359                         }
   417                         }
   360                     } catch (IOException ex) {
   418                     } catch (IOException ex) {
   361                         errorCommon(ex);
   419                         errorCommon(ex);
   389                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   447                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   390                 switch (sslResult.getStatus()) {
   448                 switch (sslResult.getStatus()) {
   391                     case BUFFER_OVERFLOW:
   449                     case BUFFER_OVERFLOW:
   392                         // may happen only if app size buffer was changed.
   450                         // may happen only if app size buffer was changed.
   393                         // get it again if app buffer size changed
   451                         // get it again if app buffer size changed
   394                         int appSize = engine.getSession().getApplicationBufferSize();
   452                         int appSize = applicationBufferSize =
       
   453                                 engine.getSession().getApplicationBufferSize();
   395                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
   454                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
   396                         dst.flip();
   455                         dst.flip();
   397                         b.put(dst);
   456                         b.put(dst);
   398                         dst = b;
   457                         dst = b;
   399                         break;
   458                         break;
   487             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
   546             scheduler = new SequentialScheduler(new WriterDownstreamPusher());
   488         }
   547         }
   489 
   548 
   490         @Override
   549         @Override
   491         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
   550         protected void incoming(List<ByteBuffer> buffers, boolean complete) {
   492             assert complete ? buffers ==  Utils.EMPTY_BB_LIST : true;
   551             assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
   493             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
   552             assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
   494             if (complete) {
   553             if (complete) {
   495                 if (debugw.on()) debugw.log("adding SENTINEL");
   554                 if (debugw.on()) debugw.log("adding SENTINEL");
   496                 completing = true;
   555                 completing = true;
   497                 writeList.add(SENTINEL);
   556                 writeList.add(SENTINEL);
   547                         return true;
   606                         return true;
   548                 return false;
   607                 return false;
   549             }
   608             }
   550         }
   609         }
   551 
   610 
       
   611         void triggerWrite() {
       
   612             synchronized (writeList) {
       
   613                 if (writeList.isEmpty()) {
       
   614                     writeList.add(HS_TRIGGER);
       
   615                 }
       
   616             }
       
   617             scheduler.runOrSchedule();
       
   618         }
       
   619 
   552         private void processData() {
   620         private void processData() {
   553             boolean completing = isCompleting();
   621             boolean completing = isCompleting();
   554 
   622 
   555             try {
   623             try {
   556                 if (debugw.on())
   624                 if (debugw.on())
   584                         if (debugw.on()) debugw.log("handshaking");
   652                         if (debugw.on()) debugw.log("handshaking");
   585                         doHandshake(result, WRITER);  // ok to ignore return
   653                         doHandshake(result, WRITER);  // ok to ignore return
   586                         handshaking = true;
   654                         handshaking = true;
   587                     } else {
   655                     } else {
   588                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
   656                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
   657                             applicationBufferSize = engine.getSession().getApplicationBufferSize();
       
   658                             packetBufferSize = engine.getSession().getPacketBufferSize();
   589                             setALPN();
   659                             setALPN();
   590                             resumeActivity();
   660                             resumeActivity();
   591                         }
   661                         }
   592                     }
   662                     }
   593                     cleanList(writeList); // tidy up the source list
   663                     cleanList(writeList); // tidy up the source list
   628                 switch (sslResult.getStatus()) {
   698                 switch (sslResult.getStatus()) {
   629                     case BUFFER_OVERFLOW:
   699                     case BUFFER_OVERFLOW:
   630                         // Shouldn't happen. We allocated buffer with packet size
   700                         // Shouldn't happen. We allocated buffer with packet size
   631                         // get it again if net buffer size was changed
   701                         // get it again if net buffer size was changed
   632                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   702                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   633                         int appSize = engine.getSession().getApplicationBufferSize();
   703                         int netSize = packetBufferSize
   634                         ByteBuffer b = ByteBuffer.allocate(appSize + dst.position());
   704                                 = engine.getSession().getPacketBufferSize();
       
   705                         ByteBuffer b = ByteBuffer.allocate(netSize + dst.position());
   635                         dst.flip();
   706                         dst.flip();
   636                         b.put(dst);
   707                         b.put(dst);
   637                         dst = b;
   708                         dst = b;
   638                         break; // try again
   709                         break; // try again
   639                     case CLOSED:
   710                     case CLOSED:
   757         reader.schedule();
   828         reader.schedule();
   758         writer.schedule();
   829         writer.schedule();
   759     }
   830     }
   760 
   831 
   761     final AtomicInteger handshakeState;
   832     final AtomicInteger handshakeState;
   762     final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
   833     final ConcurrentLinkedQueue<String> stateList =
       
   834             debug.on() ? new ConcurrentLinkedQueue<>() : null;
   763 
   835 
   764     private boolean doHandshake(EngineResult r, int caller) {
   836     private boolean doHandshake(EngineResult r, int caller) {
   765         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
   837         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
   766         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
   838         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
   767         stateList.add(r.handshakeStatus().toString());
   839         if (stateList != null && debug.on()) {
   768         stateList.add(Integer.toString(caller));
   840             stateList.add(r.handshakeStatus().toString());
       
   841             stateList.add(Integer.toString(caller));
       
   842         }
   769         switch (r.handshakeStatus()) {
   843         switch (r.handshakeStatus()) {
   770             case NEED_TASK:
   844             case NEED_TASK:
   771                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
   845                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
   772                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
   846                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
   773                     return false;
   847                     return false;
   776                 List<Runnable> tasks = obtainTasks();
   850                 List<Runnable> tasks = obtainTasks();
   777                 executeTasks(tasks);
   851                 executeTasks(tasks);
   778                 return false;  // executeTasks will resume activity
   852                 return false;  // executeTasks will resume activity
   779             case NEED_WRAP:
   853             case NEED_WRAP:
   780                 if (caller == READER) {
   854                 if (caller == READER) {
   781                     writer.addData(HS_TRIGGER);
   855                     writer.triggerWrite();
   782                     return false;
   856                     return false;
   783                 }
   857                 }
   784                 break;
   858                 break;
   785             case NEED_UNWRAP:
   859             case NEED_UNWRAP:
   786             case NEED_UNWRAP_AGAIN:
   860             case NEED_UNWRAP_AGAIN:
   816                     } else {
   890                     } else {
   817                         break;
   891                         break;
   818                     }
   892                     }
   819                 } while (true);
   893                 } while (true);
   820                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
   894                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
   821                 //writer.addData(HS_TRIGGER);
       
   822                 resumeActivity();
   895                 resumeActivity();
   823             } catch (Throwable t) {
   896             } catch (Throwable t) {
   824                 handleError(t);
   897                 handleError(t);
   825             }
   898             }
   826         });
   899         });
   837             // an acknowledgement back. We're calling doHandshake
   910             // an acknowledgement back. We're calling doHandshake
   838             // to finish the close handshake.
   911             // to finish the close handshake.
   839             if (engine.isInboundDone() && !engine.isOutboundDone()) {
   912             if (engine.isInboundDone() && !engine.isOutboundDone()) {
   840                 if (debug.on()) debug.log("doClosure: close_notify received");
   913                 if (debug.on()) debug.log("doClosure: close_notify received");
   841                 close_notify_received = true;
   914                 close_notify_received = true;
   842                 doHandshake(r, READER);
   915                 if (!writer.scheduler.isStopped()) {
       
   916                     doHandshake(r, READER);
       
   917                 } else {
       
   918                     // We have received closed notify, but we
       
   919                     // won't be able to send the acknowledgement.
       
   920                     // Nothing more will come from the socket either,
       
   921                     // so mark the reader as completed.
       
   922                     synchronized (reader.readBufferLock) {
       
   923                         reader.completing = true;
       
   924                     }
       
   925                 }
   843             }
   926             }
   844         }
   927         }
   845         return r;
   928         return r;
   846     }
   929     }
   847 
   930 
   912         SSLEngineResult.Status status() {
   995         SSLEngineResult.Status status() {
   913             return result.getStatus();
   996             return result.getStatus();
   914         }
   997         }
   915     }
   998     }
   916 
   999 
   917     public ByteBuffer getNetBuffer() {
  1000     volatile int packetBufferSize;
   918         return ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
  1001     final ByteBuffer getNetBuffer() {
   919     }
  1002         int netSize = packetBufferSize;
   920 
  1003         if (netSize <= 0) {
   921     private ByteBuffer getAppBuffer() {
  1004             packetBufferSize = netSize = engine.getSession().getPacketBufferSize();
   922         return ByteBuffer.allocate(engine.getSession().getApplicationBufferSize());
  1005         }
       
  1006         return ByteBuffer.allocate(netSize);
       
  1007     }
       
  1008 
       
  1009     volatile int applicationBufferSize;
       
  1010     final ByteBuffer getAppBuffer() {
       
  1011         int appSize = applicationBufferSize;
       
  1012         if (appSize <= 0) {
       
  1013             applicationBufferSize = appSize = engine.getSession().getApplicationBufferSize();
       
  1014         }
       
  1015         return ByteBuffer.allocate(appSize);
   923     }
  1016     }
   924 
  1017 
   925     final String dbgString() {
  1018     final String dbgString() {
   926         return "SSLFlowDelegate(" + tubeName + ")";
  1019         return "SSLFlowDelegate(" + tubeName + ")";
   927     }
  1020     }