--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java Wed Mar 07 17:16:28 2018 +0000
@@ -0,0 +1,371 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http.websocket;
+
+import jdk.internal.net.http.common.Utils;
+import jdk.internal.vm.annotation.Stable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+/*
+ * A FIFO message storage facility.
+ *
+ * The queue supports at most one consumer and an arbitrary number of producers.
+ * Methods `peek`, `remove` and `isEmpty` must not be invoked concurrently.
+ * Methods `addText`, `addBinary`, `addPing`, `addPong` and `addClose` may be
+ * invoked concurrently.
+ *
+ * This queue is of a bounded size. The queue pre-allocates array of the said
+ * size and fills it with `Message` elements. The resulting structure never
+ * changes. This allows to avoid re-allocation and garbage collection of
+ * elements and arrays thereof. For this reason `Message` elements are never
+ * returned from the `peek` method. Instead their components passed to the
+ * provided callback.
+ *
+ * The queue consists of:
+ *
+ * - a ring array of n + 1 `Message` elements
+ * - indexes H and T denoting the head and the tail elements of the queue
+ * respectively
+ *
+ * Each `Message` element contains a boolean flag. This flag is an auxiliary
+ * communication between the producers and the consumer. The flag shows
+ * whether or not the element is ready to be consumed (peeked at, removed). The
+ * flag is required since updating an element involves many fields and thus is
+ * not an atomic action. An addition to the queue happens in two steps:
+ *
+ * # Step 1
+ *
+ * Producers race with each other to secure an index for the element they add.
+ * T is atomically advanced [1] only if the advanced value doesn't equal to H
+ * (a producer doesn't bump into the head of the queue).
+ *
+ * # Step 2
+ *
+ * Once T is advanced in the previous step, the producer updates the message
+ * fields of the element at the previous value of T and then sets the flag of
+ * this element.
+ *
+ * A removal happens in a single step. The consumer gets the element at index H.
+ * If the flag of this element is set, the consumer clears the fields of the
+ * element, clears the flag and finally advances H.
+ *
+ * ----------------------------------------------------------------------------
+ * [1] To advance the index is to change it from i to (i + 1) % (n + 1).
+ */
+public class MessageQueue {
+
+ private final static boolean DEBUG = false;
+
+ @Stable
+ private final Message[] elements;
+
+ private final AtomicInteger tail = new AtomicInteger();
+ private volatile int head;
+
+ public MessageQueue() {
+ this(defaultSize());
+ }
+
+ /* Exposed for testing */
+ protected MessageQueue(int size) {
+ if (size < 1) {
+ throw new IllegalArgumentException();
+ }
+ Message[] array = new Message[size + 1];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = new Message();
+ }
+ elements = array;
+ }
+
+ private static int defaultSize() {
+ String property = "jdk.httpclient.websocket.outputQueueMaxSize";
+ int defaultSize = 128;
+ String value = Utils.getNetProperty(property);
+ int size;
+ if (value == null) {
+ size = defaultSize;
+ } else {
+ try {
+ size = Integer.parseUnsignedInt(value);
+ } catch (NumberFormatException ignored) {
+ size = defaultSize;
+ }
+ }
+ if (DEBUG) {
+ System.out.printf("[MessageQueue] %s=%s, using size %s%n",
+ property, value, size);
+ }
+ return size;
+ }
+
+ public <T> void addText(CharBuffer message,
+ boolean isLast,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.TEXT, null, message, isLast, -1, attachment,
+ action, future);
+ }
+
+ private <T> void add(Type type,
+ ByteBuffer binary,
+ CharBuffer text,
+ boolean isLast,
+ int statusCode,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ int h, currentTail, newTail;
+ do {
+ h = head;
+ currentTail = tail.get();
+ newTail = (currentTail + 1) % elements.length;
+ if (newTail == h) {
+ throw new IOException("Queue full");
+ }
+ } while (!tail.compareAndSet(currentTail, newTail));
+ Message t = elements[currentTail];
+ if (t.ready) {
+ throw new InternalError();
+ }
+ t.type = type;
+ t.binary = binary;
+ t.text = text;
+ t.isLast = isLast;
+ t.statusCode = statusCode;
+ t.attachment = attachment;
+ t.action = action;
+ t.future = future;
+ t.ready = true;
+ }
+
+ public <T> void addBinary(ByteBuffer message,
+ boolean isLast,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.BINARY, message, null, isLast, -1, attachment,
+ action, future);
+ }
+
+ public <T> void addPing(ByteBuffer message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.PING, message, null, false, -1, attachment,
+ action, future);
+ }
+
+ public <T> void addPong(ByteBuffer message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.PONG, message, null, false, -1, attachment,
+ action, future);
+ }
+
+ public <T> void addClose(int statusCode,
+ CharBuffer reason,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future)
+ throws IOException
+ {
+ add(MessageQueue.Type.CLOSE, null, reason, false, statusCode,
+ attachment, action, future);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <R, E extends Throwable> R peek(QueueCallback<R, E> callback)
+ throws E
+ {
+ Message h = elements[head];
+ if (!h.ready) {
+ return callback.onEmpty();
+ }
+ Type type = h.type;
+ switch (type) {
+ case TEXT:
+ try {
+ return (R) callback.onText(h.text, h.isLast, h.attachment,
+ h.action, h.future);
+ } catch (Throwable t) {
+ // Something unpleasant is going on here with the compiler.
+ // If this seemingly useless catch is omitted, the compiler
+ // reports an error:
+ //
+ // java: unreported exception java.lang.Throwable;
+ // must be caught or declared to be thrown
+ //
+ // My guess is there is a problem with both the type
+ // inference for the method AND @SuppressWarnings("unchecked")
+ // being working at the same time.
+ throw (E) t;
+ }
+ case BINARY:
+ try {
+ return (R) callback.onBinary(h.binary, h.isLast, h.attachment,
+ h.action, h.future);
+ } catch (Throwable t) {
+ throw (E) t;
+ }
+ case PING:
+ try {
+ return (R) callback.onPing(h.binary, h.attachment, h.action,
+ h.future);
+ } catch (Throwable t) {
+ throw (E) t;
+ }
+ case PONG:
+ try {
+ return (R) callback.onPong(h.binary, h.attachment, h.action,
+ h.future);
+ } catch (Throwable t) {
+ throw (E) t;
+ }
+ case CLOSE:
+ try {
+ return (R) callback.onClose(h.statusCode, h.text, h.attachment,
+ h.action, h.future);
+ } catch (Throwable t) {
+ throw (E) t;
+ }
+ default:
+ throw new InternalError(String.valueOf(type));
+ }
+ }
+
+ public boolean isEmpty() {
+ return !elements[head].ready;
+ }
+
+ public void remove() {
+ int currentHead = head;
+ Message h = elements[currentHead];
+ if (!h.ready) {
+ throw new InternalError("Queue empty");
+ }
+ h.type = null;
+ h.binary = null;
+ h.text = null;
+ h.attachment = null;
+ h.action = null;
+ h.future = null;
+ h.ready = false;
+ head = (currentHead + 1) % elements.length;
+ }
+
+ private enum Type {
+
+ TEXT,
+ BINARY,
+ PING,
+ PONG,
+ CLOSE
+ }
+
+ /*
+ * A callback for consuming a queue element's fields. Can return a result of
+ * type T or throw an exception of type E. This design allows to avoid
+ * "returning" results or "throwing" errors by updating some objects from
+ * the outside of the methods.
+ */
+ public interface QueueCallback<R, E extends Throwable> {
+
+ <T> R onText(CharBuffer message,
+ boolean isLast,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
+ <T> R onBinary(ByteBuffer message,
+ boolean isLast,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
+ <T> R onPing(ByteBuffer message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
+ <T> R onPong(ByteBuffer message,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
+ <T> R onClose(int statusCode,
+ CharBuffer reason,
+ T attachment,
+ BiConsumer<? super T, ? super Throwable> action,
+ CompletableFuture<? super T> future) throws E;
+
+ /* The queue is empty*/
+ R onEmpty() throws E;
+ }
+
+ /*
+ * A union of components of all WebSocket message types; also a node in a
+ * queue.
+ *
+ * A `Message` never leaves the context of the queue, thus the reference to
+ * it cannot be retained by anyone other than the queue.
+ */
+ private static class Message {
+
+ private volatile boolean ready;
+
+ // -- The source message fields --
+
+ private Type type;
+ private ByteBuffer binary;
+ private CharBuffer text;
+ private boolean isLast;
+ private int statusCode;
+ private Object attachment;
+ @SuppressWarnings("rawtypes")
+ private BiConsumer action;
+ @SuppressWarnings("rawtypes")
+ private CompletableFuture future;
+ }
+}