src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java
author chegar
Fri, 25 May 2018 16:13:11 +0100
branchhttp-client-branch
changeset 56619 57f17e890a40
parent 56451 9585061fdb04
permissions -rw-r--r--
http-client-branch: Make HttpHeaders final
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
49765
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     1
/*
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     2
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     4
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Oracle designates this
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
     9
 * by Oracle in the LICENSE file that accompanied this code.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    10
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    15
 * accompanied this code).
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    16
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    20
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    21
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    22
 * or visit www.oracle.com if you need additional information or have any
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    23
 * questions.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    24
 */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    25
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    26
package jdk.internal.net.http;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    27
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    28
import jdk.internal.net.http.common.Demand;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    29
import jdk.internal.net.http.common.FlowTube;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    30
import jdk.internal.net.http.common.Logger;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    31
import jdk.internal.net.http.common.Utils;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    32
import jdk.internal.net.http.websocket.RawChannel;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    33
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    34
import java.io.EOFException;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    35
import java.io.IOException;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    36
import java.lang.ref.Cleaner;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    37
import java.nio.ByteBuffer;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    38
import java.nio.channels.ClosedChannelException;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    39
import java.nio.channels.SelectionKey;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    40
import java.util.ArrayList;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    41
import java.util.List;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    42
import java.util.concurrent.ConcurrentLinkedQueue;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    43
import java.util.concurrent.Flow;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    44
import java.util.concurrent.atomic.AtomicBoolean;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    45
import java.util.concurrent.atomic.AtomicReference;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    46
import java.util.function.Supplier;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    47
import java.lang.System.Logger.Level;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    48
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    49
/*
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    50
 * I/O abstraction used to implement WebSocket.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    51
 *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    52
 */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    53
