http-client-branch: (WebSocket) cleanup & simplifications http-client-branch
authorprappo
Fri, 24 Nov 2017 17:51:51 +0300
branchhttp-client-branch
changeset 55867 1b8734a5c696
parent 55866 93e8ef0ff08e
child 55868 5899aa5e1837
http-client-branch: (WebSocket) cleanup & simplifications
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java	Fri Nov 24 17:51:51 2017 +0300
@@ -197,11 +197,11 @@
     static final class Result {
 
         final String subprotocol;
-        final RawChannel channel;
+        final TransportSupplier transport;
 
-        private Result(String subprotocol, RawChannel channel) {
+        private Result(String subprotocol, TransportSupplier transport) {
             this.subprotocol = subprotocol;
-            this.channel = channel;
+            this.transport = transport;
         }
     }
 
@@ -260,7 +260,7 @@
         }
         String subprotocol = checkAndReturnSubprotocol(headers);
         RawChannel channel = ((RawChannel.Provider) response).rawChannel();
-        return new Result(subprotocol, channel);
+        return new Result(subprotocol, new TransportSupplier(channel));
     }
 
     private String checkAndReturnSubprotocol(HttpHeaders responseHeaders)
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java	Fri Nov 24 17:51:51 2017 +0300
@@ -50,7 +50,7 @@
  *
  * even if `request(long n)` is called from inside these invocations.
  */
-final class Receiver {
+public class Receiver {
 
     private final MessageStreamConsumer messageConsumer;
     private final RawChannel channel;
@@ -67,7 +67,7 @@
     private static final int AVAILABLE    = 1;
     private static final int WAITING      = 2;
 
-    Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
+    public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
         this.messageConsumer = messageConsumer;
         this.channel = channel;
         this.frameConsumer = new FrameConsumer(this.messageConsumer);
@@ -94,7 +94,7 @@
         };
     }
 
