src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageQueue.java
author prappo
Tue, 13 Mar 2018 17:12:16 +0000
branchhttp-client-branch
changeset 56295 898dfb226bd0
parent 56291 c8c4c707ff3a
child 56303 a82058c084ef
permissions -rw-r--r--
http-client-branch: (WebSocket) DEBUG logging off
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
56263
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     1
/*
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     2
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     4
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    10
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    15
 * accompanied this code).
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    16
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    20
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    23
 * questions.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    24
 */
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    25
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    26
package jdk.internal.net.http.websocket;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    27
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    28
import jdk.internal.net.http.common.Utils;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    29
import jdk.internal.vm.annotation.Stable;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    30
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    31
import java.io.IOException;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    32
import java.nio.ByteBuffer;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    33
import java.nio.CharBuffer;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    34
import java.util.concurrent.CompletableFuture;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    35
import java.util.concurrent.atomic.AtomicInteger;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    36
import java.util.function.BiConsumer;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    37
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    38
/*
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    39
 * A FIFO message storage facility.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    40
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    41
 * The queue supports at most one consumer and an arbitrary number of producers.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    42
 * Methods `peek`, `remove` and `isEmpty` must not be invoked concurrently.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    43
 * Methods `addText`, `addBinary`, `addPing`, `addPong` and `addClose` may be
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    44
 * invoked concurrently.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    45
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    46
 * This queue is of a bounded size. The queue pre-allocates an array of the said
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    47
 * size and fills it with `Message` elements. The resulting structure never
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    48
 * changes. This avoids re-allocation and garbage collection of
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    49
 * elements and arrays thereof. For this reason `Message` elements are never
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    50
 * returned from the `peek` method. Instead, their components are passed to the
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    51
 * provided callback.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    52
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    53
 * The queue consists of:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    54
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    55
 *   - a ring array of n + 1 `Message` elements
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    56
 *   - indexes H and T denoting the head and the tail elements of the queue
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    57
 *     respectively
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    58
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    59
 * Each `Message` element contains a boolean flag. This flag is an auxiliary
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    60
 * communication between the producers and the consumer. The flag shows
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    61
 * whether or not the element is ready to be consumed (peeked at, removed). The
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    62
 * flag is required since updating an element involves many fields and thus is
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    63
 * not an atomic action. An addition to the queue happens in two steps:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    64
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    65
 * # Step 1
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    66
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    67
 * Producers race with each other to secure an index for the element they add.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    68
 * T is atomically advanced [1] only if the advanced value is not equal to H
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    69
 * (a producer doesn't bump into the head of the queue).
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    70
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    71
 * # Step 2
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    72
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    73
 * Once T is advanced in the previous step, the producer updates the message
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    74
 * fields of the element at the previous value of T and then sets the flag of
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    75
 * this element.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    76
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    77
 * A removal happens in a single step. The consumer gets the element at index H.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    78
 * If the flag of this element is set, the consumer clears the fields of the
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    79
 * element, clears the flag and finally advances H.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    80
 *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    81
 * ----------------------------------------------------------------------------
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    82
 * [1] To advance the index is to change it from i to (i + 1) % (n + 1).
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    83
 */
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    84
public class MessageQueue {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    85
56295
898dfb226bd0 http-client-branch: (WebSocket) DEBUG logging off
prappo
parents: 56291
diff changeset
    86
    private final static boolean DEBUG = false;
56263
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    87
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    88
    @Stable
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    89
    private final Message[] elements;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    90
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    91
    private final AtomicInteger tail = new AtomicInteger();
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    92
    private volatile int head;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    93
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    94
    /*
     * Creates a queue whose capacity is whatever `defaultSize()` returns.
     */
    public MessageQueue() {
        this(defaultSize());
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
    97
56290
e178d19ff91c http-client-branch: (WebSocket) minor cleanup
prappo
parents: 56263
diff changeset
    98
    /*
     * Creates a queue able to hold at most `size` messages.
     *
     * Exposed for testing purposes.
     *
     * The backing ring array has `size + 1` slots, all of which are populated
     * with `Message` elements up front.
     *
     * Throws IllegalArgumentException if `size` is less than 1, or if it is
     * so large that `size + 1` would overflow an int.
     */
    protected MessageQueue(int size) {
        // Integer.MAX_VALUE is rejected explicitly: size + 1 would wrap around
        // to a negative number and surface as a NegativeArraySizeException
        if (size < 1 || size == Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Illegal queue size: " + size);
        }
        Message[] array = new Message[size + 1];
        for (int i = 0; i < array.length; i++) {
            array[i] = new Message();
        }
        elements = array;
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   109
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   110
    private static int defaultSize() {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   111
        String property = "jdk.httpclient.websocket.outputQueueMaxSize";
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   112
        int defaultSize = 128;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   113
        String value = Utils.getNetProperty(property);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   114
        int size;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   115
        if (value == null) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   116
            size = defaultSize;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   117
        } else {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   118
            try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   119
                size = Integer.parseUnsignedInt(value);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   120
            } catch (NumberFormatException ignored) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   121
                size = defaultSize;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   122
            }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   123
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   124
        if (DEBUG) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   125
            System.out.printf("[MessageQueue] %s=%s, using size %s%n",
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   126
                              property, value, size);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   127
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   128
        return size;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   129
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   130
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   131
    public <T> void addText(CharBuffer message,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   132
                            boolean isLast,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   133
                            T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   134
                            BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   135
                            CompletableFuture<T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   136
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   137
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   138
        add(MessageQueue.Type.TEXT, null, message, isLast, -1, attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   139
            action, future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   140
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   141
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   142
    private <T> void add(Type type,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   143
                         ByteBuffer binary,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   144
                         CharBuffer text,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   145
                         boolean isLast,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   146
                         int statusCode,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   147
                         T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   148
                         BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   149
                         CompletableFuture<? super T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   150
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   151
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   152
        int h, currentTail, newTail;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   153
        do {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   154
            h = head;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   155
            currentTail = tail.get();
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   156
            newTail = (currentTail + 1) % elements.length;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   157
            if (newTail == h) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   158
                throw new IOException("Queue full");
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   159
            }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   160
        } while (!tail.compareAndSet(currentTail, newTail));
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   161
        Message t = elements[currentTail];
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   162
        if (t.ready) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   163
            throw new InternalError();
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   164
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   165
        t.type = type;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   166
        t.binary = binary;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   167
        t.text = text;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   168
        t.isLast = isLast;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   169
        t.statusCode = statusCode;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   170
        t.attachment = attachment;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   171
        t.action = action;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   172
        t.future = future;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   173
        t.ready = true;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   174
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   175
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   176
    public <T> void addBinary(ByteBuffer message,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   177
                              boolean isLast,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   178
                              T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   179
                              BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   180
                              CompletableFuture<? super T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   181
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   182
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   183
        add(MessageQueue.Type.BINARY, message, null, isLast, -1, attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   184
            action, future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   185
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   186
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   187
    public <T> void addPing(ByteBuffer message,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   188
                            T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   189
                            BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   190
                            CompletableFuture<? super T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   191
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   192
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   193
        add(MessageQueue.Type.PING, message, null, false, -1, attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   194
            action, future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   195
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   196
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   197
    public <T> void addPong(ByteBuffer message,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   198
                            T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   199
                            BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   200
                            CompletableFuture<? super T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   201
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   202
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   203
        add(MessageQueue.Type.PONG, message, null, false, -1, attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   204
            action, future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   205
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   206
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   207
    public <T> void addClose(int statusCode,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   208
                             CharBuffer reason,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   209
                             T attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   210
                             BiConsumer<? super T, ? super Throwable> action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   211
                             CompletableFuture<? super T> future)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   212
            throws IOException
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   213
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   214
        add(MessageQueue.Type.CLOSE, null, reason, false, statusCode,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   215
            attachment, action, future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   216
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   217
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   218
    @SuppressWarnings("unchecked")
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   219
    public <R, E extends Throwable> R peek(QueueCallback<R, E> callback)
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   220
            throws E
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   221
    {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   222
        Message h = elements[head];
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   223
        if (!h.ready) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   224
            return callback.onEmpty();
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   225
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   226
        Type type = h.type;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   227
        switch (type) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   228
            case TEXT:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   229
                try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   230
                    return (R) callback.onText(h.text, h.isLast, h.attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   231
                                               h.action, h.future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   232
                } catch (Throwable t) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   233
                    // Something unpleasant is going on here with the compiler.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   234
                    // If this seemingly useless catch is omitted, the compiler
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   235
                    // reports an error:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   236
                    //
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   237
                    //   java: unreported exception java.lang.Throwable;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   238
                    //   must be caught or declared to be thrown
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   239
                    //
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   240
                    // My guess is there is a problem with both the type
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   241
                    // inference for the method AND @SuppressWarnings("unchecked")
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   242
                    // being working at the same time.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   243
                    throw (E) t;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   244
                }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   245
            case BINARY:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   246
                try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   247
                    return (R) callback.onBinary(h.binary, h.isLast, h.attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   248
                                                 h.action, h.future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   249
                } catch (Throwable t) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   250
                    throw (E) t;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   251
                }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   252
            case PING:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   253
                try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   254
                    return (R) callback.onPing(h.binary, h.attachment, h.action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   255
                                               h.future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   256
                } catch (Throwable t) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   257
                    throw (E) t;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   258
                }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   259
            case PONG:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   260
                try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   261
                    return (R) callback.onPong(h.binary, h.attachment, h.action,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   262
                                               h.future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   263
                } catch (Throwable t) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   264
                    throw (E) t;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   265
                }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   266
            case CLOSE:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   267
                try {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   268
                    return (R) callback.onClose(h.statusCode, h.text, h.attachment,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   269
                                                h.action, h.future);
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   270
                } catch (Throwable t) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   271
                    throw (E) t;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   272
                }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   273
            default:
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   274
                throw new InternalError(String.valueOf(type));
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   275
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   276
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   277
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   278
    public boolean isEmpty() {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   279
        return !elements[head].ready;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   280
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   281
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   282
    public void remove() {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   283
        int currentHead = head;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   284
        Message h = elements[currentHead];
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   285
        if (!h.ready) {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   286
            throw new InternalError("Queue empty");
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   287
        }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   288
        h.type = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   289
        h.binary = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   290
        h.text = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   291
        h.attachment = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   292
        h.action = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   293
        h.future = null;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   294
        h.ready = false;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   295
        head = (currentHead + 1) % elements.length;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   296
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   297
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   298
    private enum Type {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   299
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   300
        TEXT,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   301
        BINARY,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   302
        PING,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   303
        PONG,
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   304
        CLOSE
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   305
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   306
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   307
    /*
     * A callback that consumes the fields of a queue element. There is one
     * method per kind of message, plus one for the case of an empty queue.
     * Each method may return a result of type R or throw an exception of
     * type E, so results and errors do not have to be passed out of the
     * methods by updating some external objects.
     */
    public interface QueueCallback<R, E extends Throwable> {

        /* A text message; isLast is the flag stored with the message */
        <T> R onText(
                CharBuffer message,
                boolean isLast,
                T attachment,
                BiConsumer<? super T, ? super Throwable> action,
                CompletableFuture<? super T> future) throws E;

        /* A binary message; isLast is the flag stored with the message */
        <T> R onBinary(
                ByteBuffer message,
                boolean isLast,
                T attachment,
                BiConsumer<? super T, ? super Throwable> action,
                CompletableFuture<? super T> future) throws E;

        /* A ping message */
        <T> R onPing(
                ByteBuffer message,
                T attachment,
                BiConsumer<? super T, ? super Throwable> action,
                CompletableFuture<? super T> future) throws E;

        /* A pong message */
        <T> R onPong(
                ByteBuffer message,
                T attachment,
                BiConsumer<? super T, ? super Throwable> action,
                CompletableFuture<? super T> future) throws E;

        /* A close message with its status code and reason */
        <T> R onClose(
                int statusCode,
                CharBuffer reason,
                T attachment,
                BiConsumer<? super T, ? super Throwable> action,
                CompletableFuture<? super T> future) throws E;

        /* The queue is empty */
        R onEmpty() throws E;
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   346
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   347
    /*
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   348
     * A union of components of all WebSocket message types; also a node in a
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   349
     * queue.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   350
     *
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   351
     * A `Message` never leaves the context of the queue, thus the reference to
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   352
     * it cannot be retained by anyone other than the queue.
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   353
     */
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   354
    private static class Message {
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   355
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   356
        private volatile boolean ready;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   357
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   358
        // -- The source message fields --
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   359
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   360
        private Type type;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   361
        private ByteBuffer binary;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   362
        private CharBuffer text;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   363
        private boolean isLast;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   364
        private int statusCode;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   365
        private Object attachment;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   366
        @SuppressWarnings("rawtypes")
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   367
        private BiConsumer action;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   368
        @SuppressWarnings("rawtypes")
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   369
        private CompletableFuture future;
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   370
    }
4933a477d628 http-client-branch: (WebSocket) impl change
prappo
parents:
diff changeset
   371
}