public class RawChannelTube implements RawChannel {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    54
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    55
    final HttpConnection connection;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    56
    final FlowTube tube;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    57
    final WritePublisher writePublisher;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    58
    final ReadSubscriber readSubscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    59
    final Supplier<ByteBuffer> initial;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    60
    final AtomicBoolean inited = new AtomicBoolean();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    61
    final AtomicBoolean outputClosed = new AtomicBoolean();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    62
    final AtomicBoolean inputClosed = new AtomicBoolean();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    63
    final AtomicBoolean closed = new AtomicBoolean();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    64
    final String dbgTag;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    65
    final Logger debug;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    66
    private static final Cleaner cleaner =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    67
            Utils.ASSERTIONSENABLED  && Utils.DEBUG_WS ? Cleaner.create() : null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    68
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    69
    RawChannelTube(HttpConnection connection,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    70
                   Supplier<ByteBuffer> initial) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    71
        this.connection = connection;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    72
        this.tube = connection.getConnectionFlow();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    73
        this.initial = initial;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    74
        this.writePublisher = new WritePublisher();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    75
        this.readSubscriber = new ReadSubscriber();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    76
        dbgTag = "[WebSocket] RawChannelTube(" + tube.toString() +")";
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    77
        debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    78
        connection.client().webSocketOpen();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    79
        connectFlows();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    80
        if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    81
            // this is just for debug...
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    82
            cleaner.register(this, new CleanupChecker(closed, debug));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    83
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    84
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    85
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    86
    // Make sure no back reference to RawChannelTube can exist
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    87
    // from this class. In particular it would be dangerous
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    88
    // to reference connection, since connection has a reference
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    89
    // to SocketTube with which a RawChannelTube is registered.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    90
    // Ditto for HttpClientImpl, which might have a back reference
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    91
    // to the connection.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    92
    static final class CleanupChecker implements Runnable {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    93
        final AtomicBoolean closed;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    94
        final System.Logger debug;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    95
        CleanupChecker(AtomicBoolean closed, System.Logger debug) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    96
            this.closed = closed;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    97
            this.debug = debug;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    98
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
    99
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   100
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   101
        public void run() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   102
            if (!closed.get()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   103
                debug.log(Level.DEBUG,
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   104
                         "RawChannelTube was not closed before being released");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   105
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   106
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   107
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   108
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   109
    private void connectFlows() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   110
        if (debug.on()) debug.log("connectFlows");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   111
        tube.connectFlows(writePublisher, readSubscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   112
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   113
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   114
    class WriteSubscription implements Flow.Subscription {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   115
        final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   116
        final Demand demand = new Demand();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   117
        volatile boolean cancelled;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   118
        WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   119
            this.subscriber = subscriber;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   120
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   121
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   122
        public void request(long n) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   123
            if (debug.on()) debug.log("WriteSubscription::request %d", n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   124
            demand.increase(n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   125
            RawEvent event;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   126
            while ((event = writePublisher.events.poll()) != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   127
                if (debug.on()) debug.log("WriteSubscriber: handling event");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   128
                event.handle();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   129
                if (demand.isFulfilled()) break;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   130
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   131
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   132
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   133
        public void cancel() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   134
            cancelled = true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   135
            if (debug.on()) debug.log("WriteSubscription::cancel");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   136
            shutdownOutput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   137
            RawEvent event;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   138
            while ((event = writePublisher.events.poll()) != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   139
                if (debug.on()) debug.log("WriteSubscriber: handling event");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   140
                event.handle();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   141
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   142
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   143
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   144
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   145
    class WritePublisher implements FlowTube.TubePublisher {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   146
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   147
        volatile WriteSubscription writeSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   148
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   149
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   150
            if (debug.on()) debug.log("WritePublisher::subscribe");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   151
            WriteSubscription subscription = new WriteSubscription(subscriber);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   152
            subscriber.onSubscribe(subscription);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   153
            writeSubscription = subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   154
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   155
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   156
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   157
    class ReadSubscriber implements  FlowTube.TubeSubscriber {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   158
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   159
        volatile Flow.Subscription readSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   160
        volatile boolean completed;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   161
        long initialRequest;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   162
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   163
        final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   164
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   165
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   166
        void checkEvents() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   167
            Flow.Subscription subscription = readSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   168
            if (subscription != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   169
                Throwable error = errorRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   170
                while (!buffers.isEmpty() || error != null || closed.get() || completed) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   171
                    RawEvent event = events.poll();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   172
                    if (event == null) break;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   173
                    if (debug.on()) debug.log("ReadSubscriber: handling event");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   174
                    event.handle();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   175
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   176
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   177
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   178
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   179
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   180
        public void onSubscribe(Flow.Subscription subscription) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   181
            //buffers.add(initial.get());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   182
            long n;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   183
            synchronized (this) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   184
                readSubscription = subscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   185
                n = initialRequest;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   186
                initialRequest = 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   187
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   188
            if (debug.on()) debug.log("ReadSubscriber::onSubscribe");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   189
            if (n > 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   190
                Throwable error = errorRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   191
                if (error == null && !closed.get() && !completed) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   192
                    if (debug.on()) debug.log("readSubscription: requesting " + n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   193
                    subscription.request(n);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   194
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   195
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   196
            checkEvents();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   197
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   198
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   199
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   200
        public void onNext(List<ByteBuffer> item) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   201
            if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   202
                    + Utils.remaining(item) + " bytes");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   203
            buffers.addAll(item);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   204
            checkEvents();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   205
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   206
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   207
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   208
        public void onError(Throwable throwable) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   209
            if (closed.get() || errorRef.compareAndSet(null, throwable)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   210
                if (debug.on()) debug.log("ReadSubscriber::onError", throwable);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   211
                if (buffers.isEmpty()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   212
                    checkEvents();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   213
                    shutdownInput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   214
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   215
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   216
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   217
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   218
        @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   219
        public void onComplete() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   220
            if (debug.on()) debug.log("ReadSubscriber::onComplete");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   221
            completed = true;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   222
            if (buffers.isEmpty()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   223
                checkEvents();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   224
                shutdownInput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   225
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   226
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   227
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   228
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   229
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   230
    /*
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   231
     * Registers given event whose callback will be called once only (i.e.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   232
     * register new event for each callback).
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   233
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   234
     * Memory consistency effects: actions in a thread calling registerEvent
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   235
     * happen-before any subsequent actions in the thread calling event.handle
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   236
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   237
    public void registerEvent(RawEvent event) throws IOException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   238
        int interestOps = event.interestOps();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   239
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   240
            if (debug.on()) debug.log("register write event");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   241
            if (outputClosed.get()) throw new IOException("closed output");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   242
            writePublisher.events.add(event);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   243
            WriteSubscription writeSubscription = writePublisher.writeSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   244
            if (writeSubscription != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   245
                while (!writeSubscription.demand.isFulfilled()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   246
                    event = writePublisher.events.poll();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   247
                    if (event == null) break;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   248
                    event.handle();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   249
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   250
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   251
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   252
        if ((interestOps & SelectionKey.OP_READ) != 0) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   253
            if (debug.on()) debug.log("register read event");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   254
            if (inputClosed.get()) throw new IOException("closed input");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   255
            readSubscriber.events.add(event);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   256
            readSubscriber.checkEvents();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   257
            if (readSubscriber.buffers.isEmpty()
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   258
                    && !readSubscriber.events.isEmpty()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   259
                Flow.Subscription readSubscription =
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   260
                        readSubscriber.readSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   261
                if (readSubscription == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   262
                    synchronized (readSubscriber) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   263
                        readSubscription = readSubscriber.readSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   264
                        if (readSubscription == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   265
                            readSubscriber.initialRequest = 1;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   266
                            return;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   267
                        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   268
                    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   269
                }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   270
                assert  readSubscription != null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   271
                if (debug.on()) debug.log("readSubscription: requesting 1");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   272
                readSubscription.request(1);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   273
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   274
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   275
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   276
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   277
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   278
     * Hands over the initial bytes. Once the bytes have been returned they are
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   279
     * no longer available and the method will throw an {@link
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   280
     * IllegalStateException} on each subsequent invocation.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   281
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   282
     * @return the initial bytes
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   283
     * @throws IllegalStateException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   284
     *         if the method has been already invoked
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   285
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   286
    public ByteBuffer initialByteBuffer() throws IllegalStateException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   287
        if (inited.compareAndSet(false, true)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   288
            return initial.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   289
        } else throw new IllegalStateException("initial buffer already drained");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   290
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   291
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   292
    /*
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   293
     * Returns a ByteBuffer with the data read or null if EOF is reached. Has no
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   294
     * remaining bytes if no data available at the moment.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   295
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   296
    public ByteBuffer read() throws IOException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   297
        if (debug.on()) debug.log("read");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   298
        Flow.Subscription readSubscription = readSubscriber.readSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   299
        if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   300
        ByteBuffer buffer = readSubscriber.buffers.poll();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   301
        if (buffer != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   302
            if (debug.on()) debug.log("read: " + buffer.remaining());
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   303
            return buffer;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   304
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   305
        Throwable error = readSubscriber.errorRef.get();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   306
        if (error != null) error = Utils.getIOException(error);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   307
        if (error instanceof EOFException) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   308
            if (debug.on()) debug.log("read: EOFException");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   309
            shutdownInput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   310
            return null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   311
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   312
        if (error != null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   313
            if (debug.on()) debug.log("read: " + error);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   314
            if (closed.get()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   315
                return null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   316
            }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   317
            shutdownInput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   318
            throw Utils.getIOException(error);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   319
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   320
        if (readSubscriber.completed) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   321
            if (debug.on()) debug.log("read: EOF");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   322
            shutdownInput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   323
            return null;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   324
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   325
        if (inputClosed.get()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   326
            if (debug.on()) debug.log("read: CLOSED");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   327
            throw new IOException("closed output");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   328
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   329
        if (debug.on()) debug.log("read: nothing to read");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   330
        return Utils.EMPTY_BYTEBUFFER;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   331
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   332
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   333
    /*
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   334
     * Writes a sequence of bytes to this channel from a subsequence of the
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   335
     * given buffers.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   336
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   337
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   338
        if (outputClosed.get()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   339
            if (debug.on()) debug.log("write: CLOSED");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   340
            throw new IOException("closed output");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   341
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   342
        WriteSubscription writeSubscription =  writePublisher.writeSubscription;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   343
        if (writeSubscription == null) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   344
            if (debug.on()) debug.log("write: unsubscribed: 0");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   345
            return 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   346
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   347
        if (writeSubscription.cancelled) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   348
            if (debug.on()) debug.log("write: CANCELLED");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   349
            shutdownOutput();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   350
            throw new IOException("closed output");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   351
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   352
        if (writeSubscription.demand.tryDecrement()) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   353
            List<ByteBuffer> buffers = copy(srcs, offset, length);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   354
            long res = Utils.remaining(buffers);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   355
            if (debug.on()) debug.log("write: writing %d", res);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   356
            writeSubscription.subscriber.onNext(buffers);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   357
            return res;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   358
        } else {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   359
            if (debug.on()) debug.log("write: no demand: 0");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   360
            return 0;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   361
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   362
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   363
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   364
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   365
     * Shutdown the connection for reading without closing the channel.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   366
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   367
     * <p> Once shutdown for reading then further reads on the channel will
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   368
     * return {@code null}, the end-of-stream indication. If the input side of
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   369
     * the connection is already shutdown then invoking this method has no
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   370
     * effect.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   371
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   372
     * @throws ClosedChannelException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   373
     *         If this channel is closed
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   374
     * @throws IOException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   375
     *         If some other I/O error occurs
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   376
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   377
    public void shutdownInput() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   378
        if (inputClosed.compareAndSet(false, true)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   379
            if (debug.on()) debug.log("shutdownInput");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   380
            // TransportImpl will eventually call RawChannel::close.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   381
            // We must not call it here as this would close the socket
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   382
            // and can cause an exception to back fire before
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   383
            // TransportImpl and WebSocketImpl have updated their state.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   384
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   385
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   386
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   387
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   388
     * Shutdown the connection for writing without closing the channel.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   389
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   390
     * <p> Once shutdown for writing then further attempts to write to the
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   391
     * channel will throw {@link ClosedChannelException}. If the output side of
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   392
     * the connection is already shutdown then invoking this method has no
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   393
     * effect.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   394
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   395
     * @throws ClosedChannelException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   396
     *         If this channel is closed
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   397
     * @throws IOException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   398
     *         If some other I/O error occurs
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   399
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   400
    public void shutdownOutput() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   401
        if (outputClosed.compareAndSet(false, true)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   402
            if (debug.on()) debug.log("shutdownOutput");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   403
            // TransportImpl will eventually call RawChannel::close.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   404
            // We must not call it here as this would close the socket
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   405
            // and can cause an exception to back fire before
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   406
            // TransportImpl and WebSocketImpl have updated their state.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   407
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   408
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   409
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   410
    /**
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   411
     * Closes this channel.
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   412
     *
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   413
     * @throws IOException
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   414
     *         If an I/O error occurs
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   415
     */
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   416
    @Override
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   417
    public void close() {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   418
        if (closed.compareAndSet(false, true)) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   419
            if (debug.on()) debug.log("close");
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   420
            connection.client().webSocketClose();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   421
            connection.close();
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   422
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   423
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   424
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   425
    private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   426
        int count = Math.min(len, src.length - offset);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   427
        if (count <= 0) return Utils.EMPTY_BB_LIST;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   428
        if (count == 1) return List.of(Utils.copy(src[offset]));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   429
        if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   430
        List<ByteBuffer> list = new ArrayList<>(count);
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   431
        for (int i = 0; i < count; i++) {
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   432
            list.add(Utils.copy(src[offset + i]));
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   433
        }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   434
        return list;
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   435
    }
ee6f7a61f3a5 8197564: HTTP Client implementation
chegar
parents:
diff changeset
   436
}