8217094: HttpClient SSL race if a socket IOException is raised before ALPN is available jdk-13+4
authordfuchs
Wed, 16 Jan 2019 19:09:16 +0000
changeset 53350 a47b8125b7cc
parent 53349 d3aa93570779
child 53360 58e25974ede4
8217094: HttpClient SSL race if a socket IOException is raised before ALPN is available Summary: The patch makes suer that the SSLFlowDelegate's ALPN CF is always completed Reviewed-by: chegar
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
test/jdk/java/net/httpclient/ALPNFailureTest.java
test/jdk/java/net/httpclient/ALPNProxyFailureTest.java
test/jdk/java/net/httpclient/DigestEchoServer.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jan 16 10:12:58 2019 -0800
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Jan 16 19:09:16 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -32,6 +32,7 @@
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
@@ -109,6 +110,7 @@
     volatile boolean close_notify_received;
     final CompletableFuture<Void> readerCF;
     final CompletableFuture<Void> writerCF;
+    final CompletableFuture<Void> stopCF;
     final Consumer<ByteBuffer> recycler;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
@@ -149,8 +151,7 @@
         this.writerCF = reader.completion();
         readerCF.exceptionally(this::stopOnError);
         writerCF.exceptionally(this::stopOnError);
-
-        CompletableFuture.allOf(reader.completion(), writer.completion())
+        this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
             .thenRun(this::normalStop);
         this.alpnCF = new MinimalFuture<>();
 
@@ -302,7 +303,9 @@
             return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
                     + ", count: " + count.toString() + ", scheduler: "
                     + (scheduler.isStopped() ? "stopped" : "running")
-                    + ", status: " + lastUnwrapStatus;
+                    + ", status: " + lastUnwrapStatus
+                    + ", handshakeState: " + handshakeState.get()
+                    + ", engine: " + engine.getHandshakeStatus();
         }
 
         private void reallocReadBuf() {
@@ -429,6 +432,8 @@
                         if (complete && result.status() == Status.CLOSED) {
                             if (debugr.on()) debugr.log("Closed: completing");
                             outgoing(Utils.EMPTY_BB_LIST, true);
+                            // complete ALPN if not yet completed
+                            setALPN();
                             return;
                         }
                         if (result.handshaking()) {
@@ -437,11 +442,7 @@
                             if (doHandshake(result, READER)) continue; // need unwrap
                             else break; // doHandshake will have triggered the write scheduler if necessary
                         } else {
-                            if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
-                                handshaking = false;
-                                applicationBufferSize = engine.getSession().getApplicationBufferSize();
-                                packetBufferSize = engine.getSession().getPacketBufferSize();
-                                setALPN();
+                            if (trySetALPN()) {
                                 resumeActivity();
                             }
                         }
@@ -741,6 +742,8 @@
                         if (!upstreamCompleted) {
                             upstreamCompleted = true;
                             upstreamSubscription.cancel();
+                            // complete ALPN if not yet completed
+                            setALPN();
                         }
                         if (result.bytesProduced() <= 0)
                             return;
@@ -758,10 +761,7 @@
                         doHandshake(result, WRITER);  // ok to ignore return
                         handshaking = true;
                     } else {
-                        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
-                            applicationBufferSize = engine.getSession().getApplicationBufferSize();
-                            packetBufferSize = engine.getSession().getPacketBufferSize();
-                            setALPN();
+                        if (trySetALPN()) {
                             resumeActivity();
                         }
                     }
@@ -914,11 +914,25 @@
         stopped = true;
         reader.stop();
         writer.stop();
+        // make sure the alpnCF is completed.
+        if (!alpnCF.isDone()) {
+            Throwable alpn = new SSLHandshakeException(
+                    "Connection closed before successful ALPN negotiation");
+            alpnCF.completeExceptionally(alpn);
+        }
         if (isMonitored) Monitor.remove(monitor);
     }
 
-    private Void stopOnError(Throwable currentlyUnused) {
+    private Void stopOnError(Throwable error) {
         // maybe log, etc
+        // ensure the ALPN is completed
+        // We could also do this in SSLTube.SSLSubscriberWrapper
+        // onError/onComplete - with the caveat that the ALP CF
+        // would get completed externally. Doing it here keeps
+        // it all inside SSLFlowDelegate.
+        if (!alpnCF.isDone()) {
+            alpnCF.completeExceptionally(error);
+        }
         normalStop();
         return null;
     }
@@ -1070,6 +1084,11 @@
                     }
                 } while (true);
                 if (debug.on()) debug.log("finished task execution");
