src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
author chegar
Wed, 07 Feb 2018 21:45:37 +0000
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 src/java.net.http/share/classes/java/net/http/internal/websocket/TransportImpl.java@42208b2f224e
child 56263 4933a477d628
permissions -rw-r--r--
http-client-branch: move implementation to jdk.internal.net.http
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     1
/*
56024
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
     2
 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     4
 *
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    10
 *
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    15
 * accompanied this code).
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    16
 *
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    20
 *
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    23
 * questions.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    24
 */
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    25
56092
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    26
package jdk.internal.net.http.websocket;
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    27
56092
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    28
import jdk.internal.net.http.common.Demand;
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    29
import jdk.internal.net.http.common.MinimalFuture;
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    30
import jdk.internal.net.http.common.Pair;
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    31
import jdk.internal.net.http.common.SequentialScheduler;
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    32
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    33
import java.io.IOException;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    34
import java.nio.ByteBuffer;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    35
import java.nio.channels.SelectionKey;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    36
import java.util.Queue;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    37
import java.util.concurrent.CompletableFuture;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    38
import java.util.concurrent.ConcurrentLinkedQueue;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    39
import java.util.concurrent.atomic.AtomicBoolean;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    40
import java.util.function.Consumer;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    41
import java.util.function.Supplier;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    42
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    43
import static java.util.Objects.requireNonNull;
56092
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    44
import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
fd85b2bf2b0d http-client-branch: move implementation to jdk.internal.net.http
chegar
parents: 56089
diff changeset
    45
import static jdk.internal.net.http.common.Pair.pair;
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    46
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    47
public class TransportImpl<T> implements Transport<T> {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    48
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    49
    /* This flag is used solely for assertions */
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    50
    private final AtomicBoolean busy = new AtomicBoolean();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    51
    private OutgoingMessage message;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    52
    private Consumer<Exception> completionHandler;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    53
    private final RawChannel channel;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    54
    private final RawChannel.RawEvent writeEvent = createWriteEvent();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    55
    private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    56
    private final Queue<Pair<OutgoingMessage, CompletableFuture<T>>>
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    57
            queue = new ConcurrentLinkedQueue<>();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    58
    private final OutgoingMessage.Context context = new OutgoingMessage.Context();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    59
    private final Supplier<T> resultSupplier;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    60
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    61
    private final MessageStreamConsumer messageConsumer;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    62
    private final FrameConsumer frameConsumer;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    63
    private final Frame.Reader reader = new Frame.Reader();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    64
    private final RawChannel.RawEvent readEvent = createReadEvent();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    65
    private final Demand demand = new Demand();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    66
    private final SequentialScheduler receiveScheduler;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    67
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    68
    private ByteBuffer data;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    69
    private volatile int state;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    70
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    71
    private static final int UNREGISTERED = 0;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    72
    private static final int AVAILABLE    = 1;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    73
    private static final int WAITING      = 2;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    74
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    75
    private final Object lock = new Object();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    76
    private boolean inputClosed;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    77
    private boolean outputClosed;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    78
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    79
    public TransportImpl(Supplier<T> sendResultSupplier,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    80
                         MessageStreamConsumer consumer,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    81
                         RawChannel channel) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    82
        this.resultSupplier = sendResultSupplier;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    83
        this.messageConsumer = consumer;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    84
        this.channel = channel;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    85
        this.frameConsumer = new FrameConsumer(this.messageConsumer);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    86
        this.data = channel.initialByteBuffer();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    87
        // To ensure the initial non-final `data` will be visible
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    88
        // (happens-before) when `readEvent.handle()` invokes `receiveScheduler`
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    89
        // the following assignment is done last:
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    90
        receiveScheduler = new SequentialScheduler(new ReceiveTask());
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    91
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    92
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    93
    /**
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    94
     * The supplied handler may be invoked in the calling thread.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    95
     * A {@code StackOverflowError} may thus occur if there's a possibility
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    96
     * that this method is called again by the supplied handler.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    97
     */
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    98
    public void send(OutgoingMessage message,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
    99
                     Consumer<Exception> completionHandler) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   100
        requireNonNull(message);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   101
        requireNonNull(completionHandler);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   102
        if (!busy.compareAndSet(false, true)) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   103
            throw new IllegalStateException();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   104
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   105
        send0(message, completionHandler);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   106
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   107
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   108
    private RawChannel.RawEvent createWriteEvent() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   109
        return new RawChannel.RawEvent() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   110
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   111
            @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   112
            public int interestOps() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   113
                return SelectionKey.OP_WRITE;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   114
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   115
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   116
            @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   117
            public void handle() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   118
                // registerEvent(e) happens-before subsequent e.handle(), so
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   119
                // we're fine reading the stored message and the completionHandler
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   120
                send0(message, completionHandler);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   121
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   122
        };
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   123
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   124
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   125
    private void send0(OutgoingMessage message, Consumer<Exception> handler) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   126
        boolean b = busy.get();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   127
        assert b; // Please don't inline this, as busy.get() has memory
