http-client-branch: merge http-client-branch
authorprappo
Wed, 08 Nov 2017 18:45:14 +0300
branchhttp-client-branch
changeset 55786 a32b59f7b7fb
parent 55785 eb4826e9f3f6 (current diff)
parent 55784 8b2d7be041dc (diff)
child 55787 d85e7823dce9
http-client-branch: merge
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java	Wed Nov 08 18:22:38 2017 +0300
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,212 +0,0 @@
-/*
- * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.incubator.http.internal.common;
-
-import jdk.incubator.http.internal.frame.DataFrame;
-import jdk.incubator.http.internal.frame.Http2Frame;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-
-/**
- * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way
- * or allows to consume it asynchronously. In the latter case put operation from the producer thread
- * executes consume operation in the given executor.
- */
-public class AsyncDataReadQueue implements Closeable {
-
-    @FunctionalInterface
-    public interface DataConsumer {
-        /**
-         *
-         * @param t - frame
-         * @return true if consuming should be continued. false when END_STREAM was received.
-         * @throws Throwable
-         */
-        boolean accept(Http2Frame t) throws Throwable;
-    }
-
-    private static final int BLOCKING = 0;
-    private static final int FLUSHING = 1;
-    private static final int REFLUSHING = 2;
-    private static final int ASYNC  = 3;
-    private static final int CLOSED = 4;
-
-
-    private final AtomicInteger state = new AtomicInteger(BLOCKING);
-    private final BlockingQueue<Http2Frame> queue = new LinkedBlockingQueue<>();
-    private Executor executor;
-    private DataConsumer onData;
-    private Consumer<Throwable> onError;
-
-    public AsyncDataReadQueue() {
-    }
-
-    public boolean tryPut(Http2Frame f) {
-        if(state.get() == CLOSED) {
-            return false;
-        } else {
-            queue.offer(f);
-            flushAsync(false);
-            return true;
-        }
-    }
-
-    public void put(Http2Frame f) throws IOException {
-        if(!tryPut(f))
-            throw new IOException("stream closed");
-    }
-
-    public void blockingReceive(DataConsumer onData, Consumer<Throwable> onError) {
-        if (state.get() == CLOSED) {
-            onError.accept(new IOException("stream closed"));
-            return;
-        }
-        assert state.get() == BLOCKING;
-        try {
-            while (onData.accept(queue.take()));
-            assert state.get() == CLOSED;
-        } catch (Throwable e) {
-            onError.accept(e);
-        }
-    }
-
-    public void asyncReceive(Executor executor, DataConsumer onData,
-                             Consumer<Throwable> onError) {
-        if (state.get() == CLOSED) {
-            onError.accept(new IOException("stream closed"));
-            return;
-        }
-
-        assert state.get() == BLOCKING;
-
-        // Validates that fields not already set.
-        if (!checkCanSet("executor", this.executor, onError)
-            || !checkCanSet("onData", this.onData, onError)
-            || !checkCanSet("onError", this.onError, onError)) {
-            return;
-        }
-
-        this.executor = executor;
-        this.onData = onData;
-        this.onError = onError;
-
-        // This will report an error if asyncReceive is called twice,
-        // because we won't be in BLOCKING state if that happens
-        if (!this.state.compareAndSet(BLOCKING, ASYNC)) {
-            onError.accept(new IOException(
-                  new IllegalStateException("State: "+this.state.get())));
-            return;
-        }
-
-        flushAsync(false);
-    }
-
-    private static <T> boolean checkCanSet(String name, T oldval, Consumer<Throwable> onError) {
-        if (oldval != null) {
-            onError.accept(new IOException(
-                     new IllegalArgumentException(name)));
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public void close() {
-        int prevState = state.getAndSet(CLOSED);
-        if(prevState == BLOCKING) {
-            // wake up blocked take()
-            queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0]));
-        }
-    }
-
-    private void flushAsync(boolean alreadyInExecutor) {
-        while(true) {
-            switch (state.get()) {
-                case BLOCKING:
-                case CLOSED:
-                case REFLUSHING:
-                    return;
-                case ASYNC:
-                    if(state.compareAndSet(ASYNC, FLUSHING)) {
-                        if(alreadyInExecutor) {
-                            flushLoop();
-                        } else {
-                            executor.execute(this::flushLoop);
-                        }
-                        return;
-                    }
-                    break;
-                case FLUSHING:
-                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
-                        return;
-                    }
-                    break;
-            }
-        }
-    }
-
-    private void flushLoop() {
-        try {
-            while(true) {
-                Http2Frame frame = queue.poll();
-                while (frame != null) {
-                    if(!onData.accept(frame)) {
-                        assert state.get() == CLOSED;
-                        return; // closed
-                    }
-                    frame = queue.poll();
-                }
-                switch (state.get()) {
-                    case BLOCKING:
-                        assert false;
-                        break;
-                    case ASYNC:
-                        throw new RuntimeException("Shouldn't happen");
-                    case FLUSHING:
-                        if(state.compareAndSet(FLUSHING, ASYNC)) {
-                            return;
-                        }
-                        break;
-                    case REFLUSHING:
-                        // We need to check if new elements were put after last
-                        // poll() and do graceful exit
-                        state.compareAndSet(REFLUSHING, FLUSHING);
-                        break;
-                    case CLOSED:
-                        return;
-                }
-            }
-        } catch (Throwable e) {
-            onError.accept(e);
-            close();
-        }
-    }
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java	Wed Nov 08 18:22:38 2017 +0300
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,235 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.incubator.http.internal.common;
-
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AsyncWriteQueue implements Closeable {
-
-    @FunctionalInterface
-    public static interface AsyncConsumer {
-        /**
-         * Takes an array of buffer reference and attempt to send the data
-         * downstream. If not all the data can be sent, then push back
-         * to the source queue by calling {@code source.setDelayed(buffers)}
-         * and return false. If all the data was successfully sent downstream
-         * then returns true.
-         * @param buffers An array of ByteBufferReference containing data
-         *                to send downstream.
-         * @param source This AsyncWriteQueue.
-         * @return true if all the data could be sent downstream, false otherwise.
-         */
-        boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source);
-    }
-
-    private static final int IDLE    = 0;     // nobody is flushing from the queue
-    private static final int FLUSHING = 1;    // there is the only thread flushing from the queue
-    private static final int REFLUSHING = 2;  // while one thread was flushing from the queue
-                                              // the other thread put data into the queue.
-                                              // flushing thread should recheck queue before switching to idle state.
-    private static final int DELAYED = 3;     // flushing is delayed
-                                              // either by PlainHttpConnection.WriteEvent registration, or
-                                              // SSL handshaking
-
-    private static final int CLOSED = 4;      // queue is closed
-
-    private final AtomicInteger state = new AtomicInteger(IDLE);
-    private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
-    private final AsyncConsumer consumeAction;
-
-    // Queue may be processed in two modes:
-    // 1. if(!doFullDrain) - invoke callback on each chunk
-    // 2. if(doFullDrain)  - drain the whole queue, merge all chunks into the single array and invoke callback
-    private final boolean doFullDrain;
-
-    private ByteBufferReference[] delayedElement = null;
-
-    public AsyncWriteQueue(AsyncConsumer consumeAction) {
-        this(consumeAction, true);
-    }
-
-    public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) {
-        this.consumeAction = consumeAction;
-        this.doFullDrain = doFullDrain;
-    }
-
-    public void put(ByteBufferReference[] e) throws IOException {
-        ensureOpen();
-        queue.addLast(e);
-    }
-
-    public void putFirst(ByteBufferReference[] e) throws IOException {
-        ensureOpen();
-        queue.addFirst(e);
-    }
-
-    /**
-     * Returns true if flushing was performed
-     * @return
-     * @throws IOException
-     */
-    public boolean flush() throws IOException {
-        while(true) {
-            switch (state.get()) {
-                case IDLE:
-                    if(state.compareAndSet(IDLE, FLUSHING)) {
-                        flushLoop();
-                        return true;
-                    }
-                    break;
-                case FLUSHING:
-                    if(state.compareAndSet(FLUSHING, REFLUSHING)) {
-                        return false;
-                    }
-                    break;
-                case REFLUSHING:
-                case DELAYED:
-                    return false;
-                case CLOSED:
-                    throw new IOException("Queue closed");
-            }
-        }
-    }
-
-    /*
-     *  race invocations of flushDelayed are not allowed.
-     *  flushDelayed should be invoked only from:
-     *   - SelectorManager thread
-     *   - Handshaking thread
-     */
-    public void flushDelayed() throws IOException {
-        ensureOpen();
-        if(!state.compareAndSet(DELAYED, FLUSHING)) {
-            ensureOpen(); // if CAS failed when close was set - throw proper exception
-            throw new RuntimeException("Shouldn't happen");
-        }
-        flushLoop();
-    }
-
-    private ByteBufferReference[] drain(ByteBufferReference[] prev) {
-        assert prev != null;
-        if(doFullDrain) {
-            ByteBufferReference[] next = queue.poll();
-            if(next == null) {
-                return prev;
-            }
-            List<ByteBufferReference> drained = new ArrayList<>();
-            drained.addAll(Arrays.asList(prev));
-            drained.addAll(Arrays.asList(next));
-            while ((next = queue.poll()) != null) {
-                drained.addAll(Arrays.asList(next));
-            }
-            return drained.toArray(new ByteBufferReference[0]);
-        } else {
-            return prev;
-        }
-    }
-
-    private ByteBufferReference[] drain() {
-        ByteBufferReference[] next = queue.poll();
-        return next == null ? null : drain(next);
-    }
-
-    private void flushLoop() throws IOException {
-        ByteBufferReference[] element;
-        if (delayedElement != null) {
-            element = drain(delayedElement);
-            delayedElement = null;
-        } else {
-            element = drain();
-        }
-        while(true) {
-            while (element != null) {
-                if (!consumeAction.trySend(element, this)) {
-                    return;
-                }
-                element = drain();
-            }
-            switch (state.get()) {
-                case IDLE:
-                case DELAYED:
-                    throw new RuntimeException("Shouldn't happen");
-                case FLUSHING:
-                    if(state.compareAndSet(FLUSHING, IDLE)) {
-                        return;
-                    }
-                    break;
-                case REFLUSHING:
-                    // We need to check if new elements were put after last poll() and do graceful exit
-                    state.compareAndSet(REFLUSHING, FLUSHING);
-                    break;
-                case CLOSED:
-                    throw new IOException("Queue closed");
-            }
-            element = drain();
-        }
-    }
-
-    /*
-     * The methods returns unprocessed chunk of buffers into beginning of the queue.
-     * Invocation of the method allowed only inside consume callback,
-     * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
-     */
-    public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
-        while(true) {
-            int state = this.state.get();
-            switch (state) {
-                case IDLE:
-                case DELAYED:
-                    throw new RuntimeException("Shouldn't happen");
-                case FLUSHING:
-                case REFLUSHING:
-                    if(this.state.compareAndSet(state, DELAYED)) {
-                        this.delayedElement = delayedElement;
-                        return;
-                    }
-                    break;
-                case CLOSED:
-                    throw new IOException("Queue closed");
-            }
-        }
-
-    }
-
-    private void ensureOpen() throws IOException {
-        if (state.get() == CLOSED) {
-            throw new IOException("Queue closed");
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        state.getAndSet(CLOSED);
-    }
-
-}