# HG changeset patch # User chegar # Date 1510154917 0 # Node ID 8b2d7be041dc462595436a35a30c5452cf5b155c # Parent 5cd3c3a62aaf545da7c38479838a83bdad26f3cd http-client-branch: remove dead code diff -r 5cd3c3a62aaf -r 8b2d7be041dc src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncDataReadQueue.java Wed Nov 08 14:20:49 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,212 +0,0 @@ -/* - * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.common; - -import jdk.incubator.http.internal.frame.DataFrame; -import jdk.incubator.http.internal.frame.Http2Frame; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -/** - * Http2Frame Producer-Consumer queue which either allows to consume all frames in blocking way - * or allows to consume it asynchronously. In the latter case put operation from the producer thread - * executes consume operation in the given executor. - */ -public class AsyncDataReadQueue implements Closeable { - - @FunctionalInterface - public interface DataConsumer { - /** - * - * @param t - frame - * @return true if consuming should be continued. false when END_STREAM was received. - * @throws Throwable - */ - boolean accept(Http2Frame t) throws Throwable; - } - - private static final int BLOCKING = 0; - private static final int FLUSHING = 1; - private static final int REFLUSHING = 2; - private static final int ASYNC = 3; - private static final int CLOSED = 4; - - - private final AtomicInteger state = new AtomicInteger(BLOCKING); - private final BlockingQueue queue = new LinkedBlockingQueue<>(); - private Executor executor; - private DataConsumer onData; - private Consumer onError; - - public AsyncDataReadQueue() { - } - - public boolean tryPut(Http2Frame f) { - if(state.get() == CLOSED) { - return false; - } else { - queue.offer(f); - flushAsync(false); - return true; - } - } - - public void put(Http2Frame f) throws IOException { - if(!tryPut(f)) - throw new IOException("stream closed"); - } - - public void blockingReceive(DataConsumer onData, Consumer onError) { - if (state.get() == CLOSED) { - onError.accept(new IOException("stream closed")); - return; - } - assert state.get() == BLOCKING; - try { - while (onData.accept(queue.take())); - assert state.get() == CLOSED; - } catch (Throwable e) { - onError.accept(e); - } - } - - public void asyncReceive(Executor executor, DataConsumer onData, - Consumer onError) { - if (state.get() == CLOSED) { - onError.accept(new IOException("stream closed")); - return; - } - - assert state.get() == BLOCKING; - - // Validates that fields not already set. - if (!checkCanSet("executor", this.executor, onError) - || !checkCanSet("onData", this.onData, onError) - || !checkCanSet("onError", this.onError, onError)) { - return; - } - - this.executor = executor; - this.onData = onData; - this.onError = onError; - - // This will report an error if asyncReceive is called twice, - // because we won't be in BLOCKING state if that happens - if (!this.state.compareAndSet(BLOCKING, ASYNC)) { - onError.accept(new IOException( - new IllegalStateException("State: "+this.state.get()))); - return; - } - - flushAsync(false); - } - - private static boolean checkCanSet(String name, T oldval, Consumer onError) { - if (oldval != null) { - onError.accept(new IOException( - new IllegalArgumentException(name))); - return false; - } - return true; - } - - @Override - public void close() { - int prevState = state.getAndSet(CLOSED); - if(prevState == BLOCKING) { - // wake up blocked take() - queue.offer(new DataFrame(0, DataFrame.END_STREAM, new ByteBufferReference[0])); - } - } - - private void flushAsync(boolean alreadyInExecutor) { - while(true) { - switch (state.get()) { - case BLOCKING: - case CLOSED: - case REFLUSHING: - return; - case ASYNC: - if(state.compareAndSet(ASYNC, FLUSHING)) { - if(alreadyInExecutor) { - flushLoop(); - } else { - executor.execute(this::flushLoop); - } - return; - } - break; - case FLUSHING: - if(state.compareAndSet(FLUSHING, REFLUSHING)) { - return; - } - break; - } - } - } - - private void flushLoop() { - try { - while(true) { - Http2Frame frame = queue.poll(); - while (frame != null) { - if(!onData.accept(frame)) { - assert state.get() == CLOSED; - return; // closed - } - frame = queue.poll(); - } - switch (state.get()) { - case BLOCKING: - assert false; - break; - case ASYNC: - throw new RuntimeException("Shouldn't happen"); - case FLUSHING: - if(state.compareAndSet(FLUSHING, ASYNC)) { - return; - } - break; - case REFLUSHING: - // We need to check if new elements were put after last - // poll() and do graceful exit - state.compareAndSet(REFLUSHING, FLUSHING); - break; - case CLOSED: - return; - } - } - } catch (Throwable e) { - onError.accept(e); - close(); - } - } -} diff -r 5cd3c3a62aaf -r 8b2d7be041dc src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/AsyncWriteQueue.java Wed Nov 08 14:20:49 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,235 +0,0 @@ -/* - * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.common; - - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicInteger; - -public class AsyncWriteQueue implements Closeable { - - @FunctionalInterface - public static interface AsyncConsumer { - /** - * Takes an array of buffer reference and attempt to send the data - * downstream. If not all the data can be sent, then push back - * to the source queue by calling {@code source.setDelayed(buffers)} - * and return false. If all the data was successfully sent downstream - * then returns true. - * @param buffers An array of ByteBufferReference containing data - * to send downstream. - * @param source This AsyncWriteQueue. - * @return true if all the data could be sent downstream, false otherwise. - */ - boolean trySend(ByteBufferReference[] buffers, AsyncWriteQueue source); - } - - private static final int IDLE = 0; // nobody is flushing from the queue - private static final int FLUSHING = 1; // there is the only thread flushing from the queue - private static final int REFLUSHING = 2; // while one thread was flushing from the queue - // the other thread put data into the queue. - // flushing thread should recheck queue before switching to idle state. - private static final int DELAYED = 3; // flushing is delayed - // either by PlainHttpConnection.WriteEvent registration, or - // SSL handshaking - - private static final int CLOSED = 4; // queue is closed - - private final AtomicInteger state = new AtomicInteger(IDLE); - private final Deque queue = new ConcurrentLinkedDeque<>(); - private final AsyncConsumer consumeAction; - - // Queue may be processed in two modes: - // 1. if(!doFullDrain) - invoke callback on each chunk - // 2. if(doFullDrain) - drain the whole queue, merge all chunks into the single array and invoke callback - private final boolean doFullDrain; - - private ByteBufferReference[] delayedElement = null; - - public AsyncWriteQueue(AsyncConsumer consumeAction) { - this(consumeAction, true); - } - - public AsyncWriteQueue(AsyncConsumer consumeAction, boolean doFullDrain) { - this.consumeAction = consumeAction; - this.doFullDrain = doFullDrain; - } - - public void put(ByteBufferReference[] e) throws IOException { - ensureOpen(); - queue.addLast(e); - } - - public void putFirst(ByteBufferReference[] e) throws IOException { - ensureOpen(); - queue.addFirst(e); - } - - /** - * Returns true if flushing was performed - * @return - * @throws IOException - */ - public boolean flush() throws IOException { - while(true) { - switch (state.get()) { - case IDLE: - if(state.compareAndSet(IDLE, FLUSHING)) { - flushLoop(); - return true; - } - break; - case FLUSHING: - if(state.compareAndSet(FLUSHING, REFLUSHING)) { - return false; - } - break; - case REFLUSHING: - case DELAYED: - return false; - case CLOSED: - throw new IOException("Queue closed"); - } - } - } - - /* - * race invocations of flushDelayed are not allowed. - * flushDelayed should be invoked only from: - * - SelectorManager thread - * - Handshaking thread - */ - public void flushDelayed() throws IOException { - ensureOpen(); - if(!state.compareAndSet(DELAYED, FLUSHING)) { - ensureOpen(); // if CAS failed when close was set - throw proper exception - throw new RuntimeException("Shouldn't happen"); - } - flushLoop(); - } - - private ByteBufferReference[] drain(ByteBufferReference[] prev) { - assert prev != null; - if(doFullDrain) { - ByteBufferReference[] next = queue.poll(); - if(next == null) { - return prev; - } - List drained = new ArrayList<>(); - drained.addAll(Arrays.asList(prev)); - drained.addAll(Arrays.asList(next)); - while ((next = queue.poll()) != null) { - drained.addAll(Arrays.asList(next)); - } - return drained.toArray(new ByteBufferReference[0]); - } else { - return prev; - } - } - - private ByteBufferReference[] drain() { - ByteBufferReference[] next = queue.poll(); - return next == null ? null : drain(next); - } - - private void flushLoop() throws IOException { - ByteBufferReference[] element; - if (delayedElement != null) { - element = drain(delayedElement); - delayedElement = null; - } else { - element = drain(); - } - while(true) { - while (element != null) { - if (!consumeAction.trySend(element, this)) { - return; - } - element = drain(); - } - switch (state.get()) { - case IDLE: - case DELAYED: - throw new RuntimeException("Shouldn't happen"); - case FLUSHING: - if(state.compareAndSet(FLUSHING, IDLE)) { - return; - } - break; - case REFLUSHING: - // We need to check if new elements were put after last poll() and do graceful exit - state.compareAndSet(REFLUSHING, FLUSHING); - break; - case CLOSED: - throw new IOException("Queue closed"); - } - element = drain(); - } - } - - /* - * The methods returns unprocessed chunk of buffers into beginning of the queue. - * Invocation of the method allowed only inside consume callback, - * and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state. - */ - public void setDelayed(ByteBufferReference[] delayedElement) throws IOException { - while(true) { - int state = this.state.get(); - switch (state) { - case IDLE: - case DELAYED: - throw new RuntimeException("Shouldn't happen"); - case FLUSHING: - case REFLUSHING: - if(this.state.compareAndSet(state, DELAYED)) { - this.delayedElement = delayedElement; - return; - } - break; - case CLOSED: - throw new IOException("Queue closed"); - } - } - - } - - private void ensureOpen() throws IOException { - if (state.get() == CLOSED) { - throw new IOException("Queue closed"); - } - } - - @Override - public void close() throws IOException { - state.getAndSet(CLOSED); - } - -}