--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Tue Feb 06 11:39:55 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,651 +0,0 @@
-/*
- * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Flow;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import jdk.incubator.http.internal.common.Demand;
-import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
-import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.ConnectionExpiredException;
-import jdk.incubator.http.internal.common.Utils;
-
-
-/**
- * A helper class that will queue up incoming data until the receiving
- * side is ready to handle it.
- */
-class Http1AsyncReceiver {
-
- static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
- final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
-
- /**
- * A delegate that can asynchronously receive data from an upstream flow,
- * parse, it, then possibly transform it and either store it (response
- * headers) or possibly pass it to a downstream subscriber (response body).
- * Usually, there will be one Http1AsyncDelegate in charge of receiving
- * and parsing headers, and another one in charge of receiving, parsing,
- * and forwarding body. Each will sequentially subscribe with the
- * Http1AsyncReceiver in turn. There may be additional delegates which
- * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
- * errors while the connection is busy transmitting the request body and the
- * Http1Exchange::readBody method hasn't been called yet, and response
- * delegates haven't subscribed yet.
- */
- static interface Http1AsyncDelegate {
- /**
- * Receives and handles a byte buffer reference.
- * @param ref A byte buffer reference coming from upstream.
- * @return false, if the byte buffer reference should be kept in the queue.
- * Usually, this means that either the byte buffer reference
- * was handled and parsing is finished, or that the receiver
- * didn't handle the byte reference at all.
- * There may or may not be any remaining data in the
- * byte buffer, and the byte buffer reference must not have
- * been cleared.
- * true, if the byte buffer reference was fully read and
- * more data can be received.
- */
- public boolean tryAsyncReceive(ByteBuffer ref);
-
- /**
- * Called when an exception is raised.
- * @param ex The raised Throwable.
- */
- public void onReadError(Throwable ex);
-
- /**
- * Must be called before any other method on the delegate.
- * The subscription can be either used directly by the delegate
- * to request more data (e.g. if the delegate is a header parser),
- * or can be forwarded to a downstream subscriber (if the delegate
- * is a body parser that wraps a response BodySubscriber).
- * In all cases, it is the responsibility of the delegate to ensure
- * that request(n) and demand.tryDecrement() are called appropriately.
- * No data will be sent to {@code tryAsyncReceive} unless
- * the subscription has some demand.
- *
- * @param s A subscription that allows the delegate to control the
- * data flow.
- */
- public void onSubscribe(AbstractSubscription s);
-
- /**
- * Returns the subscription that was passed to {@code onSubscribe}
- * @return the subscription that was passed to {@code onSubscribe}..
- */
- public AbstractSubscription subscription();
-
- }
-
- /**
- * A simple subclass of AbstractSubscription that ensures the
- * SequentialScheduler will be run when request() is called and demand
- * becomes positive again.
- */
- private static final class Http1AsyncDelegateSubscription
- extends AbstractSubscription
- {
- private final Runnable onCancel;
- private final SequentialScheduler scheduler;
- Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
- Runnable onCancel) {
- this.scheduler = scheduler;
- this.onCancel = onCancel;
- }
- @Override
- public void request(long n) {
- final Demand demand = demand();
- if (demand.increase(n)) {
- scheduler.runOrSchedule();
- }
- }
- @Override
- public void cancel() { onCancel.run();}
- }
-
- private final ConcurrentLinkedDeque<ByteBuffer> queue
- = new ConcurrentLinkedDeque<>();
- private final SequentialScheduler scheduler =
- SequentialScheduler.synchronizedScheduler(this::flush);
- private final Executor executor;
- private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
- private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
- private final AtomicLong received = new AtomicLong();
- final AtomicBoolean canRequestMore = new AtomicBoolean();
-
- private volatile Throwable error;
- private volatile Http1AsyncDelegate delegate;
- // This reference is only used to prevent early GC of the exchange.
- private volatile Http1Exchange<?> owner;
- // Only used for checking whether we run on the selector manager thread.
- private final HttpClientImpl client;
- private boolean retry;
-
- public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
- this.pendingDelegateRef = new AtomicReference<>();
- this.executor = executor;
- this.owner = owner;
- this.client = owner.client;
- }
-
- // This is the main loop called by the SequentialScheduler.
- // It attempts to empty the queue until the scheduler is stopped,
- // or the delegate is unregistered, or the delegate is unable to
- // process the data (because it's not ready or already done), which
- // it signals by returning 'true';
- private void flush() {
- ByteBuffer buf;
- try {
- assert !client.isSelectorThread() :
- "Http1AsyncReceiver::flush should not run in the selector: "
- + Thread.currentThread().getName();
-
- // First check whether we have a pending delegate that has
- // just subscribed, and if so, create a Subscription for it
- // and call onSubscribe.
- handlePendingDelegate();
-
- // Then start emptying the queue, if possible.
- while ((buf = queue.peek()) != null) {
- Http1AsyncDelegate delegate = this.delegate;
- debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
- buf.remaining(), delegate);
- if (!hasDemand(delegate)) {
- // The scheduler will be invoked again later when the demand
- // becomes positive.
- return;
- }
-
- assert delegate != null;
- debug.log(Level.DEBUG, "Forwarding %s bytes to delegate %s",
- buf.remaining(), delegate);
- // The delegate has demand: feed it the next buffer.
- if (!delegate.tryAsyncReceive(buf)) {
- final long remaining = buf.remaining();
- debug.log(Level.DEBUG, () -> {
- // If the scheduler is stopped, the queue may already
- // be empty and the reference may already be released.
- String remstr = scheduler.isStopped() ? "" :
- " remaining in ref: "
- + remaining;
- remstr = remstr
- + " total remaining: " + remaining();
- return "Delegate done: " + remaining;
- });
- canRequestMore.set(false);
- // The last buffer parsed may have remaining unparsed bytes.
- // Don't take it out of the queue.
- return; // done.
- }
-
- // removed parsed buffer from queue, and continue with next
- // if available
- ByteBuffer parsed = queue.remove();
- canRequestMore.set(queue.isEmpty());
- assert parsed == buf;
- }
-
- // queue is empty: let's see if we should request more
- checkRequestMore();
-
- } catch (Throwable t) {
- Throwable x = error;
- if (x == null) error = t; // will be handled in the finally block
- debug.log(Level.DEBUG, "Unexpected error caught in flush()", t);
- } finally {
- // Handles any pending error.
- // The most recently subscribed delegate will get the error.
- checkForErrors();
- }
- }
-
- /**
- * Must be called from within the scheduler main loop.
- * Handles any pending errors by calling delegate.onReadError().
- * If the error can be forwarded to the delegate, stops the scheduler.
- */
- private void checkForErrors() {
- // Handles any pending error.
- // The most recently subscribed delegate will get the error.
- // If the delegate is null, the error will be handled by the next
- // delegate that subscribes.
- // If the queue is not empty, wait until it it is empty before
- // handling the error.
- Http1AsyncDelegate delegate = pendingDelegateRef.get();
- if (delegate == null) delegate = this.delegate;
- Throwable x = error;
- if (delegate != null && x != null && queue.isEmpty()) {
- // forward error only after emptying the queue.
- final Object captured = delegate;
- debug.log(Level.DEBUG, () -> "flushing " + x
- + "\n\t delegate: " + captured
- + "\t\t queue.isEmpty: " + queue.isEmpty());
- scheduler.stop();
- delegate.onReadError(x);
- }
- }
-
- /**
- * Must be called from within the scheduler main loop.
- * Figure out whether more data should be requested from the
- * Http1TubeSubscriber.
- */
- private void checkRequestMore() {
- Http1AsyncDelegate delegate = this.delegate;
- boolean more = this.canRequestMore.get();
- boolean hasDemand = hasDemand(delegate);
- debug.log(Level.DEBUG, () -> "checkRequestMore: "
- + "canRequestMore=" + more + ", hasDemand=" + hasDemand
- + (delegate == null ? ", delegate=null" : ""));
- if (hasDemand) {
- subscriber.requestMore();
- }
- }
-
- /**
- * Must be called from within the scheduler main loop.
- * Return true if the delegate is not null and has some demand.
- * @param delegate The Http1AsyncDelegate delegate
- * @return true if the delegate is not null and has some demand
- */
- private boolean hasDemand(Http1AsyncDelegate delegate) {
- if (delegate == null) return false;
- AbstractSubscription subscription = delegate.subscription();
- long demand = subscription.demand().get();
- debug.log(Level.DEBUG, "downstream subscription demand is %s", demand);
- return demand > 0;
- }
-
- /**
- * Must be called from within the scheduler main loop.
- * Handles pending delegate subscription.
- * Return true if there was some pending delegate subscription and a new
- * delegate was subscribed, false otherwise.
- *
- * @return true if there was some pending delegate subscription and a new
- * delegate was subscribed, false otherwise.
- */
- private boolean handlePendingDelegate() {
- Http1AsyncDelegate pending = pendingDelegateRef.get();
- if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
- Http1AsyncDelegate delegate = this.delegate;
- if (delegate != null) unsubscribe(delegate);
- Runnable cancel = () -> {
- debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
- // The connection should be closed, as some data may
- // be left over in the stream.
- try {
- setRetryOnError(false);
- onReadError(new IOException("subscription cancelled"));
- unsubscribe(pending);
- } finally {
- Http1Exchange<?> exchg = owner;
- stop();
- if (exchg != null) exchg.connection().close();
- }
- };
- // The subscription created by a delegate is only loosely
- // coupled with the upstream subscription. This is partly because
- // the header/body parser work with a flow of ByteBuffer, whereas
- // we have a flow List<ByteBuffer> upstream.
- Http1AsyncDelegateSubscription subscription =
- new Http1AsyncDelegateSubscription(scheduler, cancel);
- pending.onSubscribe(subscription);
- this.delegate = delegate = pending;
- final Object captured = delegate;
- debug.log(Level.DEBUG, () -> "delegate is now " + captured
- + ", demand=" + subscription.demand().get()
- + ", canRequestMore=" + canRequestMore.get()
- + ", queue.isEmpty=" + queue.isEmpty());
- return true;
- }
- return false;
- }
-
- synchronized void setRetryOnError(boolean retry) {
- this.retry = retry;
- }
-
- void clear() {
- debug.log(Level.DEBUG, "cleared");
- this.pendingDelegateRef.set(null);
- this.delegate = null;
- this.owner = null;
- }
-
- void subscribe(Http1AsyncDelegate delegate) {
- synchronized(this) {
- pendingDelegateRef.set(delegate);
- }
- if (queue.isEmpty()) {
- canRequestMore.set(true);
- }
- debug.log(Level.DEBUG, () ->
- "Subscribed pending " + delegate + " queue.isEmpty: "
- + queue.isEmpty());
- // Everything may have been received already. Make sure
- // we parse it.
- if (client.isSelectorThread()) {
- scheduler.runOrSchedule(executor);
- } else {
- scheduler.runOrSchedule();
- }
- }
-
- // Used for debugging only!
- long remaining() {
- return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
- }
-
- void unsubscribe(Http1AsyncDelegate delegate) {
- synchronized(this) {
- if (this.delegate == delegate) {
- debug.log(Level.DEBUG, "Unsubscribed %s", delegate);
- this.delegate = null;
- }
- }
- }
-
- // Callback: Consumer of ByteBuffer
- private void asyncReceive(ByteBuffer buf) {
- debug.log(Level.DEBUG, "Putting %s bytes into the queue", buf.remaining());
- received.addAndGet(buf.remaining());
- queue.offer(buf);
-
- // This callback is called from within the selector thread.
- // Use an executor here to avoid doing the heavy lifting in the
- // selector.
- scheduler.runOrSchedule(executor);
- }
-
- // Callback: Consumer of Throwable
- void onReadError(Throwable ex) {
- Http1AsyncDelegate delegate;
- Throwable recorded;
- debug.log(Level.DEBUG, "onError: %s", (Object) ex);
- synchronized (this) {
- delegate = this.delegate;
- recorded = error;
- if (recorded == null) {
- // retry is set to true by HttpExchange when the connection is
- // already connected, which means it's been retrieved from
- // the pool.
- if (retry && (ex instanceof IOException)) {
- // could be either EOFException, or
- // IOException("connection reset by peer), or
- // SSLHandshakeException resulting from the server having
- // closed the SSL session.
- if (received.get() == 0) {
- // If we receive such an exception before having
- // received any byte, then in this case, we will
- // throw ConnectionExpiredException
- // to try & force a retry of the request.
- retry = false;
- ex = new ConnectionExpiredException(
- "subscription is finished", ex);
- }
- }
- error = ex;
- }
- final Throwable t = (recorded == null ? ex : recorded);
- debug.log(Level.DEBUG, () -> "recorded " + t
- + "\n\t delegate: " + delegate
- + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
- }
- if (queue.isEmpty() || pendingDelegateRef.get() != null) {
- // This callback is called from within the selector thread.
- // Use an executor here to avoid doing the heavy lifting in the
- // selector.
- scheduler.runOrSchedule(executor);
- }
- }
-
- void stop() {
- debug.log(Level.DEBUG, "stopping");
- scheduler.stop();
- delegate = null;
- owner = null;
- }
-
- /**
- * Returns the TubeSubscriber for reading from the connection flow.
- * @return the TubeSubscriber for reading from the connection flow.
- */
- TubeSubscriber subscriber() {
- return subscriber;
- }
-
- /**
- * A simple tube subscriber for reading from the connection flow.
- */
- final class Http1TubeSubscriber implements TubeSubscriber {
- volatile Flow.Subscription subscription;
- volatile boolean completed;
- volatile boolean dropped;
-
- public void onSubscribe(Flow.Subscription subscription) {
- // supports being called multiple time.
- // doesn't cancel the previous subscription, since that is
- // most probably the same as the new subscription.
- assert this.subscription == null || dropped == false;
- this.subscription = subscription;
- dropped = false;
- canRequestMore.set(true);
- if (delegate != null) {
- scheduler.runOrSchedule(executor);
- }
- }
-
- void requestMore() {
- Flow.Subscription s = subscription;
- if (s == null) return;
- if (canRequestMore.compareAndSet(true, false)) {
- if (!completed && !dropped) {
- debug.log(Level.DEBUG,
- "Http1TubeSubscriber: requesting one more from upstream");
- s.request(1);
- return;
- }
- }
- debug.log(Level.DEBUG, "Http1TubeSubscriber: no need to request more");
- }
-
- @Override
- public void onNext(List<ByteBuffer> item) {
- canRequestMore.set(item.isEmpty());
- for (ByteBuffer buffer : item) {
- asyncReceive(buffer);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- onReadError(throwable);
- completed = true;
- }
-
- @Override
- public void onComplete() {
- onReadError(new EOFException("EOF reached while reading"));
- completed = true;
- }
-
- public void dropSubscription() {
- debug.log(Level.DEBUG, "Http1TubeSubscriber: dropSubscription");
- // we could probably set subscription to null here...
- // then we might not need the 'dropped' boolean?
- dropped = true;
- }
-
- }
-
- // Drains the content of the queue into a single ByteBuffer.
- // The scheduler must be permanently stopped before calling drain().
- ByteBuffer drain(ByteBuffer initial) {
- // Revisit: need to clean that up.
- //
- ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
- assert scheduler.isStopped();
-
- if (queue.isEmpty()) return b;
-
- // sanity check: we shouldn't have queued the same
- // buffer twice.
- ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]);
- assert java.util.stream.Stream.of(qbb)
- .collect(Collectors.toSet())
- .size() == qbb.length : debugQBB(qbb);
-
- // compute the number of bytes in the queue, the number of bytes
- // in the initial buffer
- // TODO: will need revisiting - as it is not guaranteed that all
- // data will fit in single BB!
- int size = Utils.remaining(qbb, Integer.MAX_VALUE);
- int remaining = b.remaining();
- int free = b.capacity() - b.position() - remaining;
- debug.log(Level.DEBUG,
- "Flushing %s bytes from queue into initial buffer (remaining=%s, free=%s)",
- size, remaining, free);
-
- // check whether the initial buffer has enough space
- if (size > free) {
- debug.log(Level.DEBUG,
- "Allocating new buffer for initial: %s", (size + remaining));
- // allocates a new buffer and copy initial to it
- b = ByteBuffer.allocate(size + remaining);
- Utils.copy(initial, b);
- assert b.position() == remaining;
- b.flip();
- assert b.position() == 0;
- assert b.limit() == remaining;
- assert b.remaining() == remaining;
- }
-
- // store position and limit
- int pos = b.position();
- int limit = b.limit();
- assert limit - pos == remaining;
- assert b.capacity() >= remaining + size
- : "capacity: " + b.capacity()
- + ", remaining: " + b.remaining()
- + ", size: " + size;
-
- // prepare to copy the content of the queue
- b.position(limit);
- b.limit(pos + remaining + size);
- assert b.remaining() >= size :
- "remaining: " + b.remaining() + ", size: " + size;
-
- // copy the content of the queue
- int count = 0;
- for (int i=0; i<qbb.length; i++) {
- ByteBuffer b2 = qbb[i];
- int r = b2.remaining();
- assert b.remaining() >= r : "need at least " + r + " only "
- + b.remaining() + " available";
- int copied = Utils.copy(b2, b);
- assert copied == r : "copied="+copied+" available="+r;
- assert b2.remaining() == 0;
- count += copied;
- }
- assert count == size;
- assert b.position() == pos + remaining + size :
- "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;
-
- // reset limit and position
- b.limit(limit+size);
- b.position(pos);
-
- // we can clear the refs
- queue.clear();
- final ByteBuffer bb = b;
- debug.log(Level.DEBUG, () -> "Initial buffer now has " + bb.remaining()
- + " pos=" + bb.position() + " limit=" + bb.limit());
-
- return b;
- }
-
- private String debugQBB(ByteBuffer[] qbb) {
- StringBuilder msg = new StringBuilder();
- List<ByteBuffer> lbb = Arrays.asList(qbb);
- Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));
-
- int uniquebb = sbb.size();
- msg.append("qbb: ").append(lbb.size())
- .append(" (unique: ").append(uniquebb).append("), ")
- .append("duplicates: ");
- String sep = "";
- for (ByteBuffer b : lbb) {
- if (!sbb.remove(b)) {
- msg.append(sep)
- .append(String.valueOf(b))
- .append("[remaining=")
- .append(b.remaining())
- .append(", position=")
- .append(b.position())
- .append(", capacity=")
- .append(b.capacity())
- .append("]");
- sep = ", ";
- }
- }
- return msg.toString();
- }
-
- volatile String dbgTag;
- String dbgString() {
- String tag = dbgTag;
- if (tag == null) {
- String flowTag = null;
- Http1Exchange<?> exchg = owner;
- Object flow = (exchg != null)
- ? exchg.connection().getConnectionFlow()
- : null;
- flowTag = tag = flow == null ? null: (String.valueOf(flow));
- if (flowTag != null) {
- dbgTag = tag = flowTag + " Http1AsyncReceiver";
- } else {
- tag = "Http1AsyncReceiver";
- }
- }
- return tag;
- }
-}