src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 50985 cd41f34e548c
parent 50681 4254bed3c09d
child 52902 e3398b2e1ab0
child 56833 be0819373531
equal deleted inserted replaced
50984:f1f4b8cd0192 50985:cd41f34e548c
    31 import javax.net.ssl.SSLEngineResult;
    31 import javax.net.ssl.SSLEngineResult;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    33 import javax.net.ssl.SSLEngineResult.Status;
    33 import javax.net.ssl.SSLEngineResult.Status;
    34 import javax.net.ssl.SSLException;
    34 import javax.net.ssl.SSLException;
    35 import java.io.IOException;
    35 import java.io.IOException;
       
    36 import java.lang.ref.Reference;
       
    37 import java.lang.ref.ReferenceQueue;
       
    38 import java.lang.ref.WeakReference;
    36 import java.nio.ByteBuffer;
    39 import java.nio.ByteBuffer;
    37 import java.util.ArrayList;
    40 import java.util.ArrayList;
    38 import java.util.Collections;
    41 import java.util.Collections;
    39 import java.util.Iterator;
    42 import java.util.Iterator;
    40 import java.util.LinkedList;
    43 import java.util.LinkedList;
    91     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    94     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    92     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
    95     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
    93     // When handshake is in progress trying to wrap may produce no bytes.
    96     // When handshake is in progress trying to wrap may produce no bytes.
    94     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
    97     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
    95     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
    98     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
       
    99     private static final boolean isMonitored =
       
   100             monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
    96 
   101 
    97     final Executor exec;
   102     final Executor exec;
    98     final Reader reader;
   103     final Reader reader;
    99     final Writer writer;
   104     final Writer writer;
   100     final SSLEngine engine;
   105     final SSLEngine engine;
   101     final String tubeName; // hack
   106     final String tubeName; // hack
   102     final CompletableFuture<String> alpnCF; // completes on initial handshake
   107     final CompletableFuture<String> alpnCF; // completes on initial handshake
       
   108     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
   103     volatile boolean close_notify_received;
   109     volatile boolean close_notify_received;
   104     final CompletableFuture<Void> readerCF;
   110     final CompletableFuture<Void> readerCF;
   105     final CompletableFuture<Void> writerCF;
   111     final CompletableFuture<Void> writerCF;
   106     final Consumer<ByteBuffer> recycler;
   112     final Consumer<ByteBuffer> recycler;
   107     static AtomicInteger scount = new AtomicInteger(1);
   113     static AtomicInteger scount = new AtomicInteger(1);
   150 
   156 
   151         // connect the Reader to the downReader and the
   157         // connect the Reader to the downReader and the
   152         // Writer to the downWriter.
   158         // Writer to the downWriter.
   153         connect(downReader, downWriter);
   159         connect(downReader, downWriter);
   154 
   160 
   155         if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
   161         if (isMonitored) Monitor.add(monitor);
   156             Monitor.add(this::monitor);
       
   157     }
   162     }
   158 
   163 
   159     /**
   164     /**
   160      * Returns true if the SSLFlowDelegate has detected a TLS
   165      * Returns true if the SSLFlowDelegate has detected a TLS
   161      * close_notify from the server.
   166      * close_notify from the server.
   200     }
   205     }
   201 
   206 
   202     public String monitor() {
   207     public String monitor() {
   203         StringBuilder sb = new StringBuilder();
   208         StringBuilder sb = new StringBuilder();
   204         sb.append("SSL: id ").append(id);
   209         sb.append("SSL: id ").append(id);
       
   210         sb.append(" ").append(dbgString());
   205         sb.append(" HS state: " + states(handshakeState));
   211         sb.append(" HS state: " + states(handshakeState));
   206         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   212         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   207         if (stateList != null) {
   213         if (stateList != null) {
   208             sb.append(" LL : ");
   214             sb.append(" LL : ");
   209             for (String s : stateList) {
   215             for (String s : stateList) {
   291             scheduler.runOrSchedule(exec);
   297             scheduler.runOrSchedule(exec);
   292         }
   298         }
   293 
   299 
   294         @Override
   300         @Override
   295         public String toString() {
   301         public String toString() {
   296             return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
   302             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
   297                     + " count: " + count.toString();
   303                     + ", count: " + count.toString() + ", scheduler: "
       
   304                     + (scheduler.isStopped() ? "stopped" : "running")
       
   305                     + ", status: " + lastUnwrapStatus;
   298         }
   306         }
   299 
   307 
   300         private void reallocReadBuf() {
   308         private void reallocReadBuf() {
   301             int sz = readBuf.capacity();
   309             int sz = readBuf.capacity();
   302             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
   310             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
   333                     // WARNING: do not touch buf after this point!
   341                     // WARNING: do not touch buf after this point!
   334                     if (recycler != null) recycler.accept(buf);
   342                     if (recycler != null) recycler.accept(buf);
   335                 }
   343                 }
   336                 if (complete) {
   344                 if (complete) {
   337                     this.completing = complete;
   345                     this.completing = complete;
       
   346                     minBytesRequired = 0;
   338                 }
   347                 }
   339             }
   348             }
   340         }
   349         }
   341 
   350 
   342         void schedule() {
   351         void schedule() {
   393                         if (result.status() == Status.BUFFER_UNDERFLOW) {
   402                         if (result.status() == Status.BUFFER_UNDERFLOW) {
   394                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
   403                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
   395                             // not enough data in the read buffer...
   404                             // not enough data in the read buffer...
   396                             // no need to try to unwrap again unless we get more bytes
   405                             // no need to try to unwrap again unless we get more bytes
   397                             // than minBytesRequired = len in the read buffer.
   406                             // than minBytesRequired = len in the read buffer.
   398                             minBytesRequired = len;
       
   399                             synchronized (readBufferLock) {
   407                             synchronized (readBufferLock) {
       
   408                                 minBytesRequired = len;
   400                                 // more bytes could already have been added...
   409                                 // more bytes could already have been added...
   401                                 assert readBuf.remaining() >= len;
   410                                 assert readBuf.remaining() >= len;
   402                                 // check if we have received some data, and if so
   411                                 // check if we have received some data, and if so
   403                                 // we can just re-spin the loop
   412                                 // we can just re-spin the loop
   404                                 if (readBuf.remaining() > len) continue;
   413                                 if (readBuf.remaining() > len) continue;
       
   414                                 else if (this.completing) {
       
   415                                     if (debug.on()) {
       
   416                                         debugr.log("BUFFER_UNDERFLOW with EOF," +
       
   417                                                 " %d bytes non decrypted.", len);
       
   418                                     }
       
   419                                     // The channel won't send us any more data, and
       
   420                                     // we are in underflow: we need to fail.
       
   421                                     throw new IOException("BUFFER_UNDERFLOW with EOF, "
       
   422                                             + len + " bytes non decrypted.");
       
   423                                 }
   405                             }
   424                             }
   406                             // request more data and return.
   425                             // request more data and return.
   407                             requestMore();
   426                             requestMore();
   408                             return;
   427                             return;
   409                         }
   428                         }
   427                             }
   446                             }
   428                         }
   447                         }
   429                     } catch (IOException ex) {
   448                     } catch (IOException ex) {
   430                         errorCommon(ex);
   449                         errorCommon(ex);
   431                         handleError(ex);
   450                         handleError(ex);
       
   451                         return;
   432                     }
   452                     }
   433                     if (handshaking && !complete)
   453                     if (handshaking && !complete)
   434                         return;
   454                         return;
   435                 }
   455                 }
   436                 if (!complete) {
   456                 if (!complete) {
   450                 errorCommon(ex);
   470                 errorCommon(ex);
   451                 handleError(ex);
   471                 handleError(ex);
   452             }
   472             }
   453         }
   473         }
   454 
   474 
       
   475         private volatile Status lastUnwrapStatus;
   455         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
   476         EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
   456             ByteBuffer dst = getAppBuffer();
   477             ByteBuffer dst = getAppBuffer();
   457             int len = src.remaining();
   478             int len = src.remaining();
   458             while (true) {
   479             while (true) {
   459                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   480                 SSLEngineResult sslResult = engine.unwrap(src, dst);
   460                 switch (sslResult.getStatus()) {
   481                 switch (lastUnwrapStatus = sslResult.getStatus()) {
   461                     case BUFFER_OVERFLOW:
   482                     case BUFFER_OVERFLOW:
   462                         // may happen if app size buffer was changed, or if
   483                         // may happen if app size buffer was changed, or if
   463                         // our 'adaptiveBufferSize' guess was too small for
   484                         // our 'adaptiveBufferSize' guess was too small for
   464                         // the current payload. In that case, update the
   485                         // the current payload. In that case, update the
   465                         // value of applicationBufferSize, and allocate a
   486                         // value of applicationBufferSize, and allocate a
   505     public interface Monitorable {
   526     public interface Monitorable {
   506         public String getInfo();
   527         public String getInfo();
   507     }
   528     }
   508 
   529 
   509     public static class Monitor extends Thread {
   530     public static class Monitor extends Thread {
   510         final List<Monitorable> list;
   531         final List<WeakReference<Monitorable>> list;
       
   532         final List<FinalMonitorable> finalList;
       
   533         final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
   511         static Monitor themon;
   534         static Monitor themon;
   512 
   535 
   513         static {
   536         static {
   514             themon = new Monitor();
   537             themon = new Monitor();
   515             themon.start(); // uncomment to enable Monitor
   538             themon.start(); // uncomment to enable Monitor
       
   539         }
       
   540 
       
   541         // An instance used to temporarily store the
       
   542         // last observable state of a monitorable object.
       
   543         // When Monitor.remove(o) is called, we replace
       
   544         // 'o' with a FinalMonitorable whose reference
       
   545         // will be enqueued after the last observable state
       
   546         // has been printed.
       
   547         final class FinalMonitorable implements Monitorable {
       
   548             final String finalState;
       
   549             FinalMonitorable(Monitorable o) {
       
   550                 finalState = o.getInfo();
       
   551                 finalList.add(this);
       
   552             }
       
   553             @Override
       
   554             public String getInfo() {
       
   555                 finalList.remove(this);
       
   556                 return finalState;
       
   557             }
   516         }
   558         }
   517 
   559 
   518         Monitor() {
   560         Monitor() {
   519             super("Monitor");
   561             super("Monitor");
   520             setDaemon(true);
   562             setDaemon(true);
   521             list = Collections.synchronizedList(new LinkedList<>());
   563             list = Collections.synchronizedList(new LinkedList<>());
       
   564             finalList = new ArrayList<>(); // access is synchronized on list above
   522         }
   565         }
   523 
   566 
   524         void addTarget(Monitorable o) {
   567         void addTarget(Monitorable o) {
   525             list.add(o);
   568             list.add(new WeakReference<>(o, queue));
       
   569         }
       
   570         void removeTarget(Monitorable o) {
       
   571             // It can take a long time for GC to clean up references.
       
   572             // Calling Monitor.remove() early helps removing noise from the
       
   573             // logs/
       
   574             synchronized (list) {
       
   575                 Iterator<WeakReference<Monitorable>> it = list.iterator();
       
   576                 while (it.hasNext()) {
       
   577                     Monitorable m = it.next().get();
       
   578                     if (m == null) it.remove();
       
   579                     if (o == m) {
       
   580                         it.remove();
       
   581                         break;
       
   582                     }
       
   583                 }
       
   584                 FinalMonitorable m = new FinalMonitorable(o);
       
   585                 addTarget(m);
       
   586                 Reference.reachabilityFence(m);
       
   587             }
   526         }
   588         }
   527 
   589 
   528         public static void add(Monitorable o) {
   590         public static void add(Monitorable o) {
   529             themon.addTarget(o);
   591             themon.addTarget(o);
       
   592         }
       
   593         public static void remove(Monitorable o) {
       
   594             themon.removeTarget(o);
   530         }
   595         }
   531 
   596 
   532         @Override
   597         @Override
   533         public void run() {
   598         public void run() {
   534             System.out.println("Monitor starting");
   599             System.out.println("Monitor starting");
   535             try {
   600             try {
   536                 while (true) {
   601                 while (true) {
   537                     Thread.sleep(20 * 1000);
   602                     Thread.sleep(20 * 1000);
   538                     synchronized (list) {
   603                     synchronized (list) {
   539                         for (Monitorable o : list) {
   604                         Reference<? extends Monitorable> expired;
       
   605                         while ((expired = queue.poll()) != null) list.remove(expired);
       
   606                         for (WeakReference<Monitorable> ref : list) {
       
   607                             Monitorable o = ref.get();
       
   608                             if (o == null) continue;
       
   609                             if (o instanceof FinalMonitorable) {
       
   610                                 ref.enqueue();
       
   611                             }
   540                             System.out.println(o.getInfo());
   612                             System.out.println(o.getInfo());
   541                             System.out.println("-------------------------");
   613                             System.out.println("-------------------------");
   542                         }
   614                         }
   543                     }
   615                     }
   544                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
   616                     System.out.println("--o-o-o-o-o-o-o-o-o-o-o-o-o-o-");
   731         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
   803         // If the SSLEngine produces less than writeBuffer.capacity() / 2,
   732         // then we copy off the bytes to a smaller buffer that we send
   804         // then we copy off the bytes to a smaller buffer that we send
   733         // downstream. Otherwise, we send the writeBuffer downstream
   805         // downstream. Otherwise, we send the writeBuffer downstream
   734         // and will allocate a new one next time.
   806         // and will allocate a new one next time.
   735         volatile ByteBuffer writeBuffer;
   807         volatile ByteBuffer writeBuffer;
       
   808         private volatile Status lastWrappedStatus;
   736         @SuppressWarnings("fallthrough")
   809         @SuppressWarnings("fallthrough")
   737         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
   810         EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
   738             long len = Utils.remaining(src);
   811             long len = Utils.remaining(src);
   739             if (debugw.on())
   812             if (debugw.on())
   740                 debugw.log("wrapping " + len + " bytes");
   813                 debugw.log("wrapping " + len + " bytes");
   745             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
   818             assert dst.hasRemaining() : "buffer has no remaining space: capacity=" + dst.capacity();
   746 
   819 
   747             while (true) {
   820             while (true) {
   748                 SSLEngineResult sslResult = engine.wrap(src, dst);
   821                 SSLEngineResult sslResult = engine.wrap(src, dst);
   749                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
   822                 if (debugw.on()) debugw.log("SSLResult: " + sslResult);
   750                 switch (sslResult.getStatus()) {
   823                 switch (lastWrappedStatus = sslResult.getStatus()) {
   751                     case BUFFER_OVERFLOW:
   824                     case BUFFER_OVERFLOW:
   752                         // Shouldn't happen. We allocated buffer with packet size
   825                         // Shouldn't happen. We allocated buffer with packet size
   753                         // get it again if net buffer size was changed
   826                         // get it again if net buffer size was changed
   754                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   827                         if (debugw.on()) debugw.log("BUFFER_OVERFLOW");
   755                         int netSize = packetBufferSize
   828                         int netSize = packetBufferSize
   813             }
   886             }
   814         }
   887         }
   815 
   888 
   816         @Override
   889         @Override
   817         public String toString() {
   890         public String toString() {
   818             return "WRITER: " + super.toString() +
   891             return "WRITER: " + super.toString()
   819                     " writeList size " + Integer.toString(writeList.size());
   892                     + ", writeList size: " + Integer.toString(writeList.size())
       
   893                     + ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
       
   894                     + ", status: " + lastWrappedStatus;
   820                     //" writeList: " + writeList.toString();
   895                     //" writeList: " + writeList.toString();
   821         }
   896         }
   822     }
   897     }
   823 
   898 
   824     private void handleError(Throwable t) {
   899     private void handleError(Throwable t) {
   837         if (stopped)
   912         if (stopped)
   838             return;
   913             return;
   839         stopped = true;
   914         stopped = true;
   840         reader.stop();
   915         reader.stop();
   841         writer.stop();
   916         writer.stop();
       
   917         if (isMonitored) Monitor.remove(monitor);
   842     }
   918     }
   843 
   919 
   844     private Void stopOnError(Throwable currentlyUnused) {
   920     private Void stopOnError(Throwable currentlyUnused) {
   845         // maybe log, etc
   921         // maybe log, etc
   846         normalStop();
   922         normalStop();
   951                 break;
  1027                 break;
   952             case NEED_UNWRAP:
  1028             case NEED_UNWRAP:
   953             case NEED_UNWRAP_AGAIN:
  1029             case NEED_UNWRAP_AGAIN:
   954                 // do nothing else
  1030                 // do nothing else
   955                 // receiving-side data will trigger unwrap
  1031                 // receiving-side data will trigger unwrap
       
  1032                 if (caller == WRITER) {
       
  1033                     reader.schedule();
       
  1034                     return false;
       
  1035                 }
   956                 break;
  1036                 break;
   957             default:
  1037             default:
   958                 throw new InternalError("Unexpected handshake status:"
  1038                 throw new InternalError("Unexpected handshake status:"
   959                                         + r.handshakeStatus());
  1039                                         + r.handshakeStatus());
   960         }
  1040         }