src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
author smarks
Mon, 04 Dec 2017 11:50:04 -0800
changeset 48059 6ee80cd217e0
parent 47216 71c04702a3d5
child 48083 b1c1b4ef4be2
child 55763 634d8e14c172
permissions -rw-r--r--
8177290: add copy factory methods for unmodifiable List, Set, Map 8184690: add Collectors for collecting into unmodifiable List, Set, and Map Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne

/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.incubator.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.HttpResponse.BodyProcessor;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;

/**
 * Encapsulates one HTTP/1.1 request/response exchange.
 *
 * <p>An instance drives a single request over one {@link HttpConnection}:
 * it sends the request through an {@code Http1Request}, reads the response
 * through an {@code Http1Response}, and supports cancellation of the
 * asynchronous operations it has handed out.
 *
 * <p>The {@code operations} list and the {@code failed} field are guarded
 * by this object's monitor.
 */
class Http1Exchange<T> extends ExchangeImpl<T> {

    final HttpRequestImpl request;        // main request
    // Futures returned by getResponseAsync; completed exceptionally by cancel.
    private final List<CompletableFuture<?>> operations; // used for cancel
    final Http1Request requestAction;
    private volatile Http1Response<T> response;
    // use to record possible cancellation raised before any operation
    // has been initiated.
    private IOException failed;
    final HttpConnection connection;
    final HttpClientImpl client;
    final Executor executor;
    volatile ByteBuffer buffer; // used for receiving

    /**
     * Returns the string form of the request this exchange carries.
     */
    @Override
    public String toString() {
        return request.toString();
    }

    /**
     * Returns the request this exchange was created for.
     */
    HttpRequestImpl request() {
        return request;
    }

    /**
     * Creates an exchange for the request of the given {@code exchange}.
     *
     * @param exchange   the owning exchange; supplies the request, client
     *                   and executor
     * @param connection the connection to use, or {@code null} to obtain one
     *                   from {@code HttpConnection.getConnection} for the
     *                   request's address
     * @throws IOException declared for callers; may be raised while
     *                     resolving the connection or building the request
     */
    Http1Exchange(Exchange<T> exchange, HttpConnection connection)
        throws IOException
    {
        super(exchange);
        this.request = exchange.request();
        this.client = exchange.client();
        this.executor = exchange.executor();
        this.operations = new LinkedList<>();
        this.buffer = Utils.EMPTY_BYTEBUFFER;
        if (connection != null) {
            this.connection = connection;
        } else {
            InetSocketAddress addr = request.getAddress(client);
            this.connection = HttpConnection.getConnection(addr, client, request);
        }
        this.requestAction = new Http1Request(request, client, this.connection);
    }


    /**
     * Returns the connection this exchange operates on.
     */
    HttpConnection connection() {
        return connection;
    }


    /**
     * Reads the response body synchronously. The body processor is obtained
     * from {@code handler} using the response code and headers, the body is
     * read with an executor that runs tasks on the calling thread, and the
     * call blocks until the body future completes.
     *
     * @param handler                supplies the body processor
     * @param returnConnectionToPool passed through to the response's
     *                               {@code readBody}
     * @return the processed body
     * @throws IOException if the body future completes exceptionally; the
     *                     failure is converted by {@code Utils.getIOException}
     */
    @Override
    T readBody(BodyHandler<T> handler, boolean returnConnectionToPool)
        throws IOException
    {
        // Read the volatile field once so all calls target the same response.
        final Http1Response<T> resp = response;
        BodyProcessor<T> processor = handler.apply(resp.responseCode(),
                                                   resp.responseHeaders());
        CompletableFuture<T> bodyCF = resp.readBody(processor,
                                                    returnConnectionToPool,
                                                    this::executeInline);
        try {
            return bodyCF.join();
        } catch (CompletionException e) {
            throw Utils.getIOException(e);
        }
    }

    /**
     * Executor used by {@link #readBody}: runs the task immediately on the
     * calling thread.
     */
    private void executeInline(Runnable r) {
        r.run();
    }

    /**
     * Returns the current receive buffer.
     */
    synchronized ByteBuffer getBuffer() {
        return buffer;
    }

