--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Thu Dec 14 18:41:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/OpeningHandshake.java Fri Dec 15 00:47:16 2017 +0300
@@ -197,9 +197,9 @@
static final class Result {
final String subprotocol;
- final TransportSupplier transport;
+ final TransportFactory transport;
- private Result(String subprotocol, TransportSupplier transport) {
+ private Result(String subprotocol, TransportFactory transport) {
this.subprotocol = subprotocol;
this.transport = transport;
}
@@ -263,7 +263,7 @@
}
String subprotocol = checkAndReturnSubprotocol(headers);
RawChannel channel = ((RawChannel.Provider) response).rawChannel();
- return new Result(subprotocol, new TransportSupplier(channel));
+ return new Result(subprotocol, new TransportFactoryImpl(channel));
}
private String checkAndReturnSubprotocol(HttpHeaders responseHeaders)
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Receiver.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.internal.common.Demand;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-
-/*
- * Receives incoming data from the channel on demand and converts it into a
- * stream of WebSocket messages which are then delivered to the supplied message
- * consumer in a strict sequential order and non-recursively. In other words,
- *
- * onText()
- * onText()
- * onBinary()
- * ...
- *
- * instead of
- *
- * onText()
- * onText()
- * onBinary()
- * ...
- *
- * even if `request(long n)` is called from inside these invocations.
- */
-public class Receiver {
-
- private final MessageStreamConsumer messageConsumer;
- private final RawChannel channel;
- private final FrameConsumer frameConsumer;
- private final Frame.Reader reader = new Frame.Reader();
- private final RawChannel.RawEvent event = createHandler();
- protected final Demand demand = new Demand(); /* Exposed for testing purposes */
- private final SequentialScheduler pushScheduler;
-
- private ByteBuffer data;
- private volatile int state;
-
- private static final int UNREGISTERED = 0;
- private static final int AVAILABLE = 1;
- private static final int WAITING = 2;
-
- public Receiver(MessageStreamConsumer messageConsumer, RawChannel channel) {
- this.messageConsumer = messageConsumer;
- this.channel = channel;
- this.frameConsumer = new FrameConsumer(this.messageConsumer);
- this.data = channel.initialByteBuffer();
- // To ensure the initial non-final `data` will be visible
- // (happens-before) when `handler` invokes `pushContinuously`
- // the following assignment is done last:
- pushScheduler = createScheduler();
- }
-
- /* Exposed for testing purposes */
- protected SequentialScheduler createScheduler() {
- return new SequentialScheduler(new PushContinuouslyTask());
- }
-
- private RawChannel.RawEvent createHandler() {
- return new RawChannel.RawEvent() {
-
- @Override
- public int interestOps() {
- return SelectionKey.OP_READ;
- }
-
- @Override
- public void handle() {
- state = AVAILABLE;
- pushScheduler.runOrSchedule();
- }
- };
- }
-
- public void request(long n) {
- if (demand.increase(n)) {
- pushScheduler.runOrSchedule();
- }
- }
-
- /*
- * Why is this method needed? Since Receiver operates through callbacks
- * this method allows to abstract out what constitutes as a message being
- * received (i.e. to decide outside this type when exactly one should
- * decrement the demand).
- */
- void acknowledge() {
- long x = demand.decreaseAndGet(1);
- if (x < 0) {
- throw new InternalError(String.valueOf(x));
- }
- }
-
- /*
- * Stops the machinery from reading and delivering messages permanently,
- * regardless of the current demand and data availability.
- */
- public void close() throws IOException {
- pushScheduler.stop();
- channel.shutdownInput();
- }
-
- private class PushContinuouslyTask
- extends SequentialScheduler.CompleteRestartableTask
- {
- @Override
- public void run() {
- while (!pushScheduler.isStopped()) {
- if (data.hasRemaining()) {
- if (!demand.isFulfilled()) {
- try {
- int oldPos = data.position();
- reader.readFrame(data, frameConsumer);
- int newPos = data.position();
- assert oldPos != newPos : data; // reader always consumes bytes
- } catch (Throwable e) {
- pushScheduler.stop();
- messageConsumer.onError(e);
- }
- continue;
- }
- break;
- }
- switch (state) {
- case WAITING:
- return;
- case UNREGISTERED:
- try {
- state = WAITING;
- channel.registerEvent(event);
- } catch (Throwable e) {
- pushScheduler.stop();
- messageConsumer.onError(e);
- }
- return;
- case AVAILABLE:
- try {
- data = channel.read();
- } catch (Throwable e) {
- pushScheduler.stop();
- messageConsumer.onError(e);
- return;
- }
- if (data == null) { // EOF
- pushScheduler.stop();
- messageConsumer.onComplete();
- return;
- } else if (!data.hasRemaining()) { // No data at the moment
- // Pretty much a "goto", reusing the existing code path
- // for registration
- state = UNREGISTERED;
- }
- continue;
- default:
- throw new InternalError(String.valueOf(state));
- }
- }
- }
- }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transmitter.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,124 +0,0 @@
-/*
- * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
-
-import static java.util.Objects.requireNonNull;
-
-/*
- * Sends messages one at a time, in an asynchronous and non-blocking fashion.
- *
- * No matter whether the message has been fully sent or an error has occurred,
- * the transmitter reports the outcome to the supplied handler and becomes ready
- * to accept a new message. Until then, the transmitter is considered "busy" and
- * an IllegalStateException will be thrown on each attempt to invoke send.
- */
-public class Transmitter {
-
- /* This flag is used solely for assertions */
- private final AtomicBoolean busy = new AtomicBoolean();
- private OutgoingMessage message;
- private Consumer<Exception> completionHandler;
- private final RawChannel channel;
- private final RawChannel.RawEvent event;
-
- public Transmitter(RawChannel channel) {
- this.channel = channel;
- this.event = createHandler();
- }
-
- /**
- * The supplied handler may be invoked in the calling thread.
- * A {@code StackOverflowError} may thus occur if there's a possibility
- * that this method is called again by the supplied handler.
- */
- public void send(OutgoingMessage message,
- Consumer<Exception> completionHandler)
- {
- requireNonNull(message);
- requireNonNull(completionHandler);
- if (!busy.compareAndSet(false, true)) {
- throw new IllegalStateException();
- }
- send0(message, completionHandler);
- }
-
- public void close() throws IOException {
- channel.shutdownOutput();
- }
-
- private RawChannel.RawEvent createHandler() {
- return new RawChannel.RawEvent() {
-
- @Override
- public int interestOps() {
- return SelectionKey.OP_WRITE;
- }
-
- @Override
- public void handle() {
- // registerEvent(e) happens-before subsequent e.handle(), so
- // we're fine reading the stored message and the completionHandler
- send0(message, completionHandler);
- }
- };
- }
-
- private void send0(OutgoingMessage message, Consumer<Exception> handler) {
- boolean b = busy.get();
- assert b; // Please don't inline this, as busy.get() has memory
- // visibility effects and we don't want the program behaviour
- // to depend on whether the assertions are turned on
- // or turned off
- try {
- boolean sent = message.sendTo(channel);
- if (sent) {
- busy.set(false);
- handler.accept(null);
- } else {
- // The message has not been fully sent, the transmitter needs to
- // remember the message until it can continue with sending it
- this.message = message;
- this.completionHandler = handler;
- try {
- channel.registerEvent(event);
- } catch (IOException e) {
- this.message = null;
- this.completionHandler = null;
- busy.set(false);
- handler.accept(e);
- }
- }
- } catch (IOException e) {
- busy.set(false);
- handler.accept(e);
- }
- }
-}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/*
+ * The only parametrization of Transport<T> used is Transport<WebSocket>. The
+ * type parameter T was introduced solely to avoid circular dependency between
+ * Transport and WebSocket. After all, instances of T are used solely to
+ * complete CompletableFutures. Transport doesn't care about the exact type of
+ * T.
+ *
+ * This way the Transport is fully in charge of creating CompletableFutures.
+ * On the one hand, Transport may use it to cache/reuse CompletableFutures. On
+ * the other hand, the class that uses Transport, may benefit by not converting
+ * from CompletableFuture<K> returned from Transport, to CompletableFuture<V>
+ * needed by the said class.
+ */
+public interface Transport<T> {
+
+ CompletableFuture<T> sendText(CharSequence message, boolean isLast);
+
+ CompletableFuture<T> sendBinary(ByteBuffer message, boolean isLast);
+
+ CompletableFuture<T> sendPing(ByteBuffer message);
+
+ CompletableFuture<T> sendPong(ByteBuffer message);
+
+ CompletableFuture<T> sendClose(int statusCode, String reason);
+
+ void request(long n);
+
+ /*
+ * Why is this method needed? Since Receiver operates through callbacks
+ * this method allows to abstract out what constitutes as a message being
+ * received (i.e. to decide outside this type when exactly one should
+ * decrement the demand).
+ */
+ void acknowledgeReception();
+
+ void closeOutput() throws IOException;
+
+ void closeInput() throws IOException;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactory.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,10 @@
+package jdk.incubator.http.internal.websocket;
+
+import java.util.function.Supplier;
+
+@FunctionalInterface
+public interface TransportFactory {
+
+ <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer);
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportFactoryImpl.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.incubator.http.internal.websocket;
+
+import java.util.function.Supplier;
+
+public class TransportFactoryImpl implements TransportFactory {
+
+ private final RawChannel channel;
+
+ public TransportFactoryImpl(RawChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new TransportImpl<T>(sendResultSupplier, consumer, channel);
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,358 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.Pair;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+import static jdk.incubator.http.internal.common.Pair.pair;
+
+public class TransportImpl<T> implements Transport<T> {
+
+ /* This flag is used solely for assertions */
+ private final AtomicBoolean busy = new AtomicBoolean();
+ private OutgoingMessage message;
+ private Consumer<Exception> completionHandler;
+ private final RawChannel channel;
+ private final RawChannel.RawEvent writeEvent = createWriteEvent();
+ private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
+ private final Queue<Pair<OutgoingMessage, CompletableFuture<T>>>
+ queue = new ConcurrentLinkedQueue<>();
+ private final OutgoingMessage.Context context = new OutgoingMessage.Context();
+ private final Supplier<T> resultSupplier;
+
+ private final MessageStreamConsumer messageConsumer;
+ private final FrameConsumer frameConsumer;
+ private final Frame.Reader reader = new Frame.Reader();
+ private final RawChannel.RawEvent readEvent = createReadEvent();
+ private final Demand demand = new Demand();
+ private final SequentialScheduler receiveScheduler;
+
+ private ByteBuffer data;
+ private volatile int state;
+
+ private static final int UNREGISTERED = 0;
+ private static final int AVAILABLE = 1;
+ private static final int WAITING = 2;
+
+ private final Object lock = new Object();
+ private boolean inputClosed;
+ private boolean outputClosed;
+
+ public TransportImpl(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer,
+ RawChannel channel) {
+ this.resultSupplier = sendResultSupplier;
+ this.messageConsumer = consumer;
+ this.channel = channel;
+ this.frameConsumer = new FrameConsumer(this.messageConsumer);
+ this.data = channel.initialByteBuffer();
+ // To ensure the initial non-final `data` will be visible
+ // (happens-before) when `readEvent.handle()` invokes `receiveScheduler`
+ // the following assignment is done last:
+ receiveScheduler = new SequentialScheduler(new ReceiveTask());
+ }
+
+ /**
+ * The supplied handler may be invoked in the calling thread.
+ * A {@code StackOverflowError} may thus occur if there's a possibility
+ * that this method is called again by the supplied handler.
+ */
+ public void send(OutgoingMessage message,
+ Consumer<Exception> completionHandler) {
+ requireNonNull(message);
+ requireNonNull(completionHandler);
+ if (!busy.compareAndSet(false, true)) {
+ throw new IllegalStateException();
+ }
+ send0(message, completionHandler);
+ }
+
+ private RawChannel.RawEvent createWriteEvent() {
+ return new RawChannel.RawEvent() {
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_WRITE;
+ }
+
+ @Override
+ public void handle() {
+ // registerEvent(e) happens-before subsequent e.handle(), so
+ // we're fine reading the stored message and the completionHandler
+ send0(message, completionHandler);
+ }
+ };
+ }
+
+ private void send0(OutgoingMessage message, Consumer<Exception> handler) {
+ boolean b = busy.get();
+ assert b; // Please don't inline this, as busy.get() has memory
+ // visibility effects and we don't want the program behaviour
+ // to depend on whether the assertions are turned on
+ // or turned off
+ try {
+ boolean sent = message.sendTo(channel);
+ if (sent) {
+ busy.set(false);
+ handler.accept(null);
+ } else {
+ // The message has not been fully sent, the transmitter needs to
+ // remember the message until it can continue with sending it
+ this.message = message;
+ this.completionHandler = handler;
+ try {
+ channel.registerEvent(writeEvent);
+ } catch (IOException e) {
+ this.message = null;
+ this.completionHandler = null;
+ busy.set(false);
+ handler.accept(e);
+ }
+ }
+ } catch (IOException e) {
+ busy.set(false);
+ handler.accept(e);
+ }
+ }
+
+ public CompletableFuture<T> sendText(CharSequence message,
+ boolean isLast) {
+ return enqueue(new OutgoingMessage.Text(message, isLast));
+ }
+
+ public CompletableFuture<T> sendBinary(ByteBuffer message,
+ boolean isLast) {
+ return enqueue(new OutgoingMessage.Binary(message, isLast));
+ }
+
+ public CompletableFuture<T> sendPing(ByteBuffer message) {
+ return enqueue(new OutgoingMessage.Ping(message));
+ }
+
+ public CompletableFuture<T> sendPong(ByteBuffer message) {
+ return enqueue(new OutgoingMessage.Pong(message));
+ }
+
+ public CompletableFuture<T> sendClose(int statusCode, String reason) {
+ return enqueue(new OutgoingMessage.Close(statusCode, reason));
+ }
+
+ private CompletableFuture<T> enqueue(OutgoingMessage m) {
+ CompletableFuture<T> cf = new MinimalFuture<>();
+ boolean added = queue.add(pair(m, cf));
+ if (!added) {
+ // The queue is supposed to be unbounded
+ throw new InternalError();
+ }
+ sendScheduler.runOrSchedule();
+ return cf;
+ }
+
+ /*
+ * This is a message sending task. It pulls messages from the queue one by
+ * one and sends them. It may be run in different threads, but never
+ * concurrently.
+ */
+ private class SendTask implements SequentialScheduler.RestartableTask {
+
+ @Override
+ public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
+ Pair<OutgoingMessage, CompletableFuture<T>> p = queue.poll();
+ if (p == null) {
+ taskCompleter.complete();
+ return;
+ }
+ OutgoingMessage message = p.first;
+ CompletableFuture<T> cf = p.second;
+ try {
+ if (!message.contextualize(context)) { // Do not send the message
+ cf.complete(null);
+ repeat(taskCompleter);
+ return;
+ }
+ Consumer<Exception> h = e -> {
+ if (e == null) {
+ cf.complete(resultSupplier.get());
+ } else {
+ cf.completeExceptionally(e);
+ }
+ repeat(taskCompleter);
+ };
+ send(message, h);
+ } catch (Throwable t) {
+ cf.completeExceptionally(t);
+ repeat(taskCompleter);
+ }
+ }
+
+ private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
+ taskCompleter.complete();
+ // More than a single message may have been enqueued while
+ // the task has been busy with the current message, but
+ // there is only a single signal recorded
+ sendScheduler.runOrSchedule();
+ }
+ }
+
+ private RawChannel.RawEvent createReadEvent() {
+ return new RawChannel.RawEvent() {
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_READ;
+ }
+
+ @Override
+ public void handle() {
+ state = AVAILABLE;
+ receiveScheduler.runOrSchedule();
+ }
+ };
+ }
+
+ @Override
+ public void request(long n) {
+ if (demand.increase(n)) {
+ receiveScheduler.runOrSchedule();
+ }
+ }
+
+ @Override
+ public void acknowledgeReception() {
+ long x = demand.decreaseAndGet(1);
+ if (x < 0) {
+ throw new InternalError(String.valueOf(x));
+ }
+ }
+
+ private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
+
+ @Override
+ public void run() {
+ while (!receiveScheduler.isStopped()) {
+ if (data.hasRemaining()) {
+ if (!demand.isFulfilled()) {
+ try {
+ int oldPos = data.position();
+ reader.readFrame(data, frameConsumer);
+ int newPos = data.position();
+ assert oldPos != newPos : data; // reader always consumes bytes
+ } catch (Throwable e) {
+ receiveScheduler.stop();
+ messageConsumer.onError(e);
+ }
+ continue;
+ }
+ break;
+ }
+ switch (state) {
+ case WAITING:
+ return;
+ case UNREGISTERED:
+ try {
+ state = WAITING;
+ channel.registerEvent(readEvent);
+ } catch (Throwable e) {
+ receiveScheduler.stop();
+ messageConsumer.onError(e);
+ }
+ return;
+ case AVAILABLE:
+ try {
+ data = channel.read();
+ } catch (Throwable e) {
+ receiveScheduler.stop();
+ messageConsumer.onError(e);
+ return;
+ }
+ if (data == null) { // EOF
+ receiveScheduler.stop();
+ messageConsumer.onComplete();
+ return;
+ } else if (!data.hasRemaining()) {
+ // No data at the moment Pretty much a "goto",
+ // reusing the existing code path for registration
+ state = UNREGISTERED;
+ }
+ continue;
+ default:
+ throw new InternalError(String.valueOf(state));
+ }
+ }
+ }
+ }
+
+ /*
+ * Stops the machinery from reading and delivering messages permanently,
+ * regardless of the current demand and data availability.
+ */
+ @Override
+ public void closeInput() throws IOException {
+ synchronized (lock) {
+ if (!inputClosed) {
+ inputClosed = true;
+ try {
+ receiveScheduler.stop();
+ channel.shutdownInput();
+ } finally {
+ if (outputClosed) {
+ channel.close();
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void closeOutput() throws IOException {
+ synchronized (lock) {
+ if (!outputClosed) {
+ outputClosed = true;
+ try {
+ channel.shutdownOutput();
+ } finally {
+ if (inputClosed) {
+ channel.close();
+ }
+ }
+ }
+ }
+ }
+}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportSupplier.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,109 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.io.IOException;
-
-/*
- * Abstracts out I/O channel for the WebSocket implementation. The latter then
- * deals with input and output streams of messages and does not have to
- * understand the state machine of channels (e.g. how exactly they are closed).
- * Mocking this type will allow testing WebSocket message exchange in isolation.
- */
-public class TransportSupplier {
-
- protected final RawChannel channel; /* Exposed for testing purposes */
- private final Object lock = new Object();
- private Transmitter transmitter;
- private Receiver receiver;
- private boolean receiverShutdown;
- private boolean transmitterShutdown;
- private boolean closed;
-
- public TransportSupplier(RawChannel channel) {
- this.channel = channel;
- }
-
- public Receiver receiver(MessageStreamConsumer consumer) {
- synchronized (lock) {
- if (receiver == null) {
- receiver = newReceiver(consumer);
- }
- return receiver;
- }
- }
-
- public Transmitter transmitter() {
- synchronized (lock) {
- if (transmitter == null) {
- transmitter = newTransmitter();
- }
- return transmitter;
- }
- }
-
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new Receiver(consumer, channel) {
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (!closed) {
- try {
- super.close();
- } finally {
- receiverShutdown = true;
- if (transmitterShutdown) {
- closed = true;
- channel.close();
- }
- }
- }
- }
- }
- };
- }
-
- protected Transmitter newTransmitter() {
- return new Transmitter(channel) {
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- if (!closed) {
- try {
- super.close();
- } finally {
- transmitterShutdown = true;
- if (receiverShutdown) {
- closed = true;
- channel.close();
- }
- }
- }
- }
- }
- };
- }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Thu Dec 14 18:41:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Dec 15 00:47:16 2017 +0300
@@ -29,37 +29,25 @@
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
-import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.websocket.OpeningHandshake.Result;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Binary;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Close;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Context;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Ping;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Pong;
-import jdk.incubator.http.internal.websocket.OutgoingMessage.Text;
import java.io.IOException;
import java.lang.ref.Reference;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
import static jdk.incubator.http.internal.common.MinimalFuture.failedFuture;
-import static jdk.incubator.http.internal.common.Pair.pair;
import static jdk.incubator.http.internal.websocket.StatusCodes.CLOSED_ABNORMALLY;
import static jdk.incubator.http.internal.websocket.StatusCodes.NO_STATUS_CODE;
import static jdk.incubator.http.internal.websocket.StatusCodes.isLegalToSendFromClient;
@@ -108,12 +96,7 @@
private final Listener listener;
private final AtomicBoolean outstandingSend = new AtomicBoolean();
- private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
- private final Queue<Pair<OutgoingMessage, CompletableFuture<WebSocket>>>
- queue = new ConcurrentLinkedQueue<>();
- private final Context context = new OutgoingMessage.Context();
- private final Transmitter transmitter;
- private final Receiver receiver;
+ private final Transport<WebSocket> transport;
private final SequentialScheduler receiveScheduler = new SequentialScheduler(new ReceiveTask());
private final Demand demand = new Demand();
@@ -140,10 +123,10 @@
}
/* Exposed for testing purposes */
- static WebSocket newInstance(URI uri,
- String subprotocol,
- Listener listener,
- TransportSupplier transport) {
+ static WebSocketImpl newInstance(URI uri,
+ String subprotocol,
+ Listener listener,
+ TransportFactory transport) {
WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
// This initialisation is outside of the constructor for the sake of
// safe publication of WebSocketImpl.this
@@ -154,68 +137,82 @@
private WebSocketImpl(URI uri,
String subprotocol,
Listener listener,
- TransportSupplier transport) {
+ TransportFactory transportFactory) {
this.uri = requireNonNull(uri);
this.subprotocol = requireNonNull(subprotocol);
this.listener = requireNonNull(listener);
- this.transmitter = transport.transmitter();
- this.receiver = transport.receiver(new SignallingMessageConsumer());
+ this.transport = transportFactory.createTransport(
+ () -> WebSocketImpl.this, // What about escape of WebSocketImpl.this?
+ new SignallingMessageConsumer());
}
@Override
- public CompletableFuture<WebSocket> sendText(CharSequence message, boolean isLast) {
- return enqueueExclusively(new Text(message, isLast));
+ public CompletableFuture<WebSocket> sendText(CharSequence message,
+ boolean isLast) {
+ if (!outstandingSend.compareAndSet(false, true)) {
+ return failedFuture(new IllegalStateException("Send pending"));
+ }
+ CompletableFuture<WebSocket> cf = transport.sendText(message, isLast);
+ cf.whenComplete((r, e) -> outstandingSend.set(false));
+ return cf;
}
@Override
- public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, boolean isLast) {
- return enqueueExclusively(new Binary(message, isLast));
+ public CompletableFuture<WebSocket> sendBinary(ByteBuffer message,
+ boolean isLast) {
+ if (!outstandingSend.compareAndSet(false, true)) {
+ return failedFuture(new IllegalStateException("Send pending"));
+ }
+ CompletableFuture<WebSocket> cf = transport.sendBinary(message, isLast);
+ // Optimize?
+ // if (cf.isDone()) {
+ // outstandingSend.set(false);
+ // } else {
+ // cf.whenComplete((r, e) -> outstandingSend.set(false));
+ // }
+ cf.whenComplete((r, e) -> outstandingSend.set(false));
+ return cf;
}
@Override
public CompletableFuture<WebSocket> sendPing(ByteBuffer message) {
- return enqueue(new Ping(message));
+ return transport.sendPing(message);
}
@Override
public CompletableFuture<WebSocket> sendPong(ByteBuffer message) {
- return enqueue(new Pong(message));
+ return transport.sendPong(message);
}
@Override
public CompletableFuture<WebSocket> sendClose(int statusCode, String reason) {
if (!isLegalToSendFromClient(statusCode)) {
- return failedFuture(
- new IllegalArgumentException("statusCode: " + statusCode));
+ return failedFuture(new IllegalArgumentException("statusCode"));
}
- Close msg;
- try {
- msg = new Close(statusCode, reason);
- } catch (IllegalArgumentException e) {
- return failedFuture(e);
- }
- outputClosed = true;
- return enqueueClose(msg);
+ return sendClose0(statusCode, reason);
}
/*
- * Sends a Close message, then shuts down the transmitter since no more
+ * Sends a Close message, then shuts down the output since no more
* messages are expected to be sent after this.
+ *
+ * TODO: Even if arguments are illegal the default message will be sent.
*/
- private CompletableFuture<WebSocket> enqueueClose(Close m) {
+ private CompletableFuture<WebSocket> sendClose0(int statusCode, String reason ) {
// TODO: MUST be a CF created once and shared across sendClose, otherwise
// a second sendClose may prematurely close the channel
- return enqueue(m)
+ outputClosed = true;
+ return transport.sendClose(statusCode, reason)
.orTimeout(60, TimeUnit.SECONDS)
.whenComplete((r, error) -> {
try {
- transmitter.close();
+ transport.closeOutput();
} catch (IOException e) {
Log.logError(e);
}
if (error instanceof TimeoutException) {
try {
- receiver.close();
+ transport.closeInput();
} catch (IOException e) {
Log.logError(e);
}
@@ -223,77 +220,6 @@
});
}
- /*
- * Accepts the given message into the outgoing queue in a mutually-exclusive
- * fashion in respect to other messages accepted through this method. No
- * further messages will be accepted until the returned CompletableFuture
- * completes. This method is used to enforce "one outstanding send
- * operation" policy.
- */
- private CompletableFuture<WebSocket> enqueueExclusively(OutgoingMessage m) {
- if (!outstandingSend.compareAndSet(false, true)) {
- return failedFuture(new IllegalStateException("Send pending"));
- }
- return enqueue(m).whenComplete((r, e) -> outstandingSend.set(false));
- }
-
- private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
- CompletableFuture<WebSocket> cf = new MinimalFuture<>();
- boolean added = queue.add(pair(m, cf));
- if (!added) {
- // The queue is supposed to be unbounded
- throw new InternalError();
- }
- sendScheduler.runOrSchedule();
- return cf;
- }
-
- /*
- * This is a message sending task. It pulls messages from the queue one by
- * one and sends them. It may be run in different threads, but never
- * concurrently.
- */
- private class SendTask implements SequentialScheduler.RestartableTask {
-
- @Override
- public void run(DeferredCompleter taskCompleter) {
- Pair<OutgoingMessage, CompletableFuture<WebSocket>> p = queue.poll();
- if (p == null) {
- taskCompleter.complete();
- return;
- }
- OutgoingMessage message = p.first;
- CompletableFuture<WebSocket> cf = p.second;
- try {
- if (!message.contextualize(context)) { // Do not send the message
- cf.complete(null);
- repeat(taskCompleter);
- return;
- }
- Consumer<Exception> h = e -> {
- if (e == null) {
- cf.complete(WebSocketImpl.this);
- } else {
- cf.completeExceptionally(e);
- }
- repeat(taskCompleter);
- };
- transmitter.send(message, h);
- } catch (Throwable t) {
- cf.completeExceptionally(t);
- repeat(taskCompleter);
- }
- }
-
- private void repeat(DeferredCompleter taskCompleter) {
- taskCompleter.complete();
- // More than a single message may have been enqueued while
- // the task has been busy with the current message, but
- // there is only a single signal recorded
- sendScheduler.runOrSchedule();
- }
- }
-
@Override
public void request(long n) {
if (demand.increase(n)) {
@@ -348,7 +274,7 @@
*/
private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
- // Receiver only asked here and nowhere else because we must make sure
+ // Transport only asked here and nowhere else because we must make sure
// onOpen is invoked first and no messages become pending before onOpen
// finishes
@@ -387,7 +313,7 @@
case IDLE:
if (demand.tryDecrement()
&& tryChangeState(IDLE, WAITING)) {
- receiver.request(1);
+ transport.request(1);
}
return;
case WAITING:
@@ -404,13 +330,13 @@
}
private void processError() throws IOException {
- receiver.close();
+ transport.closeInput();
receiveScheduler.stop();
Throwable err = error.get();
if (err instanceof FailWebSocketException) {
int code1 = ((FailWebSocketException) err).getStatusCode();
err = new ProtocolException().initCause(err);
- enqueueClose(new Close(code1, ""))
+ sendClose0(code1, "")
.whenComplete(
(r, e) -> {
if (e != null) {
@@ -422,7 +348,7 @@
}
private void processClose() throws IOException {
- receiver.close();
+ transport.closeInput();
receiveScheduler.stop();
CompletionStage<?> readyToClose;
readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
@@ -436,7 +362,7 @@
code = statusCode;
}
readyToClose.whenComplete((r, e) -> {
- enqueueClose(new Close(code, ""))
+ sendClose0(code, "")
.whenComplete((r1, e1) -> {
if (e1 != null) {
Log.logError(e1);
@@ -458,7 +384,7 @@
.put(binaryData)
.flip();
// Non-exclusive send;
- CompletableFuture<WebSocket> pongSent = enqueue(new Pong(copy));
+ CompletableFuture<WebSocket> pongSent = transport.sendPong(copy);
pongSent.whenComplete(
(r, e) -> {
if (e != null) {
@@ -499,9 +425,9 @@
private void close() {
try {
try {
- receiver.close();
+ transport.closeInput();
} finally {
- transmitter.close();
+ transport.closeOutput();
}
} catch (Throwable t) {
Log.logError(t);
@@ -520,7 +446,7 @@
Log.logTrace("Close: {0}, ''{1}''", statusCode, reason);
} else {
try {
- receiver.close();
+ transport.closeInput();
} catch (Throwable t) {
Log.logError(t);
}
@@ -531,7 +457,7 @@
@Override
public void onText(CharSequence data, MessagePart part) {
- receiver.acknowledge();
+ transport.acknowledgeReception();
text = data;
WebSocketImpl.this.part = part;
tryChangeState(WAITING, TEXT);
@@ -539,7 +465,7 @@
@Override
public void onBinary(ByteBuffer data, MessagePart part) {
- receiver.acknowledge();
+ transport.acknowledgeReception();
binaryData = data;
WebSocketImpl.this.part = part;
tryChangeState(WAITING, BINARY);
@@ -547,27 +473,27 @@
@Override
public void onPing(ByteBuffer data) {
- receiver.acknowledge();
+ transport.acknowledgeReception();
binaryData = data;
tryChangeState(WAITING, PING);
}
@Override
public void onPong(ByteBuffer data) {
- receiver.acknowledge();
+ transport.acknowledgeReception();
binaryData = data;
tryChangeState(WAITING, PONG);
}
@Override
public void onClose(int statusCode, CharSequence reason) {
- receiver.acknowledge();
+ transport.acknowledgeReception();
signalClose(statusCode, reason.toString());
}
@Override
public void onComplete() {
- receiver.acknowledge();
+ transport.acknowledgeReception();
signalClose(CLOSED_ABNORMALLY, "");
}
@@ -602,4 +528,9 @@
}
return false;
}
+
+ /* Exposed for testing purposes */
+ protected final Transport<WebSocket> transport() {
+ return transport;
+ }
}
--- a/test/jdk/java/net/httpclient/websocket/ReceivingTestDriver.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReceivingTest
- */
-public class ReceivingTestDriver { }
--- a/test/jdk/java/net/httpclient/websocket/SendingTestDriver.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,29 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
- * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.SendingTest
- */
-public class SendingTestDriver { }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open
+ * @run testng/othervm/timeout=30 --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest
+ */
+public class WebSocketImplDriver { }
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java Fri Dec 15 00:47:16 2017 +0300
@@ -39,7 +39,7 @@
private final long bufferSize;
private long count;
- private final List<ListenerInvocation> invocations = new ArrayList<>();
+ private final List<Invocation> invocations = new ArrayList<>();
private final CompletableFuture<?> lastCall = new CompletableFuture<>();
/*
@@ -147,11 +147,11 @@
webSocket.request(count);
}
- public List<ListenerInvocation> invocations() {
+ public List<Invocation> invocations() {
return new ArrayList<>(invocations);
}
- public abstract static class ListenerInvocation {
+ public abstract static class Invocation {
public static OnOpen onOpen(WebSocket webSocket) {
return new OnOpen(webSocket);
@@ -192,12 +192,12 @@
final WebSocket webSocket;
- private ListenerInvocation(WebSocket webSocket) {
+ private Invocation(WebSocket webSocket) {
this.webSocket = webSocket;
}
}
- public static final class OnOpen extends ListenerInvocation {
+ public static final class OnOpen extends Invocation {
public OnOpen(WebSocket webSocket) {
super(webSocket);
@@ -207,7 +207,7 @@
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- ListenerInvocation that = (ListenerInvocation) o;
+ Invocation that = (Invocation) o;
return Objects.equals(webSocket, that.webSocket);
}
@@ -217,7 +217,7 @@
}
}
- public static final class OnText extends ListenerInvocation {
+ public static final class OnText extends Invocation {
final String text;
final MessagePart part;
@@ -249,7 +249,7 @@
}
}
- public static final class OnBinary extends ListenerInvocation {
+ public static final class OnBinary extends Invocation {
final ByteBuffer data;
final MessagePart part;
@@ -281,7 +281,7 @@
}
}
- public static final class OnPing extends ListenerInvocation {
+ public static final class OnPing extends Invocation {
final ByteBuffer data;
@@ -310,7 +310,7 @@
}
}
- public static final class OnPong extends ListenerInvocation {
+ public static final class OnPong extends Invocation {
final ByteBuffer data;
@@ -339,7 +339,7 @@
}
}
- public static final class OnClose extends ListenerInvocation {
+ public static final class OnClose extends Invocation {
final int statusCode;
final String reason;
@@ -371,7 +371,7 @@
}
}
- public static final class OnError extends ListenerInvocation {
+ public static final class OnError extends Invocation {
final Class<? extends Throwable> clazz;
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockReceiver.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.internal.common.Pair;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Consumer;
-
-public class MockReceiver extends Receiver {
-
- private final Iterator<Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>> iterator;
- private final MessageStreamConsumer consumer;
-
- public MockReceiver(MessageStreamConsumer consumer, RawChannel channel,
- Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>>... pairs) {
- super(consumer, channel);
- this.consumer = consumer;
- iterator = Arrays.asList(pairs).iterator();
- }
-
- @Override
- protected SequentialScheduler createScheduler() {
- class X { // Class is hack needed to allow the task to refer to the scheduler
- SequentialScheduler scheduler = new SequentialScheduler(task());
-
- SequentialScheduler.RestartableTask task() {
- return new SequentialScheduler.RestartableTask() {
- @Override
- public void run(DeferredCompleter taskCompleter) {
- if (!scheduler.isStopped() && !demand.isFulfilled()) {
- if (!iterator.hasNext()) {
- taskCompleter.complete();
- return;
- }
- Pair<CompletionStage<?>, Consumer<MessageStreamConsumer>> p = iterator.next();
- CompletableFuture<?> cf = p.first.toCompletableFuture();
- if (cf.isDone()) { // Forcing synchronous execution
- p.second.accept(consumer);
- repeat(taskCompleter);
- } else {
- cf.whenCompleteAsync((r, e) -> {
- p.second.accept(consumer);
- repeat(taskCompleter);
- });
- }
- } else {
- taskCompleter.complete();
- }
- }
-
- private void repeat(DeferredCompleter taskCompleter) {
- taskCompleter.complete();
- scheduler.runOrSchedule();
- }
- };
- }
- }
- return new X().scheduler;
- }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransmitter.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import java.util.Queue;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Consumer;
-
-public abstract class MockTransmitter extends Transmitter {
-
- private final long startTime = System.currentTimeMillis();
-
- private final Queue<OutgoingMessage> messages = new ConcurrentLinkedQueue<>();
-
- public MockTransmitter() {
- super(null);
- }
-
- @Override
- public void send(OutgoingMessage message,
- Consumer<Exception> completionHandler) {
- System.out.printf("[%6s ms.] begin send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- messages.add(message);
- whenSent().whenComplete((r, e) -> {
- System.out.printf("[%6s ms.] complete send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- if (e != null) {
- completionHandler.accept((Exception) e);
- } else {
- completionHandler.accept(null);
- }
- });
- System.out.printf("[%6s ms.] end send(%s)%n",
- System.currentTimeMillis() - startTime,
- message);
- }
-
- @Override
- public void close() { }
-
- protected abstract CompletionStage<?> whenSent();
-
- public Queue<OutgoingMessage> queue() {
- return messages;
- }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockTransport.java Fri Dec 15 00:47:16 2017 +0300
@@ -23,46 +23,414 @@
package jdk.incubator.http.internal.websocket;
+import jdk.incubator.http.WebSocket.MessagePart;
+import jdk.incubator.http.internal.common.Demand;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static jdk.incubator.http.internal.websocket.TestSupport.fullCopy;
+
+public class MockTransport<T> implements Transport<T> {
+
+ private final long startTime = System.currentTimeMillis();
+ private final Queue<Invocation> output = new ConcurrentLinkedQueue<>();
+ private final Queue<CompletableFuture<Consumer<MessageStreamConsumer>>>
+ input = new ConcurrentLinkedQueue<>();
+ private final Supplier<T> supplier;
+ private final MessageStreamConsumer consumer;
+ private final SequentialScheduler scheduler
+ = new SequentialScheduler(new ReceiveTask());
+ private final Demand demand = new Demand();
+
+ public MockTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ this.supplier = sendResultSupplier;
+ this.consumer = consumer;
+ input.addAll(receive());
+ }
+
+ @Override
+ public final CompletableFuture<T> sendText(CharSequence message,
+ boolean isLast) {
+ output.add(Invocation.sendText(message, isLast));
+ return send(String.format("sendText(%s, %s)", message, isLast),
+ () -> sendText0(message, isLast));
+ }
-public class MockTransport extends TransportSupplier {
+ protected CompletableFuture<T> sendText0(CharSequence message,
+ boolean isLast) {
+ return defaultSend();
+ }
+
+ protected CompletableFuture<T> defaultSend() {
+ return CompletableFuture.completedFuture(result());
+ }
+
+ @Override
+ public final CompletableFuture<T> sendBinary(ByteBuffer message,
+ boolean isLast) {
+ output.add(Invocation.sendBinary(message, isLast));
+ return send(String.format("sendBinary(%s, %s)", message, isLast),
+ () -> sendBinary0(message, isLast));
+ }
+
+ protected CompletableFuture<T> sendBinary0(ByteBuffer message,
+ boolean isLast) {
+ return defaultSend();
+ }
- public MockTransport() {
- super(new NullRawChannel());
+ @Override
+ public final CompletableFuture<T> sendPing(ByteBuffer message) {
+ output.add(Invocation.sendPing(message));
+ return send(String.format("sendPing(%s)", message),
+ () -> sendPing0(message));
+ }
+
+ protected CompletableFuture<T> sendPing0(ByteBuffer message) {
+ return defaultSend();
+ }
+
+ @Override
+ public final CompletableFuture<T> sendPong(ByteBuffer message) {
+ output.add(Invocation.sendPong(message));
+ return send(String.format("sendPong(%s)", message),
+ () -> sendPong0(message));
+ }
+
+ protected CompletableFuture<T> sendPong0(ByteBuffer message) {
+ return defaultSend();
+ }
+
+ @Override
+ public final CompletableFuture<T> sendClose(int statusCode, String reason) {
+ output.add(Invocation.sendClose(statusCode, reason));
+ return send(String.format("sendClose(%s, %s)", statusCode, reason),
+ () -> sendClose0(statusCode, reason));
+ }
+
+ protected CompletableFuture<T> sendClose0(int statusCode, String reason) {
+ return defaultSend();
}
- public static class NullRawChannel implements RawChannel {
+ protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
+ return List.of();
+ }
+
+ public static Consumer<MessageStreamConsumer> onText(CharSequence data,
+ MessagePart part) {
+ return c -> c.onText(data.toString(), part);
+ }
+
+ public static Consumer<MessageStreamConsumer> onBinary(ByteBuffer data,
+ MessagePart part) {
+ return c -> c.onBinary(fullCopy(data), part);
+ }
+
+ public static Consumer<MessageStreamConsumer> onPing(ByteBuffer data) {
+ return c -> c.onPing(fullCopy(data));
+ }
+
+ public static Consumer<MessageStreamConsumer> onPong(ByteBuffer data) {
+ return c -> c.onPong(fullCopy(data));
+ }
+
+ public static Consumer<MessageStreamConsumer> onClose(int statusCode,
+ String reason) {
+ return c -> c.onClose(statusCode, reason);
+ }
+
+ public static Consumer<MessageStreamConsumer> onError(Throwable error) {
+ return c -> c.onError(error);
+ }
+
+ public static Consumer<MessageStreamConsumer> onComplete() {
+ return c -> c.onComplete();
+ }
- @Override
- public void registerEvent(RawEvent event) {
- throw new UnsupportedOperationException();
+ @Override
+ public void request(long n) {
+ demand.increase(n);
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public void acknowledgeReception() {
+ demand.tryDecrement();
+ }
+
+ @Override
+ public final void closeOutput() throws IOException {
+ output.add(Invocation.closeOutput());
+ begin("closeOutput()");
+ closeOutput0();
+ end("closeOutput()");
+ }
+
+ protected void closeOutput0() throws IOException {
+ defaultClose();
+ }
+
+ protected void defaultClose() throws IOException {
+ }
+
+ @Override
+ public final void closeInput() throws IOException {
+ output.add(Invocation.closeInput());
+ begin("closeInput()");
+ closeInput0();
+ end("closeInput()");
+ }
+
+ protected void closeInput0() throws IOException {
+ defaultClose();
+ }
+
+ public abstract static class Invocation {
+
+ static Invocation.SendText sendText(CharSequence message,
+ boolean isLast) {
+ return new SendText(message, isLast);
}
- @Override
- public ByteBuffer initialByteBuffer() {
- return ByteBuffer.allocate(0);
+ static Invocation.SendBinary sendBinary(ByteBuffer message,
+ boolean isLast) {
+ return new SendBinary(message, isLast);
+ }
+
+ static Invocation.SendPing sendPing(ByteBuffer message) {
+ return new SendPing(message);
+ }
+
+ static Invocation.SendPong sendPong(ByteBuffer message) {
+ return new SendPong(message);
+ }
+
+ static Invocation.SendClose sendClose(int statusCode, String reason) {
+ return new SendClose(statusCode, reason);
+ }
+
+ public static CloseOutput closeOutput() {
+ return new CloseOutput();
}
- @Override
- public ByteBuffer read() {
- throw new UnsupportedOperationException();
+ public static CloseInput closeInput() {
+ return new CloseInput();
+ }
+
+ public static final class SendText extends Invocation {
+
+ final CharSequence message;
+ final boolean isLast;
+
+ SendText(CharSequence message, boolean isLast) {
+ this.message = message.toString();
+ this.isLast = isLast;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ SendText sendText = (SendText) obj;
+ return isLast == sendText.isLast &&
+ Objects.equals(message, sendText.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(isLast, message);
+ }
}
- @Override
- public long write(ByteBuffer[] srcs, int offset, int length) {
- throw new UnsupportedOperationException();
+ public static final class SendBinary extends Invocation {
+
+ final ByteBuffer message;
+ final boolean isLast;
+
+ SendBinary(ByteBuffer message, boolean isLast) {
+ this.message = fullCopy(message);
+ this.isLast = isLast;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ SendBinary that = (SendBinary) obj;
+ return isLast == that.isLast &&
+ Objects.equals(message, that.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(message, isLast);
+ }
+ }
+
+ private static final class SendPing extends Invocation {
+
+ final ByteBuffer message;
+
+ SendPing(ByteBuffer message) {
+ this.message = fullCopy(message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ SendPing sendPing = (SendPing) obj;
+ return Objects.equals(message, sendPing.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(message);
+ }
+ }
+
+ private static final class SendPong extends Invocation {
+
+ final ByteBuffer message;
+
+ SendPong(ByteBuffer message) {
+ this.message = fullCopy(message);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ SendPing sendPing = (SendPing) obj;
+ return Objects.equals(message, sendPing.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(message);
+ }
}
- @Override
- public void shutdownInput() {
+ private static final class SendClose extends Invocation {
+
+ final int statusCode;
+ final String reason;
+
+ SendClose(int statusCode, String reason) {
+ this.statusCode = statusCode;
+ this.reason = reason;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ SendClose sendClose = (SendClose) obj;
+ return statusCode == sendClose.statusCode &&
+ Objects.equals(reason, sendClose.reason);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(statusCode, reason);
+ }
+ }
+
+ private static final class CloseOutput extends Invocation {
+
+ CloseOutput() { }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CloseOutput;
+ }
}
+ private static final class CloseInput extends Invocation {
+
+ CloseInput() { }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CloseInput;
+ }
+ }
+ }
+
+ public Queue<Invocation> invocations() {
+ return new LinkedList<>(output);
+ }
+
+ protected final T result() {
+ return supplier.get();
+ }
+
+ private CompletableFuture<T> send(String name,
+ Supplier<CompletableFuture<T>> supplier) {
+ begin(name);
+ CompletableFuture<T> cf = supplier.get().whenComplete((r, e) -> {
+ System.out.printf("[%6s ms.] complete %s%n", elapsedTime(), name);
+ });
+ end(name);
+ return cf;
+ }
+
+ private void begin(String name) {
+ System.out.printf("[%6s ms.] begin %s%n", elapsedTime(), name);
+ }
+
+ private void end(String name) {
+ System.out.printf("[%6s ms.] end %s%n", elapsedTime(), name);
+ }
+
+ private long elapsedTime() {
+ return System.currentTimeMillis() - startTime;
+ }
+
+ private final class ReceiveTask implements SequentialScheduler.RestartableTask {
+
@Override
- public void shutdownOutput() {
+ public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
+ if (!scheduler.isStopped() && !demand.isFulfilled() && !input.isEmpty()) {
+ CompletableFuture<Consumer<MessageStreamConsumer>> cf = input.remove();
+ if (cf.isDone()) { // Forcing synchronous execution
+ cf.join().accept(consumer);
+ repeat(taskCompleter);
+ } else {
+ cf.whenCompleteAsync((r, e) -> {
+ r.accept(consumer);
+ repeat(taskCompleter);
+ });
+ }
+ } else {
+ taskCompleter.complete();
+ }
}
- @Override
- public void close() {
+ private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
+ taskCompleter.complete();
+ scheduler.runOrSchedule();
}
}
}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/ReceivingTest.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,206 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.WebSocket;
-import org.testng.annotations.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.CompletableFuture.completedStage;
-import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
-import static jdk.incubator.http.WebSocket.MessagePart.LAST;
-import static jdk.incubator.http.WebSocket.MessagePart.PART;
-import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
-import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
-import static jdk.incubator.http.internal.common.Pair.pair;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onClose;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onError;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onOpen;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onPing;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onPong;
-import static jdk.incubator.http.internal.websocket.MockListener.ListenerInvocation.onText;
-import static org.testng.Assert.assertEquals;
-
-public class ReceivingTest {
-
- // TODO: request in onClose/onError
- // TODO: throw exception in onClose/onError
- // TODO: exception is thrown from request()
-
- @Test
- public void testNonPositiveRequest() throws Exception {
- MockListener listener = new MockListener(Long.MAX_VALUE) {
- @Override
- protected void onOpen0(WebSocket webSocket) {
- webSocket.request(0);
- }
- };
- MockTransport transport = new MockTransport() {
- @Override
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new MockReceiver(consumer, channel, pair(now(), m -> m.onText("1", WHOLE)));
- }
- };
- WebSocket ws = newInstance(listener, transport);
- listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
- List<MockListener.ListenerInvocation> invocations = listener.invocations();
- assertEquals(invocations, List.of(onOpen(ws), onError(ws, IllegalArgumentException.class)));
- }
-
- @Test
- public void testText1() throws Exception {
- MockListener listener = new MockListener(Long.MAX_VALUE);
- MockTransport transport = new MockTransport() {
- @Override
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new MockReceiver(consumer, channel,
- pair(now(), m -> m.onText("1", FIRST)),
- pair(now(), m -> m.onText("2", PART)),
- pair(now(), m -> m.onText("3", LAST)),
- pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
- }
- };
- WebSocket ws = newInstance(listener, transport);
- listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
- List<MockListener.ListenerInvocation> invocations = listener.invocations();
- assertEquals(invocations, List.of(onOpen(ws),
- onText(ws, "1", FIRST),
- onText(ws, "2", PART),
- onText(ws, "3", LAST),
- onClose(ws, NORMAL_CLOSURE, "no reason")));
- }
-
- @Test
- public void testText2() throws Exception {
- MockListener listener = new MockListener(Long.MAX_VALUE);
- MockTransport transport = new MockTransport() {
- @Override
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new MockReceiver(consumer, channel,
- pair(now(), m -> m.onText("1", FIRST)),
- pair(seconds(1), m -> m.onText("2", PART)),
- pair(now(), m -> m.onText("3", LAST)),
- pair(seconds(1), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
- }
- };
- WebSocket ws = newInstance(listener, transport);
- listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
- List<MockListener.ListenerInvocation> invocations = listener.invocations();
- assertEquals(invocations, List.of(onOpen(ws),
- onText(ws, "1", FIRST),
- onText(ws, "2", PART),
- onText(ws, "3", LAST),
- onClose(ws, NORMAL_CLOSURE, "no reason")));
- }
-
- @Test
- public void testTextIntermixedWithPongs() throws Exception {
- MockListener listener = new MockListener(Long.MAX_VALUE);
- MockTransport transport = new MockTransport() {
- @Override
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new MockReceiver(consumer, channel,
- pair(now(), m -> m.onText("1", FIRST)),
- pair(now(), m -> m.onText("2", PART)),
- pair(now(), m -> m.onPong(ByteBuffer.allocate(16))),
- pair(seconds(1), m -> m.onPong(ByteBuffer.allocate(32))),
- pair(now(), m -> m.onText("3", LAST)),
- pair(now(), m -> m.onPong(ByteBuffer.allocate(64))),
- pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
- }
- };
- WebSocket ws = newInstance(listener, transport);
- listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
- List<MockListener.ListenerInvocation> invocations = listener.invocations();
- assertEquals(invocations, List.of(onOpen(ws),
- onText(ws, "1", FIRST),
- onText(ws, "2", PART),
- onPong(ws, ByteBuffer.allocate(16)),
- onPong(ws, ByteBuffer.allocate(32)),
- onText(ws, "3", LAST),
- onPong(ws, ByteBuffer.allocate(64)),
- onClose(ws, NORMAL_CLOSURE, "no reason")));
- }
-
- @Test
- public void testTextIntermixedWithPings() throws Exception {
- MockListener listener = new MockListener(Long.MAX_VALUE);
- MockTransport transport = new MockTransport() {
- @Override
- protected Receiver newReceiver(MessageStreamConsumer consumer) {
- return new MockReceiver(consumer, channel,
- pair(now(), m -> m.onText("1", FIRST)),
- pair(now(), m -> m.onText("2", PART)),
- pair(now(), m -> m.onPing(ByteBuffer.allocate(16))),
- pair(seconds(1), m -> m.onPing(ByteBuffer.allocate(32))),
- pair(now(), m -> m.onText("3", LAST)),
- pair(now(), m -> m.onPing(ByteBuffer.allocate(64))),
- pair(now(), m -> m.onClose(NORMAL_CLOSURE, "no reason")));
- }
-
- @Override
- protected Transmitter newTransmitter() {
- return new MockTransmitter() {
- @Override
- protected CompletionStage<?> whenSent() {
- return now();
- }
- };
- }
- };
- WebSocket ws = newInstance(listener, transport);
- listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
- List<MockListener.ListenerInvocation> invocations = listener.invocations();
- System.out.println(invocations);
- assertEquals(invocations, List.of(onOpen(ws),
- onText(ws, "1", FIRST),
- onText(ws, "2", PART),
- onPing(ws, ByteBuffer.allocate(16)),
- onPing(ws, ByteBuffer.allocate(32)),
- onText(ws, "3", LAST),
- onPing(ws, ByteBuffer.allocate(64)),
- onClose(ws, NORMAL_CLOSURE, "no reason")));
- }
-
- private static CompletionStage<?> seconds(long s) {
- return new CompletableFuture<>().completeOnTimeout(null, s, TimeUnit.SECONDS);
- }
-
- private static CompletionStage<?> now() {
- return completedStage(null);
- }
-
- private static WebSocket newInstance(WebSocket.Listener listener,
- TransportSupplier transport) {
- URI uri = URI.create("ws://localhost");
- String subprotocol = "";
- return WebSocketImpl.newInstance(uri, subprotocol, listener, transport);
- }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/SendingTest.java Thu Dec 14 18:41:57 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,164 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.websocket;
-
-import jdk.incubator.http.WebSocket;
-import org.testng.annotations.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
-import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
-import static jdk.incubator.http.internal.websocket.WebSocketImpl.newInstance;
-import static org.testng.Assert.assertEquals;
-
-public class SendingTest {
-
- @Test
- public void sendTextImmediately() {
- MockTransmitter t = new MockTransmitter() {
- @Override
- protected CompletionStage<?> whenSent() {
- return CompletableFuture.completedFuture(null);
- }
- };
- WebSocket ws = newWebSocket(t);
- CompletableFuture.completedFuture(ws)
- .thenCompose(w -> w.sendText("1", true))
- .thenCompose(w -> w.sendText("2", true))
- .thenCompose(w -> w.sendText("3", true))
- .join();
-
- assertEquals(t.queue().size(), 3);
- }
-
- @Test
- public void sendTextWithDelay() {
- MockTransmitter t = new MockTransmitter() {
- @Override
- protected CompletionStage<?> whenSent() {
- return new CompletableFuture<>()
- .completeOnTimeout(null, 1, TimeUnit.SECONDS);
- }
- };
- WebSocket ws = newWebSocket(t);
- CompletableFuture.completedFuture(ws)
- .thenCompose(w -> w.sendText("1", true))
- .thenCompose(w -> w.sendText("2", true))
- .thenCompose(w -> w.sendText("3", true))
- .join();
-
- assertEquals(t.queue().size(), 3);
- }
-
- @Test
- public void sendTextMixedDelay() {
- Random r = new Random();
-
- MockTransmitter t = new MockTransmitter() {
- @Override
- protected CompletionStage<?> whenSent() {
- return r.nextBoolean() ?
- new CompletableFuture<>().completeOnTimeout(null, 1, TimeUnit.SECONDS) :
- CompletableFuture.completedFuture(null);
- }
- };
- WebSocket ws = newWebSocket(t);
- CompletableFuture.completedFuture(ws)
- .thenCompose(w -> w.sendText("1", true))
- .thenCompose(w -> w.sendText("2", true))
- .thenCompose(w -> w.sendText("3", true))
- .thenCompose(w -> w.sendText("4", true))
- .thenCompose(w -> w.sendText("5", true))
- .thenCompose(w -> w.sendText("6", true))
- .thenCompose(w -> w.sendText("7", true))
- .thenCompose(w -> w.sendText("8", true))
- .thenCompose(w -> w.sendText("9", true))
- .join();
-
- assertEquals(t.queue().size(), 9);
- }
-
- @Test
- public void sendControlMessagesConcurrently() {
-
- CompletableFuture<?> first = new CompletableFuture<>();
-
- MockTransmitter t = new MockTransmitter() {
-
- final AtomicInteger i = new AtomicInteger();
-
- @Override
- protected CompletionStage<?> whenSent() {
- if (i.incrementAndGet() == 1) {
- return first;
- } else {
- return CompletableFuture.completedFuture(null);
- }
- }
- };
- WebSocket ws = newWebSocket(t);
-
- CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
- CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
- CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
- CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
- CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
- CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
-
- first.complete(null);
- // Don't care about exceptional completion, only that all of them have
- // completed
- CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
- .handle((v, e) -> null).join();
-
- cf3.join(); /* Check that sendClose has completed normally */
- cf4.join(); /* Check that repeated sendClose has completed normally */
- assertCompletesExceptionally(IllegalStateException.class, cf5);
- assertCompletesExceptionally(IllegalStateException.class, cf6);
-
- assertEquals(t.queue().size(), 3); // 6 minus 3 that were not accepted
- }
-
- private static WebSocket newWebSocket(Transmitter transmitter) {
- URI uri = URI.create("ws://localhost");
- String subprotocol = "";
- TransportSupplier transport = new MockTransport() {
- @Override
- public Transmitter transmitter() {
- return transmitter;
- }
- };
- return newInstance(uri,
- subprotocol,
- new MockListener(Long.MAX_VALUE),
- transport);
- }
-}
--- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/TestSupport.java Thu Dec 14 18:41:57 2017 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/TestSupport.java Fri Dec 15 00:47:16 2017 +0300
@@ -297,13 +297,13 @@
} catch (Throwable t) {
caught = t;
}
+ if (caught == null) {
+ throw new AssertionFailedException("No exception was thrown");
+ }
if (predicate.test(caught)) {
System.out.println("Got expected exception: " + caught);
return caught;
}
- if (caught == null) {
- throw new AssertionFailedException("No exception was thrown");
- }
throw new AssertionFailedException("Caught exception didn't match the predicate", caught);
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java Fri Dec 15 00:47:16 2017 +0300
@@ -0,0 +1,380 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.incubator.http.internal.websocket;
+
+import jdk.incubator.http.WebSocket;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static jdk.incubator.http.WebSocket.MessagePart.FIRST;
+import static jdk.incubator.http.WebSocket.MessagePart.LAST;
+import static jdk.incubator.http.WebSocket.MessagePart.PART;
+import static jdk.incubator.http.WebSocket.MessagePart.WHOLE;
+import static jdk.incubator.http.WebSocket.NORMAL_CLOSURE;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onClose;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onError;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onOpen;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPing;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onPong;
+import static jdk.incubator.http.internal.websocket.MockListener.Invocation.onText;
+import static jdk.incubator.http.internal.websocket.MockTransport.onClose;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPing;
+import static jdk.incubator.http.internal.websocket.MockTransport.onPong;
+import static jdk.incubator.http.internal.websocket.MockTransport.onText;
+import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally;
+import static org.testng.Assert.assertEquals;
+
+/*
+ * Formatting in this file may seem strange:
+ *
+ * (
+ * ( ...)
+ * ...
+ * )
+ * ...
+ *
+ * However there is a rationale behind it. Sometimes the level of argument
+ * nesting is high, which makes it hard to manage parentheses.
+ */
+public class WebSocketImplTest {
+
+ // TODO: request in onClose/onError
+ // TODO: throw exception in onClose/onError
+ // TODO: exception is thrown from request()
+
+ @Test
+ public void testNonPositiveRequest() throws Exception {
+ MockListener listener = new MockListener(Long.MAX_VALUE) {
+ @Override
+ protected void onOpen0(WebSocket webSocket) {
+ webSocket.request(0);
+ }
+ };
+ WebSocket ws = newInstance(listener, List.of(now(onText("1", WHOLE))));
+ listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+ List<MockListener.Invocation> invocations = listener.invocations();
+ assertEquals(
+ invocations,
+ List.of(
+ onOpen(ws),
+ onError(ws, IllegalArgumentException.class)
+ )
+ );
+ }
+
+ @Test
+ public void testText1() throws Exception {
+ MockListener listener = new MockListener(Long.MAX_VALUE);
+ WebSocket ws = newInstance(
+ listener,
+ List.of(
+ now(onText("1", FIRST)),
+ now(onText("2", PART)),
+ now(onText("3", LAST)),
+ now(onClose(NORMAL_CLOSURE, "no reason"))
+ )
+ );
+ listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+ List<MockListener.Invocation> invocations = listener.invocations();
+ assertEquals(
+ invocations,
+ List.of(
+ onOpen(ws),
+ onText(ws, "1", FIRST),
+ onText(ws, "2", PART),
+ onText(ws, "3", LAST),
+ onClose(ws, NORMAL_CLOSURE, "no reason")
+ )
+ );
+ }
+
+ @Test
+ public void testText2() throws Exception {
+ MockListener listener = new MockListener(Long.MAX_VALUE);
+ WebSocket ws = newInstance(
+ listener,
+ List.of(
+ now(onText("1", FIRST)),
+ seconds(1, onText("2", PART)),
+ now(onText("3", LAST)),
+ seconds(1, onClose(NORMAL_CLOSURE, "no reason"))
+ )
+ );
+ listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+ List<MockListener.Invocation> invocations = listener.invocations();
+ assertEquals(
+ invocations,
+ List.of(
+ onOpen(ws),
+ onText(ws, "1", FIRST),
+ onText(ws, "2", PART),
+ onText(ws, "3", LAST),
+ onClose(ws, NORMAL_CLOSURE, "no reason")
+ )
+ );
+ }
+
+ @Test
+ public void testTextIntermixedWithPongs() throws Exception {
+ MockListener listener = new MockListener(Long.MAX_VALUE);
+ WebSocket ws = newInstance(
+ listener,
+ List.of(
+ now(onText("1", FIRST)),
+ now(onText("2", PART)),
+ now(onPong(ByteBuffer.allocate(16))),
+ seconds(1, onPong(ByteBuffer.allocate(32))),
+ now(onText("3", LAST)),
+ now(onPong(ByteBuffer.allocate(64))),
+ now(onClose(NORMAL_CLOSURE, "no reason"))
+ )
+ );
+ listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+ List<MockListener.Invocation> invocations = listener.invocations();
+ assertEquals(
+ invocations,
+ List.of(
+ onOpen(ws),
+ onText(ws, "1", FIRST),
+ onText(ws, "2", PART),
+ onPong(ws, ByteBuffer.allocate(16)),
+ onPong(ws, ByteBuffer.allocate(32)),
+ onText(ws, "3", LAST),
+ onPong(ws, ByteBuffer.allocate(64)),
+ onClose(ws, NORMAL_CLOSURE, "no reason")
+ )
+ );
+ }
+
+ @Test
+ public void testTextIntermixedWithPings() throws Exception {
+ MockListener listener = new MockListener(Long.MAX_VALUE);
+ WebSocket ws = newInstance(
+ listener,
+ List.of(
+ now(onText("1", FIRST)),
+ now(onText("2", PART)),
+ now(onPing(ByteBuffer.allocate(16))),
+ seconds(1, onPing(ByteBuffer.allocate(32))),
+ now(onText("3", LAST)),
+ now(onPing(ByteBuffer.allocate(64))),
+ now(onClose(NORMAL_CLOSURE, "no reason"))
+ )
+ );
+ listener.onCloseOrOnErrorCalled().get(10, TimeUnit.SECONDS);
+ List<MockListener.Invocation> invocations = listener.invocations();
+ assertEquals(
+ invocations,
+ List.of(
+ onOpen(ws),
+ onText(ws, "1", FIRST),
+ onText(ws, "2", PART),
+ onPing(ws, ByteBuffer.allocate(16)),
+ onPing(ws, ByteBuffer.allocate(32)),
+ onText(ws, "3", LAST),
+ onPing(ws, ByteBuffer.allocate(64)),
+ onClose(ws, NORMAL_CLOSURE, "no reason"))
+ );
+ }
+
+ @Test
+ public void sendTextImmediately() {
+ WebSocketImpl ws = newInstance(
+ new MockListener(1),
+ new TransportFactory() {
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new MockTransport<>(sendResultSupplier, consumer);
+ }
+ });
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .join();
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), 3);
+ }
+
+ @Test
+ public void sendTextWithDelay() {
+ MockListener listener = new MockListener(1);
+ WebSocketImpl ws = newInstance(
+ listener,
+ new TransportFactory() {
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new MockTransport<>(sendResultSupplier, consumer) {
+ @Override
+ protected CompletableFuture<T> defaultSend() {
+ return seconds(1, result());
+ }
+ };
+ }
+ });
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .join();
+ assertEquals(listener.invocations(), List.of(onOpen(ws)));
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), 3);
+ }
+
+ @Test
+ public void sendTextMixedDelay() {
+ MockListener listener = new MockListener(1);
+ WebSocketImpl ws = newInstance(
+ listener,
+ new TransportFactory() {
+
+ final Random r = new Random();
+
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new MockTransport<>(sendResultSupplier, consumer) {
+ @Override
+ protected CompletableFuture<T> defaultSend() {
+ return r.nextBoolean()
+ ? seconds(1, result())
+ : now(result());
+ }
+ };
+ }
+ });
+ CompletableFuture.completedFuture(ws)
+ .thenCompose(w -> w.sendText("1", true))
+ .thenCompose(w -> w.sendText("2", true))
+ .thenCompose(w -> w.sendText("3", true))
+ .thenCompose(w -> w.sendText("4", true))
+ .thenCompose(w -> w.sendText("5", true))
+ .thenCompose(w -> w.sendText("6", true))
+ .thenCompose(w -> w.sendText("7", true))
+ .thenCompose(w -> w.sendText("8", true))
+ .thenCompose(w -> w.sendText("9", true))
+ .join();
+ assertEquals(listener.invocations(), List.of(onOpen(ws)));
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), 9);
+ }
+
+ @Test(enabled = false) // temporarily disabled
+ public void sendControlMessagesConcurrently() {
+ MockListener listener = new MockListener(1);
+
+ CompletableFuture<?> first = new CompletableFuture<>(); // barrier
+
+ WebSocketImpl ws = newInstance(
+ listener,
+ new TransportFactory() {
+
+ final AtomicInteger i = new AtomicInteger();
+
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new MockTransport<>(sendResultSupplier, consumer) {
+ @Override
+ protected CompletableFuture<T> defaultSend() {
+ if (i.incrementAndGet() == 1) {
+ return first.thenApply(o -> result());
+ } else {
+ return now(result());
+ }
+ }
+ };
+ }
+ });
+
+ CompletableFuture<?> cf1 = ws.sendPing(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf2 = ws.sendPong(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf3 = ws.sendClose(NORMAL_CLOSURE, "");
+ CompletableFuture<?> cf4 = ws.sendClose(NORMAL_CLOSURE, "");
+ CompletableFuture<?> cf5 = ws.sendPing(ByteBuffer.allocate(0));
+ CompletableFuture<?> cf6 = ws.sendPong(ByteBuffer.allocate(0));
+
+ first.complete(null);
+ // Don't care about exceptional completion, only that all of them have
+ // completed
+ CompletableFuture.allOf(cf1, cf2, cf3, cf4, cf5, cf6)
+ .handle((v, e) -> null).join();
+
+ cf3.join(); /* Check that sendClose has completed normally */
+ cf4.join(); /* Check that repeated sendClose has completed normally */
+ assertCompletesExceptionally(IllegalStateException.class, cf5);
+ assertCompletesExceptionally(IllegalStateException.class, cf6);
+
+ assertEquals(listener.invocations(), List.of(onOpen(ws)));
+ MockTransport<WebSocket> transport = (MockTransport<WebSocket>) ws.transport();
+ assertEquals(transport.invocations().size(), 3); // 6 minus 3 that were not accepted
+ }
+
+ private static <T> CompletableFuture<T> seconds(long sec, T result) {
+ return new CompletableFuture<T>()
+ .completeOnTimeout(result, sec, TimeUnit.SECONDS);
+ }
+
+ private static <T> CompletableFuture<T> now(T result) {
+ return CompletableFuture.completedFuture(result);
+ }
+
+ private static WebSocketImpl newInstance(
+ WebSocket.Listener listener,
+ Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> input) {
+ TransportFactory factory = new TransportFactory() {
+ @Override
+ public <T> Transport<T> createTransport(Supplier<T> sendResultSupplier,
+ MessageStreamConsumer consumer) {
+ return new MockTransport<T>(sendResultSupplier, consumer) {
+ @Override
+ protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
+ return input;
+ }
+ };
+ }
+ };
+ return newInstance(listener, factory);
+ }
+
+ private static WebSocketImpl newInstance(WebSocket.Listener listener,
+ TransportFactory factory) {
+ URI uri = URI.create("ws://localhost");
+ String subprotocol = "";
+ return WebSocketImpl.newInstance(uri, subprotocol, listener, factory);
+ }
+}