jdk/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java
author dfuchs
Thu, 17 Aug 2017 16:48:45 +0100
changeset 46836 2684bef98033
parent 42460 7133f144981a
permissions -rw-r--r--
8177935: java/net/httpclient/http2/FixedThreadPoolTest.java fails frequently Summary: fixes a race condition in AsyncWriteQueue Reviewed-by: chegar

/*
 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package jdk.incubator.http.internal.common;


import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncWriteQueue implements Closeable {

    @FunctionalInterface
    public static interface AsyncConsumer {
        /**
         * Takes an array of buffer reference and attempt to send the data
         * downstream. If not all the data can be sent, then push back
         * to the source queue by calling {@code source.setDelayed(buffers)}
         * and return false. If all the data was successfully sent downstream
         * then returns true.
         * @param buffers An array of ButeBufferReference containing data
         *                to send downstream.
         * @param source This AsyncWriteQueue.
         * @return true if all the data could be sent downstream, false otherwise.
         */
        boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
    }

    private static final int IDLE    = 0;     // nobody is flushing from the queue
    private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
    private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
                                              // the other thread put data into the queue.
                                              // flushing thread should recheck queue before switching to idle state.
    private static final int DELAYED = 3;     // flushing is delayed
                                              // either by PlainHttpConnection.WriteEvent registration, or
                                              // SSL handshaking

    private static final int CLOSED = 4;      // queue is closed

    private final AtomicInteger state = new AtomicInteger(IDLE);
    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
    private final AsyncConsumer consumeAction;

    // Queue may be processed in two modes:
    // 1. if(!doFullDrain) - invoke callback on each chunk
    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
    private final boolean doFullDrain;

    private ByteBufferReference[] delayedElement = null;

    public AsyncWriteQueue(AsyncConsumer consumeAction) {
        this(consumeAction, true);
    }

    public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
        this.consumeAction = consumeAction;
        this.doFullDrain = doFullDrain;
    }

    public void put(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addLast(e);
    }

    public void putFirst(ByteBufferReference[] e) throws IOException {
        ensureOpen();
        queue.addFirst(e);
    }

    /**
     * retruns true if flushing was performed
     * @return
     * @throws IOException
     */
    public boolean flush() throws IOException {
        while(true) {
            switch (state.get()) {
                case IDLE:
                    if(state.compareAndSet(IDLE, FLUSHING)) {
                        flushLoop();
                        return true;
                    }
                    break;
                case FLUSHING:
                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
                        return false;
                    }
                    break;
                case REFLUSHING:
                case DELAYED:
                    return false;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }
    }

    /*
     *  race invocations of flushDelayed are not allowed.
     *  flushDelayed should be invoked only from:
     *   - SelectorManager thread
     *   - Handshaking thread
     */
    public void flushDelayed() throws IOException {
        ensureOpen();
        if(!state.compareAndSet(DELAYED, FLUSHING)) {
            ensureOpen(); // if CAS failed when close was set - throw proper exception
            throw new RuntimeException("Shouldn't happen");
        }
        flushLoop();
    }

    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
        assert prev != null;
        if(doFullDrain) {
            ByteBufferReference[] next = queue.poll();
            if(next == null) {
                return prev;
            }
            List<ByteBufferReference> drained = new ArrayList<>();
            drained.addAll(Arrays.asList(prev));
            drained.addAll(Arrays.asList(next));
            while ((next = queue.poll()) != null) {
                drained.addAll(Arrays.asList(next));
            }
            return drained.toArray(new ByteBufferReference[0]);
        } else {
            return prev;
        }
    }

    private ByteBufferReference[] drain() {
        ByteBufferReference[] next = queue.poll();
        return next == null ? null : drain(next);
    }

    private void flushLoop() throws IOException {
        ByteBufferReference[] element;
        if (delayedElement != null) {
            element = drain(delayedElement);
            delayedElement = null;
        } else {
            element = drain();
        }
        while(true) {
            while (element != null) {
                if (!consumeAction.trySend(element, this)) {
                    return;
                }
                element = drain();
            }
            switch (state.get()) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                    if(state.compareAndSet(FLUSHING, IDLE)) {
                        return;
                    }
                    break;
                case REFLUSHING:
                    // We need to check if new elements were put after last poll() and do graceful exit
                    state.compareAndSet(REFLUSHING, FLUSHING);
                    break;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
            element = drain();
        }
    }

    /*
     * The methods returns unprocessed chunk of buffers into beginning of the queue.
     * Invocation of the method allowed only inside consume callback,
     * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
     */
    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
        while(true) {
            int state = this.state.get();
            switch (state) {
                case IDLE:
                case DELAYED:
                    throw new RuntimeException("Shouldn't happen");
                case FLUSHING:
                case REFLUSHING:
                    if(this.state.compareAndSet(state, DELAYED)) {
                        this.delayedElement = delayedElement;
                        return;
                    }
                    break;
                case CLOSED:
                    throw new IOException("Queue closed");
            }
        }

    }

    private void ensureOpen() throws IOException {
        if (state.get() == CLOSED) {
            throw new IOException("Queue closed");
        }
    }

    @Override
    public void close() throws IOException {
        state.getAndSet(CLOSED);
    }

}