+                HandshakeStatus hs = engine.getHandshakeStatus();
+                if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
+                    // We're no longer handshaking, try setting ALPN
+                    trySetALPN();
+                }
                 resumeActivity();
             } catch (Throwable t) {
                 handleError(t);
@@ -1077,6 +1096,17 @@
         });
     }
 
+    boolean trySetALPN() {
+        // complete ALPN CF if needed.
+        if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
+            applicationBufferSize = engine.getSession().getApplicationBufferSize();
+            packetBufferSize = engine.getSession().getPacketBufferSize();
+            setALPN();
+            return true;
+        }
+        return false;
+    }
+
     // FIXME: acknowledge a received CLOSE request from peer
     EngineResult doClosure(EngineResult r) throws IOException {
         if (debug.on())
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Wed Jan 16 10:12:58 2019 -0800
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java	Wed Jan 16 19:09:16 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -274,7 +274,11 @@
 
         @Override
         public String toString() {
-            return "DelegateWrapper:" + delegate.toString();
+            return "DelegateWrapper[subscribedCalled: " + subscribedCalled
+                    +", subscribedDone: " + subscribedDone
+                    +", completed: " + completed
+                    +", error: " + error
+                    +"]: " + delegate;
         }
 
     }
@@ -288,6 +292,20 @@
         private final AtomicReference<Throwable> errorRef
                 = new AtomicReference<>();
 
+        @Override
+        public String toString() {
+            DelegateWrapper sub = subscribed;
+            DelegateWrapper pend = pendingDelegate.get();
+            // Though final sslFD may be null if called from within
+            // SSLFD::connect() as SSLTube is not fully constructed yet.
+            SSLFlowDelegate sslFD = sslDelegate;
+            return "SSLSubscriberWrapper[" + SSLTube.this
+                    + ", delegate: " + (sub == null ? pend  :sub)
+                    + ", getALPN: " + (sslFD == null ? null : sslFD.alpn())
+                    + ", onCompleteReceived: " + onCompleteReceived
+                    + ", onError: " + errorRef.get() + "]";
+        }
+
         // setDelegate can be called asynchronously when the SSLTube flow
         // is connected. At this time the permanent subscriber (this class)
         // may already be subscribed (readSubscription != null) or not.
@@ -319,6 +337,9 @@
                     debug.log("SSLSubscriberWrapper (reader) no subscription yet");
                 return;
             }
+            // sslDelegate field should have been initialized by the
+            // the time we reach here, as there can be no subscriber
+            // until SSLTube is fully constructed.
             if (handleNow || !sslDelegate.resumeReader()) {
                 processPendingSubscriber();
             }
@@ -429,7 +450,8 @@
             Throwable failed;
             boolean completed;
             // reset any demand that may have been made by the previous
-            // subscriber
+            // subscriber. sslDelegate field should have been initialized,
+            // since we only reach here when there is a subscriber.
             sslDelegate.resetReaderDemand();
             // send the subscription to the subscriber.
             subscriberImpl.onSubscribe(subscription);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Wed Jan 16 10:12:58 2019 -0800
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Wed Jan 16 19:09:16 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -468,7 +468,8 @@
           .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
           //.append(" outputQ: ").append(outputQ.toString())
           .append(" cf: ").append(cf.toString())
