src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
changeset 53350 a47b8125b7cc
parent 52902 e3398b2e1ab0
child 58649 6b6bf0de534b
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jan 16 10:12:58 2019 -0800
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jan 16 19:09:16 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -32,6 +32,7 @@
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
@@ -109,6 +110,7 @@
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
+    final CompletableFuture<Void> stopCF;
     final Consumer<ByteBuffer> recycler;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
@@ -149,8 +151,7 @@
         this.writerCF = reader.completion();
         readerCF.exceptionally(this::stopOnError);
         writerCF.exceptionally(this::stopOnError);
-
-        CompletableFuture.allOf(reader.completion(), writer.completion())
+        this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
             .thenRun(this::normalStop);
         this.alpnCF = new MinimalFuture<>();
 
@@ -302,7 +303,9 @@
             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
                     + ", count: " + count.toString() + ", scheduler: "
                     + (scheduler.isStopped() ? "stopped" : "running")
-                    + ", status: " + lastUnwrapStatus;
+                    + ", status: " + lastUnwrapStatus
+                    + ", handshakeState: " + handshakeState.get()
+                    + ", engine: " + engine.getHandshakeStatus();
         }
 
         private void reallocReadBuf() {
@@ -429,6 +432,8 @@
                         if (complete && result.status() == Status.CLOSED) {
                             if (debugr.on()) debugr.log("Closed: completing");
                             outgoing(Utils.EMPTY_BB_LIST, true);
+                            // complete ALPN if not yet completed
+                            setALPN();
                             return;
                         }
                         if (result.handshaking()) {
@@ -437,11 +442,7 @@
                             if (doHandshake(result, READER)) continue; // need unwrap
                             else break; // doHandshake will have triggered the write scheduler if necessary
                         } else {
-                            if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
-                                handshaking = false;
-                                applicationBufferSize = engine.getSession().getApplicationBufferSize();
-                                packetBufferSize = engine.getSession().getPacketBufferSize();
-                                setALPN();
+                            if (trySetALPN()) {
                                 resumeActivity();
                             }
                         }
@@ -741,6 +742,8 @@
                         if (!upstreamCompleted) {
                             upstreamCompleted = true;
                             upstreamSubscription.cancel();
+                            // complete ALPN if not yet completed
+                            setALPN();
                         }
                         if (result.bytesProduced() <= 0)
                             return;
@@ -758,10 +761,7 @@
                         doHandshake(result, WRITER);  // ok to ignore return
                         handshaking = true;
                     } else {
-                        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
-                            applicationBufferSize = engine.getSession().getApplicationBufferSize();
-                            packetBufferSize = engine.getSession().getPacketBufferSize();
-                            setALPN();
+                        if (trySetALPN()) {
                             resumeActivity();
                         }
                     }
@@ -914,11 +914,25 @@
         stopped = true;
         reader.stop();
         writer.stop();
+        // make sure the alpnCF is completed.
+        if (!alpnCF.isDone()) {
+            Throwable alpn = new SSLHandshakeException(
+                    "Connection closed before successful ALPN negotiation");
+            alpnCF.completeExceptionally(alpn);
+        }
         if (isMonitored) Monitor.remove(monitor);
     }
 
-    private Void stopOnError(Throwable currentlyUnused) {
+    private Void stopOnError(Throwable error) {
         // maybe log, etc
+        // ensure the ALPN is completed
+        // We could also do this in SSLTube.SSLSubscriberWrapper
+        // onError/onComplete - with the caveat that the ALP CF
+        // would get completed externally. Doing it here keeps
+        // it all inside SSLFlowDelegate.
+        if (!alpnCF.isDone()) {
+            alpnCF.completeExceptionally(error);
+        }
         normalStop();
         return null;
     }
@@ -1070,6 +1084,11 @@
                     }
                 } while (true);
                 if (debug.on()) debug.log("finished task execution");
+                HandshakeStatus hs = engine.getHandshakeStatus();
+                if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
+                    // We're no longer handshaking, try setting ALPN
+                    trySetALPN();
+                }
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);
@@ -1077,6 +1096,17 @@
         });
     }
 
+    boolean trySetALPN() {
+        // complete ALPN CF if needed.
+        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+            applicationBufferSize = engine.getSession().getApplicationBufferSize();
+            packetBufferSize = engine.getSession().getPacketBufferSize();
+            setALPN();
+            return true;
+        }
+        return false;
+    }
+
     // FIXME: acknowledge a received CLOSE request from peer
     EngineResult doClosure(EngineResult r) throws IOException {
         if (debug.on())