    /**
     * Reads the response body asynchronously using the supplied executor.
     *
     * @param handler                supplies the body processor
     * @param returnConnectionToPool passed through to the response's
     *                               {@code readBody}
     * @param executor               executor handed to the response's
     *                               {@code readBody}
     * @return the future returned by the response's {@code readBody}
     */
    @Override
    CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
                                       boolean returnConnectionToPool,
                                       Executor executor)
    {
        // Read the volatile field once so all calls target the same response.
        final Http1Response<T> resp = response;
        BodyProcessor<T> processor = handler.apply(resp.responseCode(),
                                                   resp.responseHeaders());
        return resp.readBody(processor, returnConnectionToPool, executor);
    }

    /**
     * Connects the connection if it is not yet connected and sends the
     * request headers. On any failure the connection is closed and the
     * failure is rethrown.
     */
    @Override
    void sendHeadersOnly() throws IOException, InterruptedException {
        try {
            if (!connection.connected()) {
                connection.connect();
            }
            requestAction.sendHeadersOnly();
        } catch (Throwable e) {
            closeConnection();
            throw e;
        }
    }

    /**
     * Continues the request after the headers have been sent. On any
     * failure the connection is closed and the failure is rethrown.
     */
    @Override
    void sendBody() throws IOException {
        try {
            requestAction.continueRequest();
        } catch (Throwable e) {
            closeConnection();
            throw e;
        }
    }

    /**
     * Reads the response headers synchronously and returns the resulting
     * {@code Response}. The receive buffer is updated from the response.
     * On any failure the connection is closed and the failure is rethrown.
     */
    @Override
    Response getResponse() throws IOException {
        try {
            Http1Response<T> resp = new Http1Response<>(connection, this);
            response = resp;
            resp.readHeaders();
            Response r = resp.response();
            buffer = resp.getBuffer();
            return r;
        } catch (Throwable t) {
            closeConnection();
            throw t;
        }
    }

    /** Closes the underlying connection. */
    private void closeConnection() {
        connection.close();
    }

    /**
     * Computes the request timeout in milliseconds for trace output, or
     * {@code -1} when the request has no duration.
     */
    private long timeoutMillis() {
        // calling duration.toMillis() can throw an exception.
        // this is just debugging, we don't care if it overflows.
        return request.duration() == null
                ? -1
                : (request.duration().getSeconds() * 1000
                   + request.duration().getNano() / 1000000);
    }

    /**
     * Cancels this exchange with a generic "Request cancelled" cause.
     * See {@link #cancel(IOException)}.
     */
    @Override
    void cancel() {
        cancel(new IOException("Request cancelled"));
    }

    /**
     * Cancel checks to see if request and response finished already.
     * If not it closes the connection and completes all pending operations
     * exceptionally with {@code cause}. If no operation is pending yet, the
     * cause is recorded so that the next {@link #getResponseAsync} call
     * fails with it.
     *
     * @param cause the exception to complete pending operations with
     */
    @Override
    void cancel(IOException cause) {
        final CompletableFuture<?>[] pending;
        synchronized (this) {
            final Http1Response<T> resp = response;
            if (requestAction != null && requestAction.finished()
                    && resp != null && resp.finished()) {
                return;
            }
            closeConnection();
            if (operations.isEmpty()) {
                failed = cause;
                Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
                             + "\n\tCan''t cancel yet with {2}",
                             request.uri(),
                             timeoutMillis(),
                             cause);
            }
            // Snapshot under the lock; the futures are completed below,
            // after the lock is released.
            pending = operations.toArray(new CompletableFuture<?>[0]);
        }
        // Completing a future runs its dependent actions on this thread.
        // Do it outside the monitor so that arbitrary continuation code never
        // executes while the exchange lock is held (getResponseAsync likewise
        // completes its future outside the lock).
        for (CompletableFuture<?> cf : pending) {
            cf.completeExceptionally(cause);
        }
        Log.logError("Http1Exchange.cancel: count=" + pending.length);
    }

    /**
     * Reads the response headers on the given executor and returns a future
     * for the resulting {@code Response}. The receive buffer is updated from
     * the response once the headers have been read.
     */
    CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
        return MinimalFuture.supply( () -> {
            response = new Http1Response<>(connection, Http1Exchange.this);
            response.readHeaders();
            Response r = response.response();
            buffer = response.getBuffer();
            return r;
        }, executor);
    }

    /**
     * Returns a future for the response that starts reading the headers
     * once the connection signals that a response is being received. The
     * future is registered so that {@link #cancel(IOException)} can fail it.
     * If a cancellation was recorded before any operation existed, the
     * future is completed exceptionally with that cause right away.
     */
    @Override
    CompletableFuture<Response> getResponseAsync(Executor executor) {
        CompletableFuture<Response> cf =
            connection.whenReceivingResponse()
                      .thenCompose((v) -> getResponseAsyncImpl(executor));
        IOException cause;
        synchronized(this) {
            operations.add(cf);
            // Consume any cancellation recorded before this operation.
            cause = failed;
            failed = null;
        }
        if (cause != null) {
            Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
                         + "\n\tCompleting exceptionally with {2}\n",
                         request.uri(),
                         timeoutMillis(),
                         cause);
            cf.completeExceptionally(cause);
        }
        return cf;
    }
}