src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 53350 a47b8125b7cc
parent 52902 e3398b2e1ab0
child 58649 6b6bf0de534b
equal deleted inserted replaced
53349:d3aa93570779 53350:a47b8125b7cc
     1 /*
     1 /*
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    30 import javax.net.ssl.SSLEngine;
    30 import javax.net.ssl.SSLEngine;
    31 import javax.net.ssl.SSLEngineResult;
    31 import javax.net.ssl.SSLEngineResult;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    32 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
    33 import javax.net.ssl.SSLEngineResult.Status;
    33 import javax.net.ssl.SSLEngineResult.Status;
    34 import javax.net.ssl.SSLException;
    34 import javax.net.ssl.SSLException;
       
    35 import javax.net.ssl.SSLHandshakeException;
    35 import java.io.IOException;
    36 import java.io.IOException;
    36 import java.lang.ref.Reference;
    37 import java.lang.ref.Reference;
    37 import java.lang.ref.ReferenceQueue;
    38 import java.lang.ref.ReferenceQueue;
    38 import java.lang.ref.WeakReference;
    39 import java.lang.ref.WeakReference;
    39 import java.nio.ByteBuffer;
    40 import java.nio.ByteBuffer;
   107     final CompletableFuture<String> alpnCF; // completes on initial handshake
   108     final CompletableFuture<String> alpnCF; // completes on initial handshake
   108     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
   109     final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
   109     volatile boolean close_notify_received;
   110     volatile boolean close_notify_received;
   110     final CompletableFuture<Void> readerCF;
   111     final CompletableFuture<Void> readerCF;
   111     final CompletableFuture<Void> writerCF;
   112     final CompletableFuture<Void> writerCF;
       
   113     final CompletableFuture<Void> stopCF;
   112     final Consumer<ByteBuffer> recycler;
   114     final Consumer<ByteBuffer> recycler;
   113     static AtomicInteger scount = new AtomicInteger(1);
   115     static AtomicInteger scount = new AtomicInteger(1);
   114     final int id;
   116     final int id;
   115 
   117 
   116     /**
   118     /**
   147         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   149         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
   148         this.readerCF = reader.completion();
   150         this.readerCF = reader.completion();
   149         this.writerCF = reader.completion();
   151         this.writerCF = reader.completion();
   150         readerCF.exceptionally(this::stopOnError);
   152         readerCF.exceptionally(this::stopOnError);
   151         writerCF.exceptionally(this::stopOnError);
   153         writerCF.exceptionally(this::stopOnError);
   152 
   154         this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
   153         CompletableFuture.allOf(reader.completion(), writer.completion())
       
   154             .thenRun(this::normalStop);
   155             .thenRun(this::normalStop);
   155         this.alpnCF = new MinimalFuture<>();
   156         this.alpnCF = new MinimalFuture<>();
   156 
   157 
   157         // connect the Reader to the downReader and the
   158         // connect the Reader to the downReader and the
   158         // Writer to the downWriter.
   159         // Writer to the downWriter.
   300         @Override
   301         @Override
   301         public String toString() {
   302         public String toString() {
   302             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
   303             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
   303                     + ", count: " + count.toString() + ", scheduler: "
   304                     + ", count: " + count.toString() + ", scheduler: "
   304                     + (scheduler.isStopped() ? "stopped" : "running")
   305                     + (scheduler.isStopped() ? "stopped" : "running")
   305                     + ", status: " + lastUnwrapStatus;
   306                     + ", status: " + lastUnwrapStatus
       
   307                     + ", handshakeState: " + handshakeState.get()
       
   308                     + ", engine: " + engine.getHandshakeStatus();
   306         }
   309         }
   307 
   310 
   308         private void reallocReadBuf() {
   311         private void reallocReadBuf() {
   309             int sz = readBuf.capacity();
   312             int sz = readBuf.capacity();
   310             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
   313             ByteBuffer newb = ByteBuffer.allocate(sz * 2);
   427                             return;
   430                             return;
   428                         }
   431                         }
   429                         if (complete && result.status() == Status.CLOSED) {
   432                         if (complete && result.status() == Status.CLOSED) {
   430                             if (debugr.on()) debugr.log("Closed: completing");
   433                             if (debugr.on()) debugr.log("Closed: completing");
   431                             outgoing(Utils.EMPTY_BB_LIST, true);
   434                             outgoing(Utils.EMPTY_BB_LIST, true);
       
   435                             // complete ALPN if not yet completed
       
   436                             setALPN();
   432                             return;
   437                             return;
   433                         }
   438                         }
   434                         if (result.handshaking()) {
   439                         if (result.handshaking()) {
   435                             handshaking = true;
   440                             handshaking = true;
   436                             if (debugr.on()) debugr.log("handshaking");
   441                             if (debugr.on()) debugr.log("handshaking");
   437                             if (doHandshake(result, READER)) continue; // need unwrap
   442                             if (doHandshake(result, READER)) continue; // need unwrap
   438                             else break; // doHandshake will have triggered the write scheduler if necessary
   443                             else break; // doHandshake will have triggered the write scheduler if necessary
   439                         } else {
   444                         } else {
   440                             if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
   445                             if (trySetALPN()) {
   441                                 handshaking = false;
       
   442                                 applicationBufferSize = engine.getSession().getApplicationBufferSize();
       
   443                                 packetBufferSize = engine.getSession().getPacketBufferSize();
       
   444                                 setALPN();
       
   445                                 resumeActivity();
   446                                 resumeActivity();
   446                             }
   447                             }
   447                         }
   448                         }
   448                     } catch (IOException ex) {
   449                     } catch (IOException ex) {
   449                         errorCommon(ex);
   450                         errorCommon(ex);
   739 
   740 
   740                     if (result.status() == Status.CLOSED) {
   741                     if (result.status() == Status.CLOSED) {
   741                         if (!upstreamCompleted) {
   742                         if (!upstreamCompleted) {
   742                             upstreamCompleted = true;
   743                             upstreamCompleted = true;
   743                             upstreamSubscription.cancel();
   744                             upstreamSubscription.cancel();
       
   745                             // complete ALPN if not yet completed
       
   746                             setALPN();
   744                         }
   747                         }
   745                         if (result.bytesProduced() <= 0)
   748                         if (result.bytesProduced() <= 0)
   746                             return;
   749                             return;
   747 
   750 
   748                         if (!completing && !completed) {
   751                         if (!completing && !completed) {
   756                     if (result.handshaking()) {
   759                     if (result.handshaking()) {
   757                         if (debugw.on()) debugw.log("handshaking");
   760                         if (debugw.on()) debugw.log("handshaking");
   758                         doHandshake(result, WRITER);  // ok to ignore return
   761                         doHandshake(result, WRITER);  // ok to ignore return
   759                         handshaking = true;
   762                         handshaking = true;
   760                     } else {
   763                     } else {
   761                         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
   764                         if (trySetALPN()) {
   762                             applicationBufferSize = engine.getSession().getApplicationBufferSize();
       
   763                             packetBufferSize = engine.getSession().getPacketBufferSize();
       
   764                             setALPN();
       
   765                             resumeActivity();
   765                             resumeActivity();
   766                         }
   766                         }
   767                     }
   767                     }
   768                     cleanList(writeList); // tidy up the source list
   768                     cleanList(writeList); // tidy up the source list
   769                     sendResultBytes(result);
   769                     sendResultBytes(result);
   912         if (stopped)
   912         if (stopped)
   913             return;
   913             return;
   914         stopped = true;
   914         stopped = true;
   915         reader.stop();
   915         reader.stop();
   916         writer.stop();
   916         writer.stop();
       
   917         // make sure the alpnCF is completed.
       
   918         if (!alpnCF.isDone()) {
       
   919             Throwable alpn = new SSLHandshakeException(
       
   920                     "Connection closed before successful ALPN negotiation");
       
   921             alpnCF.completeExceptionally(alpn);
       
   922         }
   917         if (isMonitored) Monitor.remove(monitor);
   923         if (isMonitored) Monitor.remove(monitor);
   918     }
   924     }
   919 
   925 
   920     private Void stopOnError(Throwable currentlyUnused) {
   926     private Void stopOnError(Throwable error) {
   921         // maybe log, etc
   927         // maybe log, etc
       
   928         // ensure the ALPN is completed
       
   929         // We could also do this in SSLTube.SSLSubscriberWrapper
       
   930         // onError/onComplete - with the caveat that the ALP CF
       
   931         // would get completed externally. Doing it here keeps
       
   932         // it all inside SSLFlowDelegate.
       
   933         if (!alpnCF.isDone()) {
       
   934             alpnCF.completeExceptionally(error);
       
   935         }
   922         normalStop();
   936         normalStop();
   923         return null;
   937         return null;
   924     }
   938     }
   925 
   939 
   926     private void cleanList(List<ByteBuffer> l) {
   940     private void cleanList(List<ByteBuffer> l) {
  1068                         }
  1082                         }
  1069                         break;
  1083                         break;
  1070                     }
  1084                     }
  1071                 } while (true);
  1085                 } while (true);
  1072                 if (debug.on()) debug.log("finished task execution");
  1086                 if (debug.on()) debug.log("finished task execution");
       
  1087                 HandshakeStatus hs = engine.getHandshakeStatus();
       
  1088                 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
       
  1089                     // We're no longer handshaking, try setting ALPN
       
  1090                     trySetALPN();
       
  1091                 }
  1073                 resumeActivity();
  1092                 resumeActivity();
  1074             } catch (Throwable t) {
  1093             } catch (Throwable t) {
  1075                 handleError(t);
  1094                 handleError(t);
  1076             }
  1095             }
  1077         });
  1096         });
       
  1097     }
       
  1098 
       
  1099     boolean trySetALPN() {
       
  1100         // complete ALPN CF if needed.
       
  1101         if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
       
  1102             applicationBufferSize = engine.getSession().getApplicationBufferSize();
       
  1103             packetBufferSize = engine.getSession().getPacketBufferSize();
       
  1104             setALPN();
       
  1105             return true;
       
  1106         }
       
  1107         return false;
  1078     }
  1108     }
  1079 
  1109 
  1080     // FIXME: acknowledge a received CLOSE request from peer
  1110     // FIXME: acknowledge a received CLOSE request from peer
  1081     EngineResult doClosure(EngineResult r) throws IOException {
  1111     EngineResult doClosure(EngineResult r) throws IOException {
  1082         if (debug.on())
  1112         if (debug.on())