--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Stream.java Tue Feb 06 19:37:56 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1180 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal;
-
-import java.io.IOException;
-import java.lang.System.Logger.Level;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Flow;
-import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiPredicate;
-import jdk.incubator.http.HttpClient;
-import jdk.incubator.http.HttpHeaders;
-import jdk.incubator.http.HttpRequest;
-import jdk.incubator.http.HttpResponse;
-import jdk.incubator.http.HttpResponse.BodySubscriber;
-import jdk.incubator.http.internal.common.*;
-import jdk.incubator.http.internal.frame.*;
-import jdk.incubator.http.internal.hpack.DecodingCallback;
-
-/**
- * Http/2 Stream handling.
- *
- * REQUESTS
- *
- * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
- *
- * sendRequest() -- sendHeadersOnly() + sendBody()
- *
- * sendBodyAsync() -- calls sendBody() in an executor thread.
- *
- * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
- *
- * sendRequestAsync() -- calls sendRequest() in an executor thread
- *
- * RESPONSES
- *
- * Multiple responses can be received per request. Responses are queued up on
- * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
- * with the next response
- *
- * getResponseAsync() -- queries list of response CFs and returns first one
- * if one exists. Otherwise, creates one and adds it to list
- * and returns it. Completion is achieved through the
- * incoming() upcall from connection reader thread.
- *
- * getResponse() -- calls getResponseAsync() and waits for CF to complete
- *
- * responseBodyAsync() -- calls responseBody() in an executor thread.
- *
- * incoming() -- entry point called from connection reader thread. Frames are
- * either handled immediately without blocking or for data frames
- * placed on the stream's inputQ which is consumed by the stream's
- * reader thread.
- *
- * PushedStream sub class
- * ======================
- * Sending side methods are not used because the request comes from a PUSH_PROMISE
- * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
- * is created. PushedStream does not use responseCF list as there can be only
- * one response. The CF is created when the object created and when the response
- * HEADERS frame is received the object is completed.
- */
-class Stream<T> extends ExchangeImpl<T> {
-
- final static boolean DEBUG = Utils.DEBUG; // Revisit: temporary developer's flag
- final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
-
- final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
- final SequentialScheduler sched =
- SequentialScheduler.synchronizedScheduler(this::schedule);
- final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
-
- /**
- * This stream's identifier. Assigned lazily by the HTTP2Connection before
- * the stream's first frame is sent.
- */
- protected volatile int streamid;
-
- long requestContentLen;
-
- final Http2Connection connection;
- final HttpRequestImpl request;
- final DecodingCallback rspHeadersConsumer;
- HttpHeadersImpl responseHeaders;
- final HttpHeadersImpl requestPseudoHeaders;
- volatile HttpResponse.BodySubscriber<T> responseSubscriber;
- final HttpRequest.BodyPublisher requestPublisher;
- volatile RequestSubscriber requestSubscriber;
- volatile int responseCode;
- volatile Response response;
- volatile Throwable failed; // The exception with which this stream was canceled.
- final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
- volatile CompletableFuture<T> responseBodyCF;
-
- /** True if END_STREAM has been seen in a frame received on this stream. */
- private volatile boolean remotelyClosed;
- private volatile boolean closed;
- private volatile boolean endStreamSent;
-
- // state flags
- private boolean requestSent, responseReceived;
-
- /**
- * A reference to this Stream's connection Send Window controller. The
- * stream MUST acquire the appropriate amount of Send Window before
- * sending any data. Will be null for PushStreams, as they cannot send data.
- */
- private final WindowController windowController;
- private final WindowUpdateSender windowUpdater;
-
- @Override
- HttpConnection connection() {
- return connection.connection;
- }
-
- /**
- * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
- * of after user subscription window has re-opened, from SubscriptionBase.request()
- */
- private void schedule() {
- if (responseSubscriber == null)
- // can't process anything yet
- return;
-
- try {
- while (!inputQ.isEmpty()) {
- Http2Frame frame = inputQ.peek();
- if (frame instanceof ResetFrame) {
- inputQ.remove();
- handleReset((ResetFrame)frame);
- return;
- }
- DataFrame df = (DataFrame)frame;
- boolean finished = df.getFlag(DataFrame.END_STREAM);
-
- List<ByteBuffer> buffers = df.getData();
- List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
- int size = Utils.remaining(dsts, Integer.MAX_VALUE);
- if (size == 0 && finished) {
- inputQ.remove();
- Log.logTrace("responseSubscriber.onComplete");
- debug.log(Level.DEBUG, "incoming: onComplete");
- sched.stop();
- responseSubscriber.onComplete();
- setEndStreamReceived();
- return;
- } else if (userSubscription.tryDecrement()) {
- inputQ.remove();
- Log.logTrace("responseSubscriber.onNext {0}", size);
- debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
- responseSubscriber.onNext(dsts);
- if (consumed(df)) {
- Log.logTrace("responseSubscriber.onComplete");
- debug.log(Level.DEBUG, "incoming: onComplete");
- sched.stop();
- responseSubscriber.onComplete();
- setEndStreamReceived();
- return;
- }
- } else {
- return;
- }
- }
- } catch (Throwable throwable) {
- failed = throwable;
- }
-
- Throwable t = failed;
- if (t != null) {
- sched.stop();
- responseSubscriber.onError(t);
- close();
- }
- }
-
- // Callback invoked after the Response BodySubscriber has consumed the
- // buffers contained in a DataFrame.
- // Returns true if END_STREAM is reached, false otherwise.
- private boolean consumed(DataFrame df) {
- // RFC 7540 6.1:
- // The entire DATA frame payload is included in flow control,
- // including the Pad Length and Padding fields if present
- int len = df.payloadLength();
- connection.windowUpdater.update(len);
-
- if (!df.getFlag(DataFrame.END_STREAM)) {
- // Don't send window update on a stream which is
- // closed or half closed.
- windowUpdater.update(len);
- return false; // more data coming
- }
- return true; // end of stream
- }
-
- @Override
- CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
- boolean returnConnectionToPool,
- Executor executor)
- {
- Log.logTrace("Reading body on stream {0}", streamid);
- BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
- CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
-
- PushGroup<?> pg = exchange.getPushGroup();
- if (pg != null) {
- // if an error occurs make sure it is recorded in the PushGroup
- cf = cf.whenComplete((t,e) -> pg.pushError(e));
- }
- return cf;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("streamid: ")
- .append(streamid);
- return sb.toString();
- }
-
- private void receiveDataFrame(DataFrame df) {
- inputQ.add(df);
- sched.runOrSchedule();
- }
-
- /** Handles a RESET frame. RESET is always handled inline in the queue. */
- private void receiveResetFrame(ResetFrame frame) {
- inputQ.add(frame);
- sched.runOrSchedule();
- }
-
- // pushes entire response body into response subscriber
- // blocking when required by local or remote flow control
- CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
- responseBodyCF = new MinimalFuture<>();
- // We want to allow the subscriber's getBody() method to block so it
- // can work with InputStreams. So, we offload execution.
- executor.execute(() -> {
- bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
- if (t == null)
- responseBodyCF.complete(body);
- else
- responseBodyCF.completeExceptionally(t);
- });
- });
-
- if (isCanceled()) {
- Throwable t = getCancelCause();
- responseBodyCF.completeExceptionally(t);
- } else {
- bodySubscriber.onSubscribe(userSubscription);
- }
- // Set the responseSubscriber field now that onSubscribe has been called.
- // This effectively allows the scheduler to start invoking the callbacks.
- responseSubscriber = bodySubscriber;
- sched.runOrSchedule(); // in case data waiting already to be processed
- return responseBodyCF;
- }
-
- @Override
- CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
- return sendBodyImpl().thenApply( v -> this);
- }
-
- @SuppressWarnings("unchecked")
- Stream(Http2Connection connection,
- Exchange<T> e,
- WindowController windowController)
- {
- super(e);
- this.connection = connection;
- this.windowController = windowController;
- this.request = e.request();
- this.requestPublisher = request.requestPublisher; // may be null
- responseHeaders = new HttpHeadersImpl();
- rspHeadersConsumer = (name, value) -> {
- responseHeaders.addHeader(name.toString(), value.toString());
- if (Log.headers() && Log.trace()) {
- Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
- streamid, name, value);
- }
- };
- this.requestPseudoHeaders = new HttpHeadersImpl();
- // NEW
- this.windowUpdater = new StreamWindowUpdateSender(connection);
- }
-
- /**
- * Entry point from Http2Connection reader thread.
- *
- * Data frames will be removed by response body thread.
- */
- void incoming(Http2Frame frame) throws IOException {
- debug.log(Level.DEBUG, "incoming: %s", frame);
- if ((frame instanceof HeaderFrame)) {
- HeaderFrame hframe = (HeaderFrame)frame;
- if (hframe.endHeaders()) {
- Log.logTrace("handling response (streamid={0})", streamid);
- handleResponse();
- if (hframe.getFlag(HeaderFrame.END_STREAM)) {
- receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
- }
- }
- } else if (frame instanceof DataFrame) {
- receiveDataFrame((DataFrame)frame);
- } else {
- otherFrame(frame);
- }
- }
-
- void otherFrame(Http2Frame frame) throws IOException {
- switch (frame.type()) {
- case WindowUpdateFrame.TYPE:
- incoming_windowUpdate((WindowUpdateFrame) frame);
- break;
- case ResetFrame.TYPE:
- incoming_reset((ResetFrame) frame);
- break;
- case PriorityFrame.TYPE:
- incoming_priority((PriorityFrame) frame);
- break;
- default:
- String msg = "Unexpected frame: " + frame.toString();
- throw new IOException(msg);
- }
- }
-
- // The Hpack decoder decodes into one of these consumers of name,value pairs
-
- DecodingCallback rspHeadersConsumer() {
- return rspHeadersConsumer;
- }
-
- protected void handleResponse() throws IOException {
- responseCode = (int)responseHeaders
- .firstValueAsLong(":status")
- .orElseThrow(() -> new IOException("no statuscode in response"));
-
- response = new Response(
- request, exchange, responseHeaders,
- responseCode, HttpClient.Version.HTTP_2);
-
- /* TODO: review if needs to be removed
- the value is not used, but in case `content-length` doesn't parse as
- long, there will be NumberFormatException. If left as is, make sure
- code up the stack handles NFE correctly. */
- responseHeaders.firstValueAsLong("content-length");
-
- if (Log.headers()) {
- StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
- Log.dumpHeaders(sb, " ", responseHeaders);
- Log.logHeaders(sb.toString());
- }
-
- completeResponse(response);
- }
-
- void incoming_reset(ResetFrame frame) {
- Log.logTrace("Received RST_STREAM on stream {0}", streamid);
- if (endStreamReceived()) {
- Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
- } else if (closed) {
- Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
- } else {
- // put it in the input queue in order to read all
- // pending data frames first. Indeed, a server may send
- // RST_STREAM after sending END_STREAM, in which case we should
- // ignore it. However, we won't know if we have received END_STREAM
- // or not until all pending data frames are read.
- receiveResetFrame(frame);
- // RST_STREAM was pushed to the queue. It will be handled by
- // asyncReceive after all pending data frames have been
- // processed.
- Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
- }
- }
-
- void handleReset(ResetFrame frame) {
- Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
- if (!closed) {
- close();
- int error = frame.getErrorCode();
- completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error)));
- } else {
- Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
- }
- }
-
- void incoming_priority(PriorityFrame frame) {
- // TODO: implement priority
- throw new UnsupportedOperationException("Not implemented");
- }
-
- private void incoming_windowUpdate(WindowUpdateFrame frame)
- throws IOException
- {
- int amount = frame.getUpdate();
- if (amount <= 0) {
- Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
- streamid, streamid, amount);
- connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
- } else {
- assert streamid != 0;
- boolean success = windowController.increaseStreamWindow(amount, streamid);
- if (!success) { // overflow
- connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
- }
- }
- }
-
- void incoming_pushPromise(HttpRequestImpl pushRequest,
- PushedStream<T> pushStream)
- throws IOException
- {
- if (Log.requests()) {
- Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
- }
- PushGroup<T> pushGroup = exchange.getPushGroup();
- if (pushGroup == null) {
- Log.logTrace("Rejecting push promise stream " + streamid);
- connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
- pushStream.close();
- return;
- }
-
- PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
-
- if (!acceptor.accepted()) {
- // cancel / reject
- IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
- if (Log.trace()) {
- Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
- ex.getMessage());
- }
- pushStream.cancelImpl(ex);
- return;
- }
-
- CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
- HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
- assert pushHandler != null;
-
- pushStream.requestSent();
- pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
- // setup housekeeping for when the push is received
- // TODO: deal with ignoring of CF anti-pattern
- CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
- cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
- t = Utils.getCompletionCause(t);
- if (Log.trace()) {
- Log.logTrace("Push completed on stream {0} for {1}{2}",
- pushStream.streamid, resp,
- ((t==null) ? "": " with exception " + t));
- }
- if (t != null) {
- pushGroup.pushError(t);
- pushResponseCF.completeExceptionally(t);
- } else {
- pushResponseCF.complete(resp);
- }
- pushGroup.pushCompleted();
- });
-
- }
-
- private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
- HttpHeadersImpl h = request.getSystemHeaders();
- if (contentLength > 0) {
- h.setHeader("content-length", Long.toString(contentLength));
- }
- setPseudoHeaderFields();
- HttpHeaders sysh = filter(h);
- HttpHeaders userh = filter(request.getUserHeaders());
- OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
- if (contentLength == 0) {
- f.setFlag(HeadersFrame.END_STREAM);
- endStreamSent = true;
- }
- return f;
- }
-
- private boolean hasProxyAuthorization(HttpHeaders headers) {
- return headers.firstValue("proxy-authorization")
- .isPresent();
- }
-
- // Determines whether we need to build a new HttpHeader object.
- //
- // Ideally we should pass the filter to OutgoingHeaders refactor the
- // code that creates the HeaderFrame to honor the filter.
- // We're not there yet - so depending on the filter we need to
- // apply and the content of the header we will try to determine
- // whether anything might need to be filtered.
- // If nothing needs filtering then we can just use the
- // original headers.
- private boolean needsFiltering(HttpHeaders headers,
- BiPredicate<String, List<String>> filter) {
- if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
- // we're either connecting or proxying
- // slight optimization: we only need to filter out
- // disabled schemes, so if there are none just
- // pass through.
- return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
- && hasProxyAuthorization(headers);
- } else {
- // we're talking to a server, either directly or through
- // a tunnel.
- // Slight optimization: we only need to filter out
- // proxy authorization headers, so if there are none just
- // pass through.
- return hasProxyAuthorization(headers);
- }
- }
-
- private HttpHeaders filter(HttpHeaders headers) {
- HttpConnection conn = connection();
- BiPredicate<String, List<String>> filter =
- conn.headerFilter(request);
- if (needsFiltering(headers, filter)) {
- return ImmutableHeaders.of(headers.map(), filter);
- }
- return headers;
- }
-
- private void setPseudoHeaderFields() {
- HttpHeadersImpl hdrs = requestPseudoHeaders;
- String method = request.method();
- hdrs.setHeader(":method", method);
- URI uri = request.uri();
- hdrs.setHeader(":scheme", uri.getScheme());
- // TODO: userinfo deprecated. Needs to be removed
- hdrs.setHeader(":authority", uri.getAuthority());
- // TODO: ensure header names beginning with : not in user headers
- String query = uri.getQuery();
- String path = uri.getPath();
- if (path == null || path.isEmpty()) {
- if (method.equalsIgnoreCase("OPTIONS")) {
- path = "*";
- } else {
- path = "/";
- }
- }
- if (query != null) {
- path += "?" + query;
- }
- hdrs.setHeader(":path", path);
- }
-
- HttpHeadersImpl getRequestPseudoHeaders() {
- return requestPseudoHeaders;
- }
-
- /** Sets endStreamReceived. Should be called only once. */
- void setEndStreamReceived() {
- assert remotelyClosed == false: "Unexpected endStream already set";
- remotelyClosed = true;
- responseReceived();
- }
-
- /** Tells whether, or not, the END_STREAM Flag has been seen in any frame
- * received on this stream. */
- private boolean endStreamReceived() {
- return remotelyClosed;
- }
-
- @Override
- CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
- debug.log(Level.DEBUG, "sendHeadersOnly()");
- if (Log.requests() && request != null) {
- Log.logRequest(request.toString());
- }
- if (requestPublisher != null) {
- requestContentLen = requestPublisher.contentLength();
- } else {
- requestContentLen = 0;
- }
- OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
- connection.sendFrame(f);
- CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
- cf.complete(this); // #### good enough for now
- return cf;
- }
-
- @Override
- void released() {
- if (streamid > 0) {
- debug.log(Level.DEBUG, "Released stream %d", streamid);
- // remove this stream from the Http2Connection map.
- connection.closeStream(streamid);
- } else {
- debug.log(Level.DEBUG, "Can't release stream %d", streamid);
- }
- }
-
- @Override
- void completed() {
- // There should be nothing to do here: the stream should have
- // been already closed (or will be closed shortly after).
- }
-
- void registerStream(int id) {
- this.streamid = id;
- connection.putStream(this, streamid);
- debug.log(Level.DEBUG, "Registered stream %d", id);
- }
-
- void signalWindowUpdate() {
- RequestSubscriber subscriber = requestSubscriber;
- assert subscriber != null;
- debug.log(Level.DEBUG, "Signalling window update");
- subscriber.sendScheduler.runOrSchedule();
- }
-
- static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
- class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
- // can be < 0 if the actual length is not known.
- private final long contentLength;
- private volatile long remainingContentLength;
- private volatile Subscription subscription;
-
- // Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
- // 1) The data that was published by the request body Publisher, and
- // 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
- final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
-
- private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
- // A scheduler used to honor window updates. Writing must be paused
- // when the window is exhausted, and resumed when the window acquires
- // some space. The sendScheduler makes it possible to implement this
- // behaviour in an asynchronous non-blocking way.
- // See RequestSubscriber::trySend below.
- final SequentialScheduler sendScheduler;
-
- RequestSubscriber(long contentLen) {
- this.contentLength = contentLen;
- this.remainingContentLength = contentLen;
- this.sendScheduler =
- SequentialScheduler.synchronizedScheduler(this::trySend);
- }
-
- @Override
- public void onSubscribe(Flow.Subscription subscription) {
- if (this.subscription != null) {
- throw new IllegalStateException("already subscribed");
- }
- this.subscription = subscription;
- debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1");
- subscription.request(1);
- }
-
- @Override
- public void onNext(ByteBuffer item) {
- debug.log(Level.DEBUG, "RequestSubscriber: onNext(%d)", item.remaining());
- int size = outgoing.size();
- assert size == 0 : "non-zero size: " + size;
- onNextImpl(item);
- }
-
- private void onNextImpl(ByteBuffer item) {
- // Got some more request body bytes to send.
- if (requestBodyCF.isDone()) {
- // stream already cancelled, probably in timeout
- sendScheduler.stop();
- subscription.cancel();
- return;
- }
- outgoing.add(item);
- sendScheduler.runOrSchedule();
- }
-
- @Override
- public void onError(Throwable throwable) {
- debug.log(Level.DEBUG, () -> "RequestSubscriber: onError: " + throwable);
- // ensure that errors are handled within the flow.
- if (errorRef.compareAndSet(null, throwable)) {
- sendScheduler.runOrSchedule();
- }
- }
-
- @Override
- public void onComplete() {
- debug.log(Level.DEBUG, "RequestSubscriber: onComplete");
- int size = outgoing.size();
- assert size == 0 || size == 1 : "non-zero or one size: " + size;
- // last byte of request body has been obtained.
- // ensure that everything is completed within the flow.
- onNextImpl(COMPLETED);
- }
-
- // Attempts to send the data, if any.
- // Handles errors and completion state.
- // Pause writing if the send window is exhausted, resume it if the
- // send window has some bytes that can be acquired.
- void trySend() {
- try {
- // handle errors raised by onError;
- Throwable t = errorRef.get();
- if (t != null) {
- sendScheduler.stop();
- if (requestBodyCF.isDone()) return;
- subscription.cancel();
- requestBodyCF.completeExceptionally(t);
- return;
- }
-
- do {
- // handle COMPLETED;
- ByteBuffer item = outgoing.peekFirst();
- if (item == null) return;
- else if (item == COMPLETED) {
- sendScheduler.stop();
- complete();
- return;
- }
-
- // handle bytes to send downstream
- while (item.hasRemaining()) {
- debug.log(Level.DEBUG, "trySend: %d", item.remaining());
- assert !endStreamSent : "internal error, send data after END_STREAM flag";
- DataFrame df = getDataFrame(item);
- if (df == null) {
- debug.log(Level.DEBUG, "trySend: can't send yet: %d",
- item.remaining());
- return; // the send window is exhausted: come back later
- }
-
- if (contentLength > 0) {
- remainingContentLength -= df.getDataLength();
- if (remainingContentLength < 0) {
- String msg = connection().getConnectionFlow()
- + " stream=" + streamid + " "
- + "[" + Thread.currentThread().getName() + "] "
- + "Too many bytes in request body. Expected: "
- + contentLength + ", got: "
- + (contentLength - remainingContentLength);
- connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
- throw new IOException(msg);
- } else if (remainingContentLength == 0) {
- df.setFlag(DataFrame.END_STREAM);
- endStreamSent = true;
- }
- }
- debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength());
- connection.sendDataFrame(df);
- }
- assert !item.hasRemaining();
- ByteBuffer b = outgoing.removeFirst();
- assert b == item;
- } while (outgoing.peekFirst() != null);
-
- debug.log(Level.DEBUG, "trySend: request 1");
- subscription.request(1);
- } catch (Throwable ex) {
- debug.log(Level.DEBUG, "trySend: ", ex);
- sendScheduler.stop();
- subscription.cancel();
- requestBodyCF.completeExceptionally(ex);
- }
- }
-
- private void complete() throws IOException {
- long remaining = remainingContentLength;
- long written = contentLength - remaining;
- if (remaining > 0) {
- connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
- // let trySend() handle the exception
- throw new IOException(connection().getConnectionFlow()
- + " stream=" + streamid + " "
- + "[" + Thread.currentThread().getName() +"] "
- + "Too few bytes returned by the publisher ("
- + written + "/"
- + contentLength + ")");
- }
- if (!endStreamSent) {
- endStreamSent = true;
- connection.sendDataFrame(getEmptyEndStreamDataFrame());
- }
- requestBodyCF.complete(null);
- }
- }
-
- /**
- * Send a RESET frame to tell server to stop sending data on this stream
- */
- @Override
- public CompletableFuture<Void> ignoreBody() {
- try {
- connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
- return MinimalFuture.completedFuture(null);
- } catch (Throwable e) {
- Log.logTrace("Error resetting stream {0}", e.toString());
- return MinimalFuture.failedFuture(e);
- }
- }
-
- DataFrame getDataFrame(ByteBuffer buffer) {
- int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
- // blocks waiting for stream send window, if exhausted
- int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
- if (actualAmount <= 0) return null;
- ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);
- DataFrame df = new DataFrame(streamid, 0 , outBuf);
- return df;
- }
-
- private DataFrame getEmptyEndStreamDataFrame() {
- return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
- }
-
- /**
- * A List of responses relating to this stream. Normally there is only
- * one response, but intermediate responses like 100 are allowed
- * and must be passed up to higher level before continuing. Deals with races
- * such as if responses are returned before the CFs get created by
- * getResponseAsync()
- */
-
- final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
-
- @Override
- CompletableFuture<Response> getResponseAsync(Executor executor) {
- CompletableFuture<Response> cf;
- // The code below deals with race condition that can be caused when
- // completeResponse() is being called before getResponseAsync()
- synchronized (response_cfs) {
- if (!response_cfs.isEmpty()) {
- // This CompletableFuture was created by completeResponse().
- // it will be already completed.
- cf = response_cfs.remove(0);
- // if we find a cf here it should be already completed.
- // finding a non completed cf should not happen. just assert it.
- assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
- } else {
- // getResponseAsync() is called first. Create a CompletableFuture
- // that will be completed by completeResponse() when
- // completeResponse() is called.
- cf = new MinimalFuture<>();
- response_cfs.add(cf);
- }
- }
- if (executor != null && !cf.isDone()) {
- // protect from executing later chain of CompletableFuture operations from SelectorManager thread
- cf = cf.thenApplyAsync(r -> r, executor);
- }
- Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
- PushGroup<?> pg = exchange.getPushGroup();
- if (pg != null) {
- // if an error occurs make sure it is recorded in the PushGroup
- cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
- }
- return cf;
- }
-
- /**
- * Completes the first uncompleted CF on list, and removes it. If there is no
- * uncompleted CF then creates one (completes it) and adds to list
- */
- void completeResponse(Response resp) {
- synchronized (response_cfs) {
- CompletableFuture<Response> cf;
- int cfs_len = response_cfs.size();
- for (int i=0; i<cfs_len; i++) {
- cf = response_cfs.get(i);
- if (!cf.isDone()) {
- Log.logTrace("Completing response (streamid={0}): {1}",
- streamid, cf);
- cf.complete(resp);
- response_cfs.remove(cf);
- return;
- } // else we found the previous response: just leave it alone.
- }
- cf = MinimalFuture.completedFuture(resp);
- Log.logTrace("Created completed future (streamid={0}): {1}",
- streamid, cf);
- response_cfs.add(cf);
- }
- }
-
- // methods to update state and remove stream when finished
-
- synchronized void requestSent() {
- requestSent = true;
- if (responseReceived) {
- close();
- }
- }
-
- synchronized void responseReceived() {
- responseReceived = true;
- if (requestSent) {
- close();
- }
- }
-
- /**
- * same as above but for errors
- */
- void completeResponseExceptionally(Throwable t) {
- synchronized (response_cfs) {
- // use index to avoid ConcurrentModificationException
- // caused by removing the CF from within the loop.
- for (int i = 0; i < response_cfs.size(); i++) {
- CompletableFuture<Response> cf = response_cfs.get(i);
- if (!cf.isDone()) {
- cf.completeExceptionally(t);
- response_cfs.remove(i);
- return;
- }
- }
- response_cfs.add(MinimalFuture.failedFuture(t));
- }
- }
-
- CompletableFuture<Void> sendBodyImpl() {
- requestBodyCF.whenComplete((v, t) -> requestSent());
- if (requestPublisher != null) {
- final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
- requestPublisher.subscribe(requestSubscriber = subscriber);
- } else {
- // there is no request body, therefore the request is complete,
- // END_STREAM has already sent with outgoing headers
- requestBodyCF.complete(null);
- }
- return requestBodyCF;
- }
-
- @Override
- void cancel() {
- cancel(new IOException("Stream " + streamid + " cancelled"));
- }
-
- @Override
- void cancel(IOException cause) {
- cancelImpl(cause);
- }
-
- // This method sends a RST_STREAM frame
- void cancelImpl(Throwable e) {
- debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
- if (Log.trace()) {
- Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
- }
- boolean closing;
- if (closing = !closed) { // assigning closing to !closed
- synchronized (this) {
- failed = e;
- if (closing = !closed) { // assigning closing to !closed
- closed=true;
- }
- }
- }
- if (closing) { // true if the stream has not been closed yet
- if (responseSubscriber != null)
- sched.runOrSchedule();
- }
- completeResponseExceptionally(e);
- if (!requestBodyCF.isDone()) {
- requestBodyCF.completeExceptionally(e); // we may be sending the body..
- }
- if (responseBodyCF != null) {
- responseBodyCF.completeExceptionally(e);
- }
- try {
- // will send a RST_STREAM frame
- if (streamid != 0) {
- connection.resetStream(streamid, ResetFrame.CANCEL);
- }
- } catch (IOException ex) {
- Log.logError(ex);
- }
- }
-
- // This method doesn't send any frame
- void close() {
- if (closed) return;
- synchronized(this) {
- if (closed) return;
- closed = true;
- }
- Log.logTrace("Closing stream {0}", streamid);
- connection.closeStream(streamid);
- Log.logTrace("Stream {0} closed", streamid);
- }
-
- static class PushedStream<T> extends Stream<T> {
- final PushGroup<T> pushGroup;
- // push streams need the response CF allocated up front as it is
- // given directly to user via the multi handler callback function.
- final CompletableFuture<Response> pushCF;
- CompletableFuture<HttpResponse<T>> responseCF;
- final HttpRequestImpl pushReq;
- HttpResponse.BodyHandler<T> pushHandler;
-
- PushedStream(PushGroup<T> pushGroup,
- Http2Connection connection,
- Exchange<T> pushReq) {
- // ## no request body possible, null window controller
- super(connection, pushReq, null);
- this.pushGroup = pushGroup;
- this.pushReq = pushReq.request();
- this.pushCF = new MinimalFuture<>();
- this.responseCF = new MinimalFuture<>();
-
- }
-
- CompletableFuture<HttpResponse<T>> responseCF() {
- return responseCF;
- }
-
- synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
- this.pushHandler = pushHandler;
- }
-
- synchronized HttpResponse.BodyHandler<T> getPushHandler() {
- // ignored parameters to function can be used as BodyHandler
- return this.pushHandler;
- }
-
- // Following methods call the super class but in case of
- // error record it in the PushGroup. The error method is called
- // with a null value when no error occurred (is a no-op)
- @Override
- CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
- return super.sendBodyAsync()
- .whenComplete((ExchangeImpl<T> v, Throwable t)
- -> pushGroup.pushError(Utils.getCompletionCause(t)));
- }
-
- @Override
- CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
- return super.sendHeadersAsync()
- .whenComplete((ExchangeImpl<T> ex, Throwable t)
- -> pushGroup.pushError(Utils.getCompletionCause(t)));
- }
-
- @Override
- CompletableFuture<Response> getResponseAsync(Executor executor) {
- CompletableFuture<Response> cf = pushCF.whenComplete(
- (v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
- if(executor!=null && !cf.isDone()) {
- cf = cf.thenApplyAsync( r -> r, executor);
- }
- return cf;
- }
-
- @Override
- CompletableFuture<T> readBodyAsync(
- HttpResponse.BodyHandler<T> handler,
- boolean returnConnectionToPool,
- Executor executor)
- {
- return super.readBodyAsync(handler, returnConnectionToPool, executor)
- .whenComplete((v, t) -> pushGroup.pushError(t));
- }
-
- @Override
- void completeResponse(Response r) {
- Log.logResponse(r::toString);
- pushCF.complete(r); // not strictly required for push API
- // start reading the body using the obtained BodySubscriber
- CompletableFuture<Void> start = new MinimalFuture<>();
- start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
- .whenComplete((T body, Throwable t) -> {
- if (t != null) {
- responseCF.completeExceptionally(t);
- } else {
- HttpResponseImpl<T> resp =
- new HttpResponseImpl<>(r.request, r, null, body, getExchange());
- responseCF.complete(resp);
- }
- });
- start.completeAsync(() -> null, getExchange().executor());
- }
-
- @Override
- void completeResponseExceptionally(Throwable t) {
- pushCF.completeExceptionally(t);
- }
-
-// @Override
-// synchronized void responseReceived() {
-// super.responseReceived();
-// }
-
- // create and return the PushResponseImpl
- @Override
- protected void handleResponse() {
- responseCode = (int)responseHeaders
- .firstValueAsLong(":status")
- .orElse(-1);
-
- if (responseCode == -1) {
- completeResponseExceptionally(new IOException("No status code"));
- }
-
- this.response = new Response(
- pushReq, exchange, responseHeaders,
- responseCode, HttpClient.Version.HTTP_2);
-
- /* TODO: review if needs to be removed
- the value is not used, but in case `content-length` doesn't parse
- as long, there will be NumberFormatException. If left as is, make
- sure code up the stack handles NFE correctly. */
- responseHeaders.firstValueAsLong("content-length");
-
- if (Log.headers()) {
- StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
- sb.append(" (streamid=").append(streamid).append("): ");
- Log.dumpHeaders(sb, " ", responseHeaders);
- Log.logHeaders(sb.toString());
- }
-
- // different implementations for normal streams and pushed streams
- completeResponse(response);
- }
- }
-
- final class StreamWindowUpdateSender extends WindowUpdateSender {
-
- StreamWindowUpdateSender(Http2Connection connection) {
- super(connection);
- }
-
- @Override
- int getStreamId() {
- return streamid;
- }
- }
-
- /**
- * Returns true if this exchange was canceled.
- * @return true if this exchange was canceled.
- */
- synchronized boolean isCanceled() {
- return failed != null;
- }
-
- /**
- * Returns the cause for which this exchange was canceled, if available.
- * @return the cause for which this exchange was canceled, if available.
- */
- synchronized Throwable getCancelCause() {
- return failed;
- }
-
- final String dbgString() {
- return connection.dbgString() + "/Stream("+streamid+")";
- }
-}