diff -r 38fac6d0521d -r 42208b2f224e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1AsyncReceiver.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1AsyncReceiver.java Tue Feb 06 19:37:56 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,651 +0,0 @@ -/* - * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http.internal; - -import java.io.EOFException; -import java.io.IOException; -import java.lang.System.Logger.Level; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Executor; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import jdk.incubator.http.internal.common.Demand; -import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber; -import jdk.incubator.http.internal.common.SequentialScheduler; -import jdk.incubator.http.internal.common.ConnectionExpiredException; -import jdk.incubator.http.internal.common.Utils; - - -/** - * A helper class that will queue up incoming data until the receiving - * side is ready to handle it. - */ -class Http1AsyncReceiver { - - static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. - final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); - - /** - * A delegate that can asynchronously receive data from an upstream flow, - * parse, it, then possibly transform it and either store it (response - * headers) or possibly pass it to a downstream subscriber (response body). - * Usually, there will be one Http1AsyncDelegate in charge of receiving - * and parsing headers, and another one in charge of receiving, parsing, - * and forwarding body. Each will sequentially subscribe with the - * Http1AsyncReceiver in turn. There may be additional delegates which - * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling - * errors while the connection is busy transmitting the request body and the - * Http1Exchange::readBody method hasn't been called yet, and response - * delegates haven't subscribed yet. - */ - static interface Http1AsyncDelegate { - /** - * Receives and handles a byte buffer reference. - * @param ref A byte buffer reference coming from upstream. - * @return false, if the byte buffer reference should be kept in the queue. - * Usually, this means that either the byte buffer reference - * was handled and parsing is finished, or that the receiver - * didn't handle the byte reference at all. - * There may or may not be any remaining data in the - * byte buffer, and the byte buffer reference must not have - * been cleared. - * true, if the byte buffer reference was fully read and - * more data can be received. - */ - public boolean tryAsyncReceive(ByteBuffer ref); - - /** - * Called when an exception is raised. - * @param ex The raised Throwable. - */ - public void onReadError(Throwable ex); - - /** - * Must be called before any other method on the delegate. - * The subscription can be either used directly by the delegate - * to request more data (e.g. if the delegate is a header parser), - * or can be forwarded to a downstream subscriber (if the delegate - * is a body parser that wraps a response BodySubscriber). - * In all cases, it is the responsibility of the delegate to ensure - * that request(n) and demand.tryDecrement() are called appropriately. - * No data will be sent to {@code tryAsyncReceive} unless - * the subscription has some demand. - * - * @param s A subscription that allows the delegate to control the - * data flow. - */ - public void onSubscribe(AbstractSubscription s); - - /** - * Returns the subscription that was passed to {@code onSubscribe} - * @return the subscription that was passed to {@code onSubscribe}.. - */ - public AbstractSubscription subscription(); - - } - - /** - * A simple subclass of AbstractSubscription that ensures the - * SequentialScheduler will be run when request() is called and demand - * becomes positive again. - */ - private static final class Http1AsyncDelegateSubscription - extends AbstractSubscription - { - private final Runnable onCancel; - private final SequentialScheduler scheduler; - Http1AsyncDelegateSubscription(SequentialScheduler scheduler, - Runnable onCancel) { - this.scheduler = scheduler; - this.onCancel = onCancel; - } - @Override - public void request(long n) { - final Demand demand = demand(); - if (demand.increase(n)) { - scheduler.runOrSchedule(); - } - } - @Override - public void cancel() { onCancel.run();} - } - - private final ConcurrentLinkedDeque queue - = new ConcurrentLinkedDeque<>(); - private final SequentialScheduler scheduler = - SequentialScheduler.synchronizedScheduler(this::flush); - private final Executor executor; - private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber(); - private final AtomicReference pendingDelegateRef; - private final AtomicLong received = new AtomicLong(); - final AtomicBoolean canRequestMore = new AtomicBoolean(); - - private volatile Throwable error; - private volatile Http1AsyncDelegate delegate; - // This reference is only used to prevent early GC of the exchange. - private volatile Http1Exchange owner; - // Only used for checking whether we run on the selector manager thread. - private final HttpClientImpl client; - private boolean retry; - - public Http1AsyncReceiver(Executor executor, Http1Exchange owner) { - this.pendingDelegateRef = new AtomicReference<>(); - this.executor = executor; - this.owner = owner; - this.client = owner.client; - } - - // This is the main loop called by the SequentialScheduler. - // It attempts to empty the queue until the scheduler is stopped, - // or the delegate is unregistered, or the delegate is unable to - // process the data (because it's not ready or already done), which - // it signals by returning 'true'; - private void flush() { - ByteBuffer buf; - try { - assert !client.isSelectorThread() : - "Http1AsyncReceiver::flush should not run in the selector: " - + Thread.currentThread().getName(); - - // First check whether we have a pending delegate that has - // just subscribed, and if so, create a Subscription for it - // and call onSubscribe. - handlePendingDelegate(); - - // Then start emptying the queue, if possible. - while ((buf = queue.peek()) != null) { - Http1AsyncDelegate delegate = this.delegate; - debug.log(Level.DEBUG, "Got %s bytes for delegate %s", - buf.remaining(), delegate); - if (!hasDemand(delegate)) { - // The scheduler will be invoked again later when the demand - // becomes positive. - return; - } - - assert delegate != null; - debug.log(Level.DEBUG, "Forwarding %s bytes to delegate %s", - buf.remaining(), delegate); - // The delegate has demand: feed it the next buffer. - if (!delegate.tryAsyncReceive(buf)) { - final long remaining = buf.remaining(); - debug.log(Level.DEBUG, () -> { - // If the scheduler is stopped, the queue may already - // be empty and the reference may already be released. - String remstr = scheduler.isStopped() ? "" : - " remaining in ref: " - + remaining; - remstr = remstr - + " total remaining: " + remaining(); - return "Delegate done: " + remaining; - }); - canRequestMore.set(false); - // The last buffer parsed may have remaining unparsed bytes. - // Don't take it out of the queue. - return; // done. - } - - // removed parsed buffer from queue, and continue with next - // if available - ByteBuffer parsed = queue.remove(); - canRequestMore.set(queue.isEmpty()); - assert parsed == buf; - } - - // queue is empty: let's see if we should request more - checkRequestMore(); - - } catch (Throwable t) { - Throwable x = error; - if (x == null) error = t; // will be handled in the finally block - debug.log(Level.DEBUG, "Unexpected error caught in flush()", t); - } finally { - // Handles any pending error. - // The most recently subscribed delegate will get the error. - checkForErrors(); - } - } - - /** - * Must be called from within the scheduler main loop. - * Handles any pending errors by calling delegate.onReadError(). - * If the error can be forwarded to the delegate, stops the scheduler. - */ - private void checkForErrors() { - // Handles any pending error. - // The most recently subscribed delegate will get the error. - // If the delegate is null, the error will be handled by the next - // delegate that subscribes. - // If the queue is not empty, wait until it it is empty before - // handling the error. - Http1AsyncDelegate delegate = pendingDelegateRef.get(); - if (delegate == null) delegate = this.delegate; - Throwable x = error; - if (delegate != null && x != null && queue.isEmpty()) { - // forward error only after emptying the queue. - final Object captured = delegate; - debug.log(Level.DEBUG, () -> "flushing " + x - + "\n\t delegate: " + captured - + "\t\t queue.isEmpty: " + queue.isEmpty()); - scheduler.stop(); - delegate.onReadError(x); - } - } - - /** - * Must be called from within the scheduler main loop. - * Figure out whether more data should be requested from the - * Http1TubeSubscriber. - */ - private void checkRequestMore() { - Http1AsyncDelegate delegate = this.delegate; - boolean more = this.canRequestMore.get(); - boolean hasDemand = hasDemand(delegate); - debug.log(Level.DEBUG, () -> "checkRequestMore: " - + "canRequestMore=" + more + ", hasDemand=" + hasDemand - + (delegate == null ? ", delegate=null" : "")); - if (hasDemand) { - subscriber.requestMore(); - } - } - - /** - * Must be called from within the scheduler main loop. - * Return true if the delegate is not null and has some demand. - * @param delegate The Http1AsyncDelegate delegate - * @return true if the delegate is not null and has some demand - */ - private boolean hasDemand(Http1AsyncDelegate delegate) { - if (delegate == null) return false; - AbstractSubscription subscription = delegate.subscription(); - long demand = subscription.demand().get(); - debug.log(Level.DEBUG, "downstream subscription demand is %s", demand); - return demand > 0; - } - - /** - * Must be called from within the scheduler main loop. - * Handles pending delegate subscription. - * Return true if there was some pending delegate subscription and a new - * delegate was subscribed, false otherwise. - * - * @return true if there was some pending delegate subscription and a new - * delegate was subscribed, false otherwise. - */ - private boolean handlePendingDelegate() { - Http1AsyncDelegate pending = pendingDelegateRef.get(); - if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) { - Http1AsyncDelegate delegate = this.delegate; - if (delegate != null) unsubscribe(delegate); - Runnable cancel = () -> { - debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending); - // The connection should be closed, as some data may - // be left over in the stream. - try { - setRetryOnError(false); - onReadError(new IOException("subscription cancelled")); - unsubscribe(pending); - } finally { - Http1Exchange exchg = owner; - stop(); - if (exchg != null) exchg.connection().close(); - } - }; - // The subscription created by a delegate is only loosely - // coupled with the upstream subscription. This is partly because - // the header/body parser work with a flow of ByteBuffer, whereas - // we have a flow List upstream. - Http1AsyncDelegateSubscription subscription = - new Http1AsyncDelegateSubscription(scheduler, cancel); - pending.onSubscribe(subscription); - this.delegate = delegate = pending; - final Object captured = delegate; - debug.log(Level.DEBUG, () -> "delegate is now " + captured - + ", demand=" + subscription.demand().get() - + ", canRequestMore=" + canRequestMore.get() - + ", queue.isEmpty=" + queue.isEmpty()); - return true; - } - return false; - } - - synchronized void setRetryOnError(boolean retry) { - this.retry = retry; - } - - void clear() { - debug.log(Level.DEBUG, "cleared"); - this.pendingDelegateRef.set(null); - this.delegate = null; - this.owner = null; - } - - void subscribe(Http1AsyncDelegate delegate) { - synchronized(this) { - pendingDelegateRef.set(delegate); - } - if (queue.isEmpty()) { - canRequestMore.set(true); - } - debug.log(Level.DEBUG, () -> - "Subscribed pending " + delegate + " queue.isEmpty: " - + queue.isEmpty()); - // Everything may have been received already. Make sure - // we parse it. - if (client.isSelectorThread()) { - scheduler.runOrSchedule(executor); - } else { - scheduler.runOrSchedule(); - } - } - - // Used for debugging only! - long remaining() { - return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY)); - } - - void unsubscribe(Http1AsyncDelegate delegate) { - synchronized(this) { - if (this.delegate == delegate) { - debug.log(Level.DEBUG, "Unsubscribed %s", delegate); - this.delegate = null; - } - } - } - - // Callback: Consumer of ByteBuffer - private void asyncReceive(ByteBuffer buf) { - debug.log(Level.DEBUG, "Putting %s bytes into the queue", buf.remaining()); - received.addAndGet(buf.remaining()); - queue.offer(buf); - - // This callback is called from within the selector thread. - // Use an executor here to avoid doing the heavy lifting in the - // selector. - scheduler.runOrSchedule(executor); - } - - // Callback: Consumer of Throwable - void onReadError(Throwable ex) { - Http1AsyncDelegate delegate; - Throwable recorded; - debug.log(Level.DEBUG, "onError: %s", (Object) ex); - synchronized (this) { - delegate = this.delegate; - recorded = error; - if (recorded == null) { - // retry is set to true by HttpExchange when the connection is - // already connected, which means it's been retrieved from - // the pool. - if (retry && (ex instanceof IOException)) { - // could be either EOFException, or - // IOException("connection reset by peer), or - // SSLHandshakeException resulting from the server having - // closed the SSL session. - if (received.get() == 0) { - // If we receive such an exception before having - // received any byte, then in this case, we will - // throw ConnectionExpiredException - // to try & force a retry of the request. - retry = false; - ex = new ConnectionExpiredException( - "subscription is finished", ex); - } - } - error = ex; - } - final Throwable t = (recorded == null ? ex : recorded); - debug.log(Level.DEBUG, () -> "recorded " + t - + "\n\t delegate: " + delegate - + "\t\t queue.isEmpty: " + queue.isEmpty(), ex); - } - if (queue.isEmpty() || pendingDelegateRef.get() != null) { - // This callback is called from within the selector thread. - // Use an executor here to avoid doing the heavy lifting in the - // selector. - scheduler.runOrSchedule(executor); - } - } - - void stop() { - debug.log(Level.DEBUG, "stopping"); - scheduler.stop(); - delegate = null; - owner = null; - } - - /** - * Returns the TubeSubscriber for reading from the connection flow. - * @return the TubeSubscriber for reading from the connection flow. - */ - TubeSubscriber subscriber() { - return subscriber; - } - - /** - * A simple tube subscriber for reading from the connection flow. - */ - final class Http1TubeSubscriber implements TubeSubscriber { - volatile Flow.Subscription subscription; - volatile boolean completed; - volatile boolean dropped; - - public void onSubscribe(Flow.Subscription subscription) { - // supports being called multiple time. - // doesn't cancel the previous subscription, since that is - // most probably the same as the new subscription. - assert this.subscription == null || dropped == false; - this.subscription = subscription; - dropped = false; - canRequestMore.set(true); - if (delegate != null) { - scheduler.runOrSchedule(executor); - } - } - - void requestMore() { - Flow.Subscription s = subscription; - if (s == null) return; - if (canRequestMore.compareAndSet(true, false)) { - if (!completed && !dropped) { - debug.log(Level.DEBUG, - "Http1TubeSubscriber: requesting one more from upstream"); - s.request(1); - return; - } - } - debug.log(Level.DEBUG, "Http1TubeSubscriber: no need to request more"); - } - - @Override - public void onNext(List item) { - canRequestMore.set(item.isEmpty()); - for (ByteBuffer buffer : item) { - asyncReceive(buffer); - } - } - - @Override - public void onError(Throwable throwable) { - onReadError(throwable); - completed = true; - } - - @Override - public void onComplete() { - onReadError(new EOFException("EOF reached while reading")); - completed = true; - } - - public void dropSubscription() { - debug.log(Level.DEBUG, "Http1TubeSubscriber: dropSubscription"); - // we could probably set subscription to null here... - // then we might not need the 'dropped' boolean? - dropped = true; - } - - } - - // Drains the content of the queue into a single ByteBuffer. - // The scheduler must be permanently stopped before calling drain(). - ByteBuffer drain(ByteBuffer initial) { - // Revisit: need to clean that up. - // - ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial); - assert scheduler.isStopped(); - - if (queue.isEmpty()) return b; - - // sanity check: we shouldn't have queued the same - // buffer twice. - ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]); - assert java.util.stream.Stream.of(qbb) - .collect(Collectors.toSet()) - .size() == qbb.length : debugQBB(qbb); - - // compute the number of bytes in the queue, the number of bytes - // in the initial buffer - // TODO: will need revisiting - as it is not guaranteed that all - // data will fit in single BB! - int size = Utils.remaining(qbb, Integer.MAX_VALUE); - int remaining = b.remaining(); - int free = b.capacity() - b.position() - remaining; - debug.log(Level.DEBUG, - "Flushing %s bytes from queue into initial buffer (remaining=%s, free=%s)", - size, remaining, free); - - // check whether the initial buffer has enough space - if (size > free) { - debug.log(Level.DEBUG, - "Allocating new buffer for initial: %s", (size + remaining)); - // allocates a new buffer and copy initial to it - b = ByteBuffer.allocate(size + remaining); - Utils.copy(initial, b); - assert b.position() == remaining; - b.flip(); - assert b.position() == 0; - assert b.limit() == remaining; - assert b.remaining() == remaining; - } - - // store position and limit - int pos = b.position(); - int limit = b.limit(); - assert limit - pos == remaining; - assert b.capacity() >= remaining + size - : "capacity: " + b.capacity() - + ", remaining: " + b.remaining() - + ", size: " + size; - - // prepare to copy the content of the queue - b.position(limit); - b.limit(pos + remaining + size); - assert b.remaining() >= size : - "remaining: " + b.remaining() + ", size: " + size; - - // copy the content of the queue - int count = 0; - for (int i=0; i= r : "need at least " + r + " only " - + b.remaining() + " available"; - int copied = Utils.copy(b2, b); - assert copied == r : "copied="+copied+" available="+r; - assert b2.remaining() == 0; - count += copied; - } - assert count == size; - assert b.position() == pos + remaining + size : - "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size; - - // reset limit and position - b.limit(limit+size); - b.position(pos); - - // we can clear the refs - queue.clear(); - final ByteBuffer bb = b; - debug.log(Level.DEBUG, () -> "Initial buffer now has " + bb.remaining() - + " pos=" + bb.position() + " limit=" + bb.limit()); - - return b; - } - - private String debugQBB(ByteBuffer[] qbb) { - StringBuilder msg = new StringBuilder(); - List lbb = Arrays.asList(qbb); - Set sbb = new HashSet<>(Arrays.asList(qbb)); - - int uniquebb = sbb.size(); - msg.append("qbb: ").append(lbb.size()) - .append(" (unique: ").append(uniquebb).append("), ") - .append("duplicates: "); - String sep = ""; - for (ByteBuffer b : lbb) { - if (!sbb.remove(b)) { - msg.append(sep) - .append(String.valueOf(b)) - .append("[remaining=") - .append(b.remaining()) - .append(", position=") - .append(b.position()) - .append(", capacity=") - .append(b.capacity()) - .append("]"); - sep = ", "; - } - } - return msg.toString(); - } - - volatile String dbgTag; - String dbgString() { - String tag = dbgTag; - if (tag == null) { - String flowTag = null; - Http1Exchange exchg = owner; - Object flow = (exchg != null) - ? exchg.connection().getConnectionFlow() - : null; - flowTag = tag = flow == null ? null: (String.valueOf(flow)); - if (flowTag != null) { - dbgTag = tag = flowTag + " Http1AsyncReceiver"; - } else { - tag = "Http1AsyncReceiver"; - } - } - return tag; - } -}