src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
branchhttp-client-branch
changeset 56184 1c7b4d7140e2
parent 56101 983e338eeb50
child 56187 61ba287288d9
equal deleted inserted replaced
56183:ec8123e97fad 56184:1c7b4d7140e2
    74  *                         | SSLFlowDelegate  |
    74  *                         | SSLFlowDelegate  |
    75  *        downReader       |                  | upstreamReader
    75  *        downReader       |                  | upstreamReader
    76  *        <--------------- |                  | <--------------
    76  *        <--------------- |                  | <--------------
    77  * supplied to constructor |                  | obtained from this
    77  * supplied to constructor |                  | obtained from this
    78  *                         +------------------+
    78  *                         +------------------+
       
    79  *
       
    80  * Errors are reported to the downReader Flow.Subscriber
       
    81  *
    79  * }
    82  * }
    80  * </pre>
    83  * </pre>
    81  */
    84  */
    82 public class SSLFlowDelegate {
    85 public class SSLFlowDelegate {
    83 
    86 
    88     final Executor exec;
    91     final Executor exec;
    89     final Reader reader;
    92     final Reader reader;
    90     final Writer writer;
    93     final Writer writer;
    91     final SSLEngine engine;
    94     final SSLEngine engine;
    92     final String tubeName; // hack
    95     final String tubeName; // hack
    93     private final CompletableFuture<Void> cf;
       
    94     final CompletableFuture<String> alpnCF; // completes on initial handshake
    96     final CompletableFuture<String> alpnCF; // completes on initial handshake
    95     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    97     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    96     volatile boolean close_notify_received;
    98     volatile boolean close_notify_received;
       
    99     volatile Flow.Subscriber<?> downReader;
       
   100     static AtomicInteger scount = new AtomicInteger(1);
       
   101     final int id;
    97 
   102 
    98     /**
   103     /**
    99      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   104      * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
   100      * Flow.Subscriber requires an associated {@link CompletableFuture}
   105      * Flow.Subscriber requires an associated {@link CompletableFuture}
   101      * for errors that need to be signaled from downstream to upstream.
   106      * for errors that need to be signaled from downstream to upstream.
   103     public SSLFlowDelegate(SSLEngine engine,
   108     public SSLFlowDelegate(SSLEngine engine,
   104                            Executor exec,
   109                            Executor exec,
   105                            Subscriber<? super List<ByteBuffer>> downReader,
   110                            Subscriber<? super List<ByteBuffer>> downReader,
   106                            Subscriber<? super List<ByteBuffer>> downWriter)
   111                            Subscriber<? super List<ByteBuffer>> downWriter)
   107     {
   112     {
       
   113         this.id = scount.getAndIncrement();
   108         this.tubeName = String.valueOf(downWriter);
   114         this.tubeName = String.valueOf(downWriter);
   109         this.reader = new Reader();
   115         this.reader = new Reader();
   110         this.writer = new Writer();
   116         this.writer = new Writer();
   111         this.engine = engine;
   117         this.engine = engine;
   112         this.exec = exec;
   118         this.exec = exec;
   113         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   119         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   114         CompletableFuture<Void> cs = CompletableFuture.allOf(
   120         CompletableFuture.anyOf(reader.completion(), writer.completion())
   115                 reader.completion(), writer.completion()).thenRun(this::normalStop);
   121             .exceptionally(this::stopOnError);
   116         this.cf = MinimalFuture.of(cs);
   122 
       
   123         CompletableFuture.allOf(reader.completion(), writer.completion())
       
   124             .thenRun(this::normalStop);
   117         this.alpnCF = new MinimalFuture<>();
   125         this.alpnCF = new MinimalFuture<>();
       
   126         this.downReader = downReader;
   118 
   127 
   119         // connect the Reader to the downReader and the
   128         // connect the Reader to the downReader and the
   120         // Writer to the downWriter.
   129         // Writer to the downWriter.
   121         connect(downReader, downWriter);
   130         connect(downReader, downWriter);
   122 
   131 
   123         //Monitor.add(this::monitor);
   132         Monitor.add(this::monitor);
   124     }
   133     }
   125 
   134 
   126     /**
   135     /**
   127      * Returns true if the SSLFlowDelegate has detected a TLS
   136      * Returns true if the SSLFlowDelegate has detected a TLS
   128      * close_notify from the server.
   137      * close_notify from the server.
   142      * @param downWriter  The right hand side write sink (typically
   151      * @param downWriter  The right hand side write sink (typically
   143      *                    the SocketTube write subscriber).
   152      *                    the SocketTube write subscriber).
   144      */
   153      */
   145     void connect(Subscriber<? super List<ByteBuffer>> downReader,
   154     void connect(Subscriber<? super List<ByteBuffer>> downReader,
   146                  Subscriber<? super List<ByteBuffer>> downWriter) {
   155                  Subscriber<? super List<ByteBuffer>> downWriter) {
       
   156         this.downReader = downReader;
   147         this.reader.subscribe(downReader);
   157         this.reader.subscribe(downReader);
   148         this.writer.subscribe(downWriter);
   158         this.writer.subscribe(downWriter);
   149     }
   159     }
   150 
   160 
   151    /**
   161    /**
   166         alpnCF.complete(alpn);
   176         alpnCF.complete(alpn);
   167     }
   177     }
   168 
   178 
   169     public String monitor() {
   179     public String monitor() {
   170         StringBuilder sb = new StringBuilder();
   180         StringBuilder sb = new StringBuilder();
   171         sb.append("SSL: HS state: " + states(handshakeState));
   181         sb.append("SSL: id ").append(id);
       
   182         sb.append(" HS state: " + states(handshakeState));
   172         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   183         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
   173         sb.append(" LL : ");
   184         sb.append(" LL : ");
   174         for (String s: stateList) {
   185         for (String s: stateList) {
   175             sb.append(s).append(" ");
   186             sb.append(s).append(" ");
   176         }
   187         }
   367                 handleError(ex);
   378                 handleError(ex);
   368             }
   379             }
   369         }
   380         }
   370     }
   381     }
   371 
   382 
   372     /**
       
   373      * Returns a CompletableFuture which completes after all activity
       
   374      * in the delegate is terminated (whether normally or exceptionally).
       
   375      *
       
   376      * @return
       
   377      */
       
   378     public CompletableFuture<Void> completion() {
       
   379         return cf;
       
   380     }
       
   381 
       
   382     public interface Monitorable {
   383     public interface Monitorable {
   383         public String getInfo();
   384         public String getInfo();
   384     }
   385     }
   385 
   386 
   386     public static class Monitor extends Thread {
   387     public static class Monitor extends Thread {
   604         }
   605         }
   605     }
   606     }
   606 
   607 
   607     private void handleError(Throwable t) {
   608     private void handleError(Throwable t) {
   608         debug.log(Level.DEBUG, "handleError", t);
   609         debug.log(Level.DEBUG, "handleError", t);
   609         cf.completeExceptionally(t);
   610         downReader.onError(t);
   610         // no-op if already completed
   611         // no-op if already completed
   611         alpnCF.completeExceptionally(t);
   612         alpnCF.completeExceptionally(t);
   612         reader.stop();
   613         reader.stop();
   613         writer.stop();
   614         writer.stop();
   614     }
   615     }
   615 
   616 
   616     private void normalStop() {
   617     private void normalStop() {
       
   618         stopOnError(null);
       
   619     }
       
   620 
       
   621     private Void stopOnError(Throwable t) {
   617         reader.stop();
   622         reader.stop();
   618         writer.stop();
   623         writer.stop();
       
   624         return null;
   619     }
   625     }
   620 
   626 
   621     private void cleanList(List<ByteBuffer> l) {
   627     private void cleanList(List<ByteBuffer> l) {
   622         synchronized (l) {
   628         synchronized (l) {
   623             Iterator<ByteBuffer> iter = l.iterator();
   629             Iterator<ByteBuffer> iter = l.iterator();