55989
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   128
                  // visibility effects and we don't want the program behaviour
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   129
                  // to depend on whether the assertions are turned on
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   130
                  // or turned off
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   131
        try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   132
            boolean sent = message.sendTo(channel);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   133
            if (sent) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   134
                busy.set(false);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   135
                handler.accept(null);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   136
            } else {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   137
                // The message has not been fully sent, the transmitter needs to
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   138
                // remember the message until it can continue with sending it
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   139
                this.message = message;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   140
                this.completionHandler = handler;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   141
                try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   142
                    channel.registerEvent(writeEvent);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   143
                } catch (IOException e) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   144
                    this.message = null;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   145
                    this.completionHandler = null;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   146
                    busy.set(false);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   147
                    handler.accept(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   148
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   149
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   150
        } catch (IOException e) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   151
            busy.set(false);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   152
            handler.accept(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   153
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   154
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   155
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   156
    public CompletableFuture<T> sendText(CharSequence message,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   157
                                         boolean isLast) {
56024
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   158
        OutgoingMessage.Text m;
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   159
        try {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   160
            m = new OutgoingMessage.Text(message, isLast);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   161
        } catch (IllegalArgumentException e) {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   162
            return failedFuture(e);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   163
        }
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   164
        return enqueue(m);
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   165
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   166
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   167
    public CompletableFuture<T> sendBinary(ByteBuffer message,
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   168
                                           boolean isLast) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   169
        return enqueue(new OutgoingMessage.Binary(message, isLast));
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   170
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   171
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   172
    public CompletableFuture<T> sendPing(ByteBuffer message) {
56024
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   173
        OutgoingMessage.Ping m;
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   174
        try {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   175
            m = new OutgoingMessage.Ping(message);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   176
        } catch (IllegalArgumentException e) {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   177
            return failedFuture(e);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   178
        }
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   179
        return enqueue(m);
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   180
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   181
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   182
    public CompletableFuture<T> sendPong(ByteBuffer message) {
56024
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   183
        OutgoingMessage.Pong m;
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   184
        try {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   185
            m = new OutgoingMessage.Pong(message);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   186
        } catch (IllegalArgumentException e) {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   187
            return failedFuture(e);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   188
        }
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   189
        return enqueue(m);
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   190
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   191
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   192
    public CompletableFuture<T> sendClose(int statusCode, String reason) {
56024
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   193
        OutgoingMessage.Close m;
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   194
        try {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   195
            m = new OutgoingMessage.Close(statusCode, reason);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   196
        } catch (IllegalArgumentException e) {
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   197
            return failedFuture(e);
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   198
        }
de352132c7e8 http-client-branch: (WebSocket) a number of tests for exceptional completion
prappo
parents: 55989
diff changeset
   199
        return enqueue(m);
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   200
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   201
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   202
    private CompletableFuture<T> enqueue(OutgoingMessage m) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   203
        CompletableFuture<T> cf = new MinimalFuture<>();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   204
        boolean added = queue.add(pair(m, cf));
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   205
        if (!added) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   206
            // The queue is supposed to be unbounded
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   207
            throw new InternalError();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   208
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   209
        sendScheduler.runOrSchedule();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   210
        return cf;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   211
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   212
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   213
    /*
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   214
     * This is a message sending task. It pulls messages from the queue one by
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   215
     * one and sends them. It may be run in different threads, but never
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   216
     * concurrently.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   217
     */
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   218
    private class SendTask implements SequentialScheduler.RestartableTask {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   219
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   220
        @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   221
        public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   222
            Pair<OutgoingMessage, CompletableFuture<T>> p = queue.poll();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   223
            if (p == null) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   224
                taskCompleter.complete();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   225
                return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   226
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   227
            OutgoingMessage message = p.first;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   228
            CompletableFuture<T> cf = p.second;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   229
            try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   230
                if (!message.contextualize(context)) { // Do not send the message
55989
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   231
                    cf.complete(resultSupplier.get());
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   232
                    repeat(taskCompleter);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   233
                    return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   234
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   235
                Consumer<Exception> h = e -> {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   236
                    if (e == null) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   237
                        cf.complete(resultSupplier.get());
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   238
                    } else {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   239
                        cf.completeExceptionally(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   240
                    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   241
                    repeat(taskCompleter);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   242
                };
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   243
                send(message, h);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   244
            } catch (Throwable t) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   245
                cf.completeExceptionally(t);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   246
                repeat(taskCompleter);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   247
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   248
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   249
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   250
        private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   251
            taskCompleter.complete();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   252
            // More than a single message may have been enqueued while
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   253
            // the task has been busy with the current message, but
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   254
            // there is only a single signal recorded
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   255
            sendScheduler.runOrSchedule();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   256
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   257
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   258
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   259
    private RawChannel.RawEvent createReadEvent() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   260
        return new RawChannel.RawEvent() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   261
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   262
            @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   263
            public int interestOps() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   264
                return SelectionKey.OP_READ;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   265
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   266
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   267
            @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   268
            public void handle() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   269
                state = AVAILABLE;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   270
                receiveScheduler.runOrSchedule();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   271
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   272
        };
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   273
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   274
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   275
    @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   276
    public void request(long n) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   277
        if (demand.increase(n)) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   278
            receiveScheduler.runOrSchedule();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   279
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   280
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   281
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   282
    @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   283
    public void acknowledgeReception() {
55989
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   284
        boolean decremented = demand.tryDecrement();
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   285
        if (!decremented) {
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   286
            throw new InternalError();
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   287
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   288
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   289
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   290
    private class ReceiveTask extends SequentialScheduler.CompleteRestartableTask {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   291
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   292
        @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   293
        public void run() {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   294
            while (!receiveScheduler.isStopped()) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   295
                if (data.hasRemaining()) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   296
                    if (!demand.isFulfilled()) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   297
                        try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   298
                            int oldPos = data.position();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   299
                            reader.readFrame(data, frameConsumer);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   300
                            int newPos = data.position();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   301
                            assert oldPos != newPos : data; // reader always consumes bytes
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   302
                        } catch (Throwable e) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   303
                            receiveScheduler.stop();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   304
                            messageConsumer.onError(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   305
                        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   306
                        continue;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   307
                    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   308
                    break;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   309
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   310
                switch (state) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   311
                    case WAITING:
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   312
                        return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   313
                    case UNREGISTERED:
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   314
                        try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   315
                            state = WAITING;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   316
                            channel.registerEvent(readEvent);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   317
                        } catch (Throwable e) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   318
                            receiveScheduler.stop();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   319
                            messageConsumer.onError(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   320
                        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   321
                        return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   322
                    case AVAILABLE:
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   323
                        try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   324
                            data = channel.read();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   325
                        } catch (Throwable e) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   326
                            receiveScheduler.stop();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   327
                            messageConsumer.onError(e);
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   328
                            return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   329
                        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   330
                        if (data == null) { // EOF
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   331
                            receiveScheduler.stop();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   332
                            messageConsumer.onComplete();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   333
                            return;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   334
                        } else if (!data.hasRemaining()) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   335
                            // No data at the moment Pretty much a "goto",
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   336
                            // reusing the existing code path for registration
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   337
                            state = UNREGISTERED;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   338
                        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   339
                        continue;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   340
                    default:
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   341
                        throw new InternalError(String.valueOf(state));
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   342
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   343
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   344
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   345
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   346
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   347
    /*
55989
76ac25076fdc http-client-branch: (WebSocket) refactoring
prappo
parents: 55988
diff changeset
   348
     * Permanently stops reading from the channel and delivering messages
55988
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   349
     * regardless of the current demand and data availability.
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   350
     */
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   351
    @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   352
    public void closeInput() throws IOException {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   353
        synchronized (lock) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   354
            if (!inputClosed) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   355
                inputClosed = true;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   356
                try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   357
                    receiveScheduler.stop();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   358
                    channel.shutdownInput();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   359
                } finally {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   360
                    if (outputClosed) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   361
                        channel.close();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   362
                    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   363
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   364
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   365
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   366
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   367
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   368
    @Override
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   369
    public void closeOutput() throws IOException {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   370
        synchronized (lock) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   371
            if (!outputClosed) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   372
                outputClosed = true;
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   373
                try {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   374
                    channel.shutdownOutput();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   375
                } finally {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   376
                    if (inputClosed) {
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   377
                        channel.close();
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   378
                    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   379
                }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   380
            }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   381
        }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   382
    }
7f1e0cf933a6 http-client-branch: (WebSocket) refactoring for the sake of extra test coverage
prappo
parents:
diff changeset
   383
}