-    void request(long n) {
+    public void request(long n) {
         if (n <= 0L) {
             throw new IllegalArgumentException("Non-positive request: " + n);
         }
@@ -113,8 +113,9 @@
      * Stops the machinery from reading and delivering messages permanently,
      * regardless of the current demand and data availability.
      */
-    void close() {
+    public void close() throws IOException {
         pushScheduler.stop();
+        channel.shutdownInput();
     }
 
     private class PushContinuouslyTask
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java	Fri Nov 24 17:51:51 2017 +0300
@@ -70,6 +70,10 @@
         send0(message, completionHandler);
     }
 
+    public void close() throws IOException {
+        channel.shutdownOutput();
+    }
+
     private RawChannel.RawEvent createHandler() {
         return new RawChannel.RawEvent() {
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java	Fri Nov 24 17:51:51 2017 +0300
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.io.IOException;
+
+/*
+ * Abstracts out I/O channel for the WebSocket implementation. The latter then
+ * deals with input and output streams of messages and does not have to
+ * understand the state machine of channels (e.g. how exactly they are closed).
+ * Mocking this type will allow testing WebSocket message exchange in isolation.
+ */
+public class TransportSupplier {
+
+    private final RawChannel channel;
+    private final Object lock = new Object();
+    private Transmitter transmitter;
+    private Receiver receiver;
+    private boolean receiverShutdown;
+    private boolean transmitterShutdown;
+    private boolean closed;
+
+    public TransportSupplier(RawChannel channel) {
+        this.channel = channel;
+    }
+
+    public Receiver receiver(MessageStreamConsumer consumer) {
+        synchronized (lock) {
+            if (receiver == null) {
+                receiver = newReceiver(consumer);
+            }
+            return receiver;
+        }
+    }
+
+    public Transmitter transmitter() {
+        synchronized (lock) {
+            if (transmitter == null) {
+                transmitter = newTransmitter();
+            }
+            return transmitter;
+        }
+    }
+
+    protected Receiver newReceiver(MessageStreamConsumer consumer) {
+        return new Receiver(consumer, channel) {
+            @Override
+            public void close() throws IOException {
+                synchronized (lock) {
+                    if (!closed) {
+                        try {
+                            super.close();
+                        } finally {
+                            receiverShutdown = true;
+                            if (transmitterShutdown) {
+                                closed = true;
+                                channel.close();
+                            }
+                        }
+                    }
+                }
+            }
+        };
+    }
+
+    protected Transmitter newTransmitter() {
+        return new Transmitter(channel) {
+            @Override
+            public void close() throws IOException {
+                synchronized (lock) {
+                    if (!closed) {
+                        try {
+                            super.close();
+                        } finally {
+                            transmitterShutdown = true;
+                            if (receiverShutdown) {
+                                closed = true;
+                                channel.close();
+                            }
+                        }
+                    }
+                }
+            }
+        };
+    }
+}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Fri Nov 24 12:10:19 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java	Fri Nov 24 17:51:51 2017 +0300
@@ -25,6 +25,21 @@
 
 package jdk.incubator.http.internal.websocket;
 
+import jdk.incubator.http.WebSocket;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
+import jdk.incubator.http.internal.common.Utils;
+import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
+import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
+
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.net.ProtocolException;
@@ -40,21 +55,6 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import jdk.incubator.http.WebSocket;
-import jdk.incubator.http.internal.common.Log;
-import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
-import jdk.incubator.http.internal.common.Utils;
-import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
-
 import static java.util.Objects.requireNonNull;
 import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
 import static jdk.incubator.http.internal.common.Pair.pair;
@@ -69,7 +69,6 @@
 
     private final URI uri;
     private final String subprotocol;
-    private final RawChannel channel; /* Stored to call close() on */
     private final Listener listener;
 
     private volatile boolean inputClosed;
@@ -90,17 +89,6 @@
     private final Receiver receiver;
 
     /*
-     * Whether or not the WebSocket has been closed. When a WebSocket has been
-     * closed it means that no further messages can be sent or received.
-     * A closure can be triggered by:
-     *
-     *   1. abort()
-     *   2. "Failing the WebSocket Connection" (i.e. a fatal error)
-     *   3. Completion of the Closing handshake
-     */
-    private final AtomicBoolean closed = new AtomicBoolean();
-
-    /*
      * This lock is enforcing sequential ordering of invocations to listener's
      * methods. It is supposed to be uncontended. The only contention that can
      * happen is when onOpen, an asynchronous onError (not related to reading
@@ -110,15 +98,12 @@
      */
     private final Object lock = new Object();
 
-    private final CompletableFuture<?> channelInputClosed = new MinimalFuture<>();
-    private final CompletableFuture<?> channelOutputClosed = new MinimalFuture<>();
-
     public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
         Function<Result, WebSocket> newWebSocket = r -> {
             WebSocketImpl ws = new WebSocketImpl(b.getUri(),
                                                  r.subprotocol,
-                                                 r.channel,
-                                                 b.getListener());
+                                                 b.getListener(),
+                                                 r.transport);
             // The order of calls might cause a subtle effects, like CF will be
             // returned from the buildAsync _after_ onOpen has been signalled.
             // This means if onOpen is lengthy, it might cause some problems.
@@ -141,42 +126,15 @@
 
     WebSocketImpl(URI uri,
                   String subprotocol,
-                  RawChannel channel,
-                  Listener listener)
-    {
-        this(uri,
-             subprotocol,
-             channel,
-             listener,
-             new Transmitter(channel));
-    }
-
-    /* Exposed for testing purposes */
-    WebSocketImpl(URI uri,
-                  String subprotocol,
-                  RawChannel channel,
                   Listener listener,
-                  Transmitter transmitter)
+                  TransportSupplier transport)
     {
         this.uri = requireNonNull(uri);
         this.subprotocol = requireNonNull(subprotocol);
-        this.channel = requireNonNull(channel);
         this.listener = requireNonNull(listener);
-        this.transmitter = transmitter;
-        this.receiver = new Receiver(messageConsumerOf(listener), channel);
+        this.transmitter = transport.transmitter();
+        this.receiver = transport.receiver(messageConsumerOf(listener));
         this.sendScheduler = new SequentialScheduler(new SendTask());
-
-        // Set up automatic channel closing action
-        CompletableFuture.allOf(channelInputClosed, channelOutputClosed)
-                .whenComplete((result, error) -> {
-                    try {
-                        channel.close();
-                    } catch (IOException e) {
-                        Log.logError(e);
-                    } finally {
-                        closed.set(true);
-                    }
-                });
     }
 
     /*
@@ -201,7 +159,15 @@
                 Log.logError(error);
             } else {
                 lastMethodInvoked = true;
-                receiver.close();
+                try {
+                    try {
+                        receiver.close();
+                    } finally {
+                        transmitter.close();
+                    }
+                } catch (IOException e) {
+                    Log.logError(e);
+                }
                 try {
                     listener.onError(this, error);
                 } catch (Exception e) {
@@ -212,23 +178,16 @@
     }
 
     /*
-     * Processes a Close event that came from the channel. Invoked at most once.
+     * Processes a Close event that came from the receiver. Invoked at most
+     * once. No further messages are pulled from the receiver.
      */
     private void processClose(int statusCode, String reason) {
         inputClosed = true;
-        receiver.close();
         try {
-            channel.shutdownInput();
+            receiver.close();
         } catch (IOException e) {
             Log.logError(e);
         }
-        boolean alreadyCompleted = !channelInputClosed.complete(null);
-        if (alreadyCompleted) {
-            // This CF is supposed to be completed only once, the first time a
-            // Close message is received. No further messages are pulled from
-            // the socket.
-            throw new InternalError();
-        }
         int code;
         if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
             code = NORMAL_CLOSURE;
@@ -259,7 +218,11 @@
                 Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
             } else {
                 lastMethodInvoked = true;
-                receiver.close();
+                try {
+                    receiver.close();
+                } catch (IOException e) {
+                    Log.logError(e);
+                }
                 try {
                     return listener.onClose(this, statusCode, reason);
                 } catch (Exception e) {
@@ -316,22 +279,22 @@
      * no more messages are expected to be sent after this.
      */
     private CompletableFuture<WebSocket> enqueueClose(Close m) {
+        // MUST be a CF created once and shared across sendClose, otherwise
+        // a second sendClose may prematurely close the channel
         return enqueue(m)
                 .orTimeout(60, TimeUnit.SECONDS)
                 .whenComplete((r, error) -> {
+                    try {
+                        transmitter.close();
+                    } catch (IOException e) {
+                        Log.logError(e);
+                    }
                     if (error instanceof TimeoutException) {
                         try {
-                            channel.close();
+                            receiver.close();
                         } catch (IOException e) {
                             Log.logError(e);
                         }
-                    } else {
-                        try {
-                            channel.shutdownOutput();
-                        } catch (IOException e) {
-                            Log.logError(e);
-                        }
-                        channelOutputClosed.complete(null);
                     }
                 });
     }
@@ -432,19 +395,20 @@
         inputClosed = true;
         outputClosed = true;
         try {
-            channel.close();
-        } catch (IOException ignored) {
-        } finally {
-            closed.set(true);
-            signalClose(CLOSED_ABNORMALLY, "");
-        }
+            try {
+                receiver.close();
+            } finally {
+                transmitter.close();
+            }
+        } catch (IOException ignored) { }
     }
 
     @Override
     public String toString() {
         return super.toString()
-                + "[" + (closed.get() ? "CLOSED" : "OPEN") + "]: " + uri
-                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "");
+                + "[uri=" + uri
+                + (!subprotocol.isEmpty() ? ", subprotocol=" + subprotocol : "")
+                + "]";
     }
 
     private MessageStreamConsumer messageConsumerOf(Listener listener) {
@@ -483,7 +447,7 @@
                 receiver.acknowledge();
                 // Let's make a full copy of this tiny data. What we want here
                 // is to rule out a possibility the shared data we send might be
-                // corrupted the by processing in the listener.
+                // corrupted by processing in the listener.
                 ByteBuffer slice = data.slice();
                 ByteBuffer copy = ByteBuffer.allocate(data.remaining())
                         .put(data)
@@ -535,18 +499,11 @@
                 } else {
                     Exception ex = (Exception) new ProtocolException().initCause(error);
                     int code = ((FailWebSocketException) error).getStatusCode();
-                    enqueueClose(new Close(code, ""))
+                    enqueueClose(new Close(code, "")) // do we have to wait for 60 secs? nah...
                             .whenComplete((r, e) -> {
                                 if (e != null) {
                                     ex.addSuppressed(Utils.getCompletionCause(e));
                                 }
-                                try {
-                                    channel.close();
-                                } catch (IOException e1) {
-                                    ex.addSuppressed(e1);
-                                } finally {
-                                    closed.set(true);
-                                }
                                 signalError(ex);
                             });
                 }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Fri Nov 24 12:10:19 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java	Fri Nov 24 17:51:51 2017 +0300
@@ -26,6 +26,7 @@
 import jdk.incubator.http.WebSocket;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Queue;
@@ -186,11 +187,17 @@
             public void close() {
             }
         };
+        TransportSupplier transport = new TransportSupplier(channel) {
+            @Override
+            public Transmitter transmitter() {
+                return transmitter;
+            }
+        };
         return new WebSocketImpl(
                 uri,
                 subprotocol,
-                channel,
-                new WebSocket.Listener() { }, transmitter);
+                new WebSocket.Listener() { },
+                transport);
     }
 
     private abstract class MockTransmitter extends Transmitter {
@@ -225,6 +232,9 @@
                               message);
         }
 
+        @Override
+        public void close() { }
+
         protected abstract CompletionStage<?> whenSent();
 
         public Queue<OutgoingMessage> queue() {