src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java
author jdv
Tue, 15 May 2018 11:34:25 +0530
changeset 50147 23a8ccafa7ba
parent 49765 ee6f7a61f3a5
child 56451 9585061fdb04
permissions -rw-r--r--
8202824: Cleanup discrepancies in ProblemList for java_awt jtreg tests Reviewed-by: serb

/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
import jdk.internal.net.http.websocket.RawChannel;

import java.io.EOFException;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.lang.System.Logger.Level;

/*
 * I/O abstraction used to implement WebSocket.
 *
 */
public class RawChannelTube implements RawChannel {

    final HttpConnection connection;
    final FlowTube tube;
    final WritePublisher writePublisher;
    final ReadSubscriber readSubscriber;
    final Supplier<ByteBuffer> initial;
    final AtomicBoolean inited = new AtomicBoolean();
    final AtomicBoolean outputClosed = new AtomicBoolean();
    final AtomicBoolean inputClosed = new AtomicBoolean();
    final AtomicBoolean closed = new AtomicBoolean();
    final String dbgTag;
    final Logger debug;
    private static final Cleaner cleaner =
            Utils.ASSERTIONSENABLED  && Utils.DEBUG_WS ? Cleaner.create() : null;

    RawChannelTube(HttpConnection connection,
                   Supplier<ByteBuffer> initial) {
        this.connection = connection;
        this.tube = connection.getConnectionFlow();
        this.initial = initial;
        this.writePublisher = new WritePublisher();
        this.readSubscriber = new ReadSubscriber();
        dbgTag = "[WebSocket] RawChannelTube(" + tube.toString() +")";
        debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
        connection.client().webSocketOpen();
        connectFlows();
        if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
            // this is just for debug...
            cleaner.register(this, new CleanupChecker(closed, debug));
        }
    }

    // Make sure no back reference to RawChannelTube can exist
    // from this class. In particular it would be dangerous
    // to reference connection, since connection has a reference
    // to SocketTube with which a RawChannelTube is registered.
    // Ditto for HttpClientImpl, which might have a back reference
    // to the connection.
    static final class CleanupChecker implements Runnable {
        final AtomicBoolean closed;
        final System.Logger debug;
        CleanupChecker(AtomicBoolean closed, System.Logger debug) {
            this.closed = closed;
            this.debug = debug;
        }

        @Override
        public void run() {
            if (!closed.get()) {
                debug.log(Level.DEBUG,
                         "RawChannelTube was not closed before being released");
            }
        }
    }

    private void connectFlows() {
        if (debug.on()) debug.log("connectFlows");
        tube.connectFlows(writePublisher, readSubscriber);
    }

    class WriteSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
        final Demand demand = new Demand();
        volatile boolean cancelled;
        WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            this.subscriber = subscriber;
        }
        @Override
        public void request(long n) {
            if (debug.on()) debug.log("WriteSubscription::request %d", n);
            demand.increase(n);
            RawEvent event;
            while ((event = writePublisher.events.poll()) != null) {
                if (debug.on()) debug.log("WriteSubscriber: handling event");
                event.handle();
                if (demand.isFulfilled()) break;
            }
        }
        @Override
        public void cancel() {
            cancelled = true;
            if (debug.on()) debug.log("WriteSubscription::cancel");
            shutdownOutput();
            RawEvent event;
            while ((event = writePublisher.events.poll()) != null) {
                if (debug.on()) debug.log("WriteSubscriber: handling event");
                event.handle();
            }
        }
    }

    class WritePublisher implements FlowTube.TubePublisher {
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
        volatile WriteSubscription writeSubscription;
        @Override
        public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
            if (debug.on()) debug.log("WritePublisher::subscribe");
            WriteSubscription subscription = new WriteSubscription(subscriber);
            subscriber.onSubscribe(subscription);
            writeSubscription = subscription;
        }
    }

    class ReadSubscriber implements  FlowTube.TubeSubscriber {

        volatile Flow.Subscription readSubscription;
        volatile boolean completed;
        long initialRequest;
        final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
        final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
        final AtomicReference<Throwable> errorRef = new AtomicReference<>();

        void checkEvents() {
            Flow.Subscription subscription = readSubscription;
            if (subscription != null) {
                Throwable error = errorRef.get();
                while (!buffers.isEmpty() || error != null || closed.get() || completed) {
                    RawEvent event = events.poll();
                    if (event == null) break;
                    if (debug.on()) debug.log("ReadSubscriber: handling event");
                    event.handle();
                }
            }
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            //buffers.add(initial.get());
            long n;
            synchronized (this) {
                readSubscription = subscription;
                n = initialRequest;
                initialRequest = 0;
            }
            if (debug.on()) debug.log("ReadSubscriber::onSubscribe");
            if (n > 0) {
                Throwable error = errorRef.get();
                if (error == null && !closed.get() && !completed) {
                    if (debug.on()) debug.log("readSubscription: requesting " + n);
                    subscription.request(n);
                }
            }
            checkEvents();
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            if (debug.on()) debug.log(() -> "ReadSubscriber::onNext "
                    + Utils.remaining(item) + " bytes");
            buffers.addAll(item);
            checkEvents();
        }

        @Override
        public void onError(Throwable throwable) {
            if (closed.get() || errorRef.compareAndSet(null, throwable)) {
                if (debug.on()) debug.log("ReadSubscriber::onError", throwable);
                if (buffers.isEmpty()) {
                    checkEvents();
                    shutdownInput();
                }
            }
        }

        @Override
        public void onComplete() {
            if (debug.on()) debug.log("ReadSubscriber::onComplete");
            completed = true;
            if (buffers.isEmpty()) {
                checkEvents();
                shutdownInput();
            }
        }
    }


    /*
     * Registers given event whose callback will be called once only (i.e.
     * register new event for each callback).
     *
     * Memory consistency effects: actions in a thread calling registerEvent
     * happen-before any subsequent actions in the thread calling event.handle
     */
    public void registerEvent(RawEvent event) throws IOException {
        int interestOps = event.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            if (debug.on()) debug.log("register write event");
            if (outputClosed.get()) throw new IOException("closed output");
            writePublisher.events.add(event);
            WriteSubscription writeSubscription = writePublisher.writeSubscription;
            if (writeSubscription != null) {
                while (!writeSubscription.demand.isFulfilled()) {
                    event = writePublisher.events.poll();
                    if (event == null) break;
                    event.handle();
                }
            }
        }
        if ((interestOps & SelectionKey.OP_READ) != 0) {
            if (debug.on()) debug.log("register read event");
            if (inputClosed.get()) throw new IOException("closed input");
            readSubscriber.events.add(event);
            readSubscriber.checkEvents();
            if (readSubscriber.buffers.isEmpty()
                    && !readSubscriber.events.isEmpty()) {
                Flow.Subscription readSubscription =
                        readSubscriber.readSubscription;
                if (readSubscription == null) {
                    synchronized (readSubscriber) {
                        readSubscription = readSubscriber.readSubscription;
                        if (readSubscription == null) {
                            readSubscriber.initialRequest = 1;
                            return;
                        }
                    }
                }
                assert  readSubscription != null;
                if (debug.on()) debug.log("readSubscription: requesting 1");
                readSubscription.request(1);
            }
        }
    }

    /**
     * Hands over the initial bytes. Once the bytes have been returned they are
     * no longer available and the method will throw an {@link
     * IllegalStateException} on each subsequent invocation.
     *
     * @return the initial bytes
     * @throws IllegalStateException
     *         if the method has been already invoked
     */
    public ByteBuffer initialByteBuffer() throws IllegalStateException {
        if (inited.compareAndSet(false, true)) {
            return initial.get();
        } else throw new IllegalStateException("initial buffer already drained");
    }

    /*
     * Returns a ByteBuffer with the data read or null if EOF is reached. Has no
     * remaining bytes if no data available at the moment.
     */
    public ByteBuffer read() throws IOException {
        if (debug.on()) debug.log("read");
        Flow.Subscription readSubscription = readSubscriber.readSubscription;
        if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
        ByteBuffer buffer = readSubscriber.buffers.poll();
        if (buffer != null) {
            if (debug.on()) debug.log("read: " + buffer.remaining());
            return buffer;
        }
        Throwable error = readSubscriber.errorRef.get();
        if (error != null) error = Utils.getIOException(error);
        if (error instanceof EOFException) {
            if (debug.on()) debug.log("read: EOFException");
            shutdownInput();
            return null;
        }
        if (error != null) {
            if (debug.on()) debug.log("read: " + error);
            if (closed.get()) {
                return null;
            }
            shutdownInput();
            throw Utils.getIOException(error);
        }
        if (readSubscriber.completed) {
            if (debug.on()) debug.log("read: EOF");
            shutdownInput();
            return null;
        }
        if (inputClosed.get()) {
            if (debug.on()) debug.log("read: CLOSED");
            throw new IOException("closed output");
        }
        if (debug.on()) debug.log("read: nothing to read");
        return Utils.EMPTY_BYTEBUFFER;
    }

    /*
     * Writes a sequence of bytes to this channel from a subsequence of the
     * given buffers.
     */
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        if (outputClosed.get()) {
            if (debug.on()) debug.log("write: CLOSED");
            throw new IOException("closed output");
        }
        WriteSubscription writeSubscription =  writePublisher.writeSubscription;
        if (writeSubscription == null) {
            if (debug.on()) debug.log("write: unsubscribed: 0");
            return 0;
        }
        if (writeSubscription.cancelled) {
            if (debug.on()) debug.log("write: CANCELLED");
            shutdownOutput();
            throw new IOException("closed output");
        }
        if (writeSubscription.demand.tryDecrement()) {
            List<ByteBuffer> buffers = copy(srcs, offset, length);
            long res = Utils.remaining(buffers);
            if (debug.on()) debug.log("write: writing %d", res);
            writeSubscription.subscriber.onNext(buffers);
            return res;
        } else {
            if (debug.on()) debug.log("write: no demand: 0");
            return 0;
        }
    }

    /**
     * Shutdown the connection for reading without closing the channel.
     *
     * <p> Once shutdown for reading then further reads on the channel will
     * return {@code null}, the end-of-stream indication. If the input side of
     * the connection is already shutdown then invoking this method has no
     * effect.
     *
     * @throws ClosedChannelException
     *         If this channel is closed
     * @throws IOException
     *         If some other I/O error occurs
     */
    public void shutdownInput() {
        if (inputClosed.compareAndSet(false, true)) {
            if (debug.on()) debug.log("shutdownInput");
            // TransportImpl will eventually call RawChannel::close.
            // We must not call it here as this would close the socket
            // and can cause an exception to back fire before
            // TransportImpl and WebSocketImpl have updated their state.
        }
    }

    /**
     * Shutdown the connection for writing without closing the channel.
     *
     * <p> Once shutdown for writing then further attempts to write to the
     * channel will throw {@link ClosedChannelException}. If the output side of
     * the connection is already shutdown then invoking this method has no
     * effect.
     *
     * @throws ClosedChannelException
     *         If this channel is closed
     * @throws IOException
     *         If some other I/O error occurs
     */
    public void shutdownOutput() {
        if (outputClosed.compareAndSet(false, true)) {
            if (debug.on()) debug.log("shutdownOutput");
            // TransportImpl will eventually call RawChannel::close.
            // We must not call it here as this would close the socket
            // and can cause an exception to back fire before
            // TransportImpl and WebSocketImpl have updated their state.
        }
    }

    /**
     * Closes this channel.
     *
     * @throws IOException
     *         If an I/O error occurs
     */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            if (debug.on()) debug.log("close");
            connection.client().webSocketClose();
            connection.close();
        }
    }

    private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
        int count = Math.min(len, src.length - offset);
        if (count <= 0) return Utils.EMPTY_BB_LIST;
        if (count == 1) return List.of(Utils.copy(src[offset]));
        if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
        List<ByteBuffer> list = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            list.add(Utils.copy(src[offset + i]));
        }
        return list;
    }
}