-          .append(" downstreamSubscription: ").append(downstreamSubscription.toString());
+          .append(" downstreamSubscription: ").append(downstreamSubscription)
+          .append(" downstreamSubscriber: ").append(downstreamSubscriber);
 
         return sb.toString();
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/ALPNFailureTest.java	Wed Jan 16 19:09:16 2019 +0000
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary This test will timeout if the ALPN CF is not completed
+ *          when a 'Connection reset by peer' exception is raised
+ *          during the handshake.
+ * @bug 8217094
+ * @modules java.net.http
+ *          java.logging
+ * @build ALPNFailureTest
+ * @run main/othervm -Djdk.internal.httpclient.debug=true ALPNFailureTest HTTP_1_1
+ * @run main/othervm ALPNFailureTest HTTP_2
+ */
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ProxySelector;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpTimeoutException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ALPNFailureTest {
+
+
+    public static void main(String[] args) throws Exception{
+        if (args == null || args.length == 0) {
+            args = new String[] {HttpClient.Version.HTTP_1_1.name()};
+        }
+        ServerSocket socket = ServerSocketFactory.getDefault()
+                .createServerSocket(0, 10, InetAddress.getLoopbackAddress());
+
+        test(socket, null, null, args);
+    }
+
+    public static void test(ServerSocket socket, SSLContext context,
+                            ProxySelector ps, String... args)
+            throws Exception
+    {
+        System.out.println("Tests a race condition in SSLTube/SSLFlowDelegate");
+        System.out.println("This test will timeout if the ALPN CF is not completed" +
+                " when a 'Connection reset by peer' exception is raised" +
+                " during the handshake - see 8217094.");
+
+        URI uri = new URI("https", null,
+                socket.getInetAddress().getHostAddress(), socket.getLocalPort(),
+                "/ReadOnlyServer/https_1_1/", null, null);
+        HttpRequest request1 = HttpRequest.newBuilder(uri)
+                .GET().build();
+        HttpRequest request2 = HttpRequest.newBuilder(uri)
+                .POST(HttpRequest.BodyPublishers.ofString("foo")).build();
+
+        ReadOnlyServer server = new ReadOnlyServer(socket);
+        Thread serverThread = new Thread(server, "ServerThread");
+        serverThread.start();
+        try {
+            for (var arg : args) {
+                var version = HttpClient.Version.valueOf(arg);
+                HttpClient.Builder builder = HttpClient.newBuilder()
+                        .version(version);
+                if (ps != null) builder.proxy(ps);
+                if (context != null) builder.sslContext(context);
+
+                HttpClient client = builder.build();
+                for (var request : List.of(request1, request2)) {
+                    System.out.println("Server is " + socket.getLocalSocketAddress()
+                            + ", Version is " + version + ", Method is " + request.method()
+                            + (ps == null ? ", no proxy"
+                            : (", Proxy is " + ps.select(request.uri()))));
+                    try {
+                        HttpResponse<String> resp =
+                                client.send(request, HttpResponse.BodyHandlers.ofString());
+                        throw new AssertionError(
+                                "Client should not have received any response: " + resp);
+                    } catch (HttpTimeoutException x) {
+                        System.out.println("Unexpected " + x);
+                        x.printStackTrace();
+                        throw new AssertionError("Unexpected exception " + x, x);
+                    } catch (Exception x) {
+                        // We expect IOException("Connection reset by peer"), but
+                        // any exception would do: we just don't want to linger
+                        // forever.
+                        System.err.println("Client got expected exception: " + x);
+                        x.printStackTrace(System.out);
+                    }
+                }
+            }
+        } finally {
+            server.close();
+        }
+    }
+
+    public static class ReadOnlyServer  implements Runnable, Closeable {
+        final ServerSocket socket;
+        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+        final AtomicBoolean closing = new AtomicBoolean();
+        ReadOnlyServer(ServerSocket socket) {
+            this.socket = socket;
+        }
+
+        @Override
+        public void run() {
+            int count = 0;
+            int all = 0;
+            try {
+                System.out.println("Server starting");
+                while (!closing.get()) {
+                    all += count;
+                    count = 0;
+                    try (Socket client = socket.accept()) {
+                        client.setSoTimeout(1000);
+                        client.setOption(StandardSocketOptions.SO_LINGER, 0);
+                        InputStream is = client.getInputStream();
+                        OutputStream os = client.getOutputStream();
+                        boolean drain = true;
+                        int timeouts = 0;
+                        // now read some byte from the ClientHello
+                        // and abruptly close the socket.
+                        while (drain) {
+                            try {
+                                is.read();
+                                count++;
+                                if (count >= 50) {
+                                    drain = false;
+                                }
+                            } catch (SocketTimeoutException so) {
+                                // make sure we read something
+                                if (count > 0) timeouts++;
+                                if (timeouts == 5) {
+                                    // presumably the client is
+                                    // waiting for us to answer...
+                                    // but we should not reach here.
+                                    drain = false;
+                                }
+                            }
+                        }
+                        System.out.println("Got " + count + " bytes");
+                    }
+                }
+            } catch (Throwable t) {
+                if (!closing.get()) {
+                    errorRef.set(t);
+                    t.printStackTrace();
+                }
+            } finally {
+                System.out.println("Server existing after reading " + (all + count) + " bytes");
+                close();
+            }
+
+        }
+
+        @Override
+        public void close() {
+            if (closing.getAndSet(true))
+                return; // already closed
+            try {
+                socket.close();
+            } catch (IOException x) {
+                System.out.println("Exception while closing: " + x);
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/ALPNProxyFailureTest.java	Wed Jan 16 19:09:16 2019 +0000
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @summary This test will timeout if the ALPN CF is not completed
+ *          when a 'Connection reset by peer' exception is raised
+ *          during the handshake.
+ * @bug 8217094
+ * @library /test/lib http2/server
+ * @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters DigestEchoServer
+ *        ALPNFailureTest ALPNProxyFailureTest
+ * @modules java.net.http/jdk.internal.net.http.common
+ *          java.net.http/jdk.internal.net.http.frame
+ *          java.net.http/jdk.internal.net.http.hpack
+ *          java.logging
+ *          java.base/sun.net.www.http
+ *          java.base/sun.net.www
+ *          java.base/sun.net
+ * @build ALPNFailureTest
+ * @run main/othervm -Djdk.internal.httpclient.debug=true -Dtest.nolinger=true ALPNProxyFailureTest HTTP_1_1
+ * @run main/othervm -Dtest.nolinger=true ALPNProxyFailureTest HTTP_2
+ */
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import jdk.test.lib.net.SimpleSSLContext;
+import java.net.InetAddress;
+import java.net.ProxySelector;
+import java.net.ServerSocket;
+import java.net.http.HttpClient;
+
+public class ALPNProxyFailureTest extends ALPNFailureTest {
+
+    static final SSLContext context;
+    static {
+        try {
+            context = new SimpleSSLContext().get();
+            SSLContext.setDefault(context);
+        } catch (Exception x) {
+            throw new ExceptionInInitializerError(x);
+        }
+    }
+
+    public static void main(String[] args) throws Exception{
+        if (args == null || args.length == 0) {
+            args = new String[] {HttpClient.Version.HTTP_1_1.name()};
+        }
+        ServerSocket socket = ServerSocketFactory.getDefault()
+                .createServerSocket(0, 10, InetAddress.getLoopbackAddress());
+
+        DigestEchoServer.TunnelingProxy proxy = DigestEchoServer.createHttpsProxyTunnel(
+                DigestEchoServer.HttpAuthSchemeType.NONE);
+        ProxySelector ps = ProxySelector.of(proxy.getProxyAddress());
+
+        try {
+            test(socket, context, ps, args);
+        } finally {
+            proxy.stop();
+        }
+    }
+
+}
--- a/test/jdk/java/net/httpclient/DigestEchoServer.java	Wed Jan 16 10:12:58 2019 -0800
+++ b/test/jdk/java/net/httpclient/DigestEchoServer.java	Wed Jan 16 19:09:16 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -41,6 +41,7 @@
 import java.net.PasswordAuthentication;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.StandardSocketOptions;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -77,6 +78,8 @@
 
     public static final boolean DEBUG =
             Boolean.parseBoolean(System.getProperty("test.debug", "false"));
+    public static final boolean NO_LINGER =
+            Boolean.parseBoolean(System.getProperty("test.nolinger", "false"));
     public enum HttpAuthType {
         SERVER, PROXY, SERVER307, PROXY305
         /* add PROXY_AND_SERVER and SERVER_PROXY_NONE */
@@ -1603,6 +1606,11 @@
                     Socket toClose;
                     try {
                         toClose = clientConnection = ss.accept();
+                        if (NO_LINGER) {
+                            // can be useful to trigger "Connection reset by peer"
+                            // errors on the client side.
+                            clientConnection.setOption(StandardSocketOptions.SO_LINGER, 0);
+                        }
                     } catch (IOException io) {
                         if (DEBUG || !stopped) io.printStackTrace(System.out);
                         break;