src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55799 c71f52f48d97
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
    29 import java.util.concurrent.CompletableFuture;
    30 import java.util.concurrent.CompletableFuture;
    30 import java.util.concurrent.Executor;
    31 import java.util.concurrent.Executor;
       
    32 import java.util.function.Function;
    31 import jdk.incubator.http.internal.common.MinimalFuture;
    33 import jdk.incubator.http.internal.common.MinimalFuture;
    32 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
    34 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
       
    35 import jdk.incubator.http.internal.common.Utils;
    33 
    36 
    34 /**
    37 /**
    35  * Splits request so that headers and body can be sent separately with optional
    38  * Splits request so that headers and body can be sent separately with optional
    36  * (multiple) responses in between (e.g. 100 Continue). Also request and
    39  * (multiple) responses in between (e.g. 100 Continue). Also request and
    37  * response always sent/received in different calls.
    40  * response always sent/received in different calls.
    43  *      Stream          (HTTP/2)
    46  *      Stream          (HTTP/2)
    44  *
    47  *
    45  * These implementation classes are where work is allocated to threads.
    48  * These implementation classes are where work is allocated to threads.
    46  */
    49  */
    47 abstract class ExchangeImpl<T> {
    50 abstract class ExchangeImpl<T> {
       
    51 
       
    52     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    53     private static final System.Logger DEBUG_LOGGER =
       
    54             Utils.getDebugLogger("ExchangeImpl"::toString, DEBUG);
    48 
    55 
    49     final Exchange<T> exchange;
    56     final Exchange<T> exchange;
    50 
    57 
    51     ExchangeImpl(Exchange<T> e) {
    58     ExchangeImpl(Exchange<T> e) {
    52         // e == null means a http/2 pushed stream
    59         // e == null means a http/2 pushed stream
    66 
    73 
    67     /**
    74     /**
    68      * Initiates a new exchange and assigns it to a connection if one exists
    75      * Initiates a new exchange and assigns it to a connection if one exists
    69      * already. connection usually null.
    76      * already. connection usually null.
    70      */
    77      */
    71     static <U> ExchangeImpl<U> get(Exchange<U> exchange, HttpConnection connection)
    78     static <U> CompletableFuture<? extends ExchangeImpl<U>>
    72         throws IOException, InterruptedException
    79     get(Exchange<U> exchange, HttpConnection connection)
    73     {
    80     {
    74         HttpRequestImpl req = exchange.request();
    81         HttpRequestImpl req = exchange.request();
    75         if (exchange.version() == HTTP_1_1) {
    82         if (exchange.version() == HTTP_1_1) {
    76             return new Http1Exchange<>(exchange, connection);
    83             DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
       
    84             return createHttp1Exchange(exchange, connection);
    77         } else {
    85         } else {
    78             Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
    86             Http2ClientImpl c2 = exchange.client().client2(); // TODO: improve
    79             HttpRequestImpl request = exchange.request();
    87             HttpRequestImpl request = exchange.request();
    80             Http2Connection c;
    88             CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request);
    81             try {
    89             DEBUG_LOGGER.log(Level.DEBUG, "get: Trying to get HTTP/2 connection");
    82                 c = c2.getConnectionFor(request);
    90             return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))
    83             } catch (Http2Connection.ALPNException e) {
    91                     .thenCompose(Function.identity());
    84                 // failed to negotiate "h2"
    92         }
    85                 AbstractAsyncSSLConnection as = e.getConnection();
    93     }
    86                 as.stopAsyncReading();
    94 
    87                 HttpConnection sslc = as.downgrade();
    95     private static <U> CompletableFuture<? extends ExchangeImpl<U>>
    88                 ExchangeImpl<U> ex = new Http1Exchange<>(exchange, sslc);
    96     createExchangeImpl(Http2Connection c,
       
    97                        Throwable t,
       
    98                        Exchange<U> exchange,
       
    99                        HttpConnection connection)
       
   100     {
       
   101         DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
       
   102         if (t != null) {
       
   103             DEBUG_LOGGER.log(Level.DEBUG,
       
   104                              "handling HTTP/2 connection creation failed: %s",
       
   105                              (Object)t);
       
   106             t = Utils.getCompletionCause(t);
       
   107             if (t instanceof Http2Connection.ALPNException) {
       
   108                 Http2Connection.ALPNException ee = (Http2Connection.ALPNException)t;
       
   109                 AbstractAsyncSSLConnection as = ee.getConnection();
       
   110                 DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 with: %s", as);
       
   111                 CompletableFuture<? extends ExchangeImpl<U>> ex =
       
   112                         createHttp1Exchange(exchange, as);
    89                 return ex;
   113                 return ex;
       
   114             } else {
       
   115                 DEBUG_LOGGER.log(Level.DEBUG, "HTTP/2 connection creation failed "
       
   116                                   + "with unexpected exception: %s", (Object)t);
       
   117                 return CompletableFuture.failedFuture(t);
    90             }
   118             }
    91             if (c == null) {
   119         }
    92                 // no existing connection. Send request with HTTP 1 and then
   120         if (c == null) {
    93                 // upgrade if successful
   121             // no existing connection. Send request with HTTP 1 and then
    94                 ExchangeImpl<U> ex = new Http1Exchange<>(exchange, connection);
   122             // upgrade if successful
    95                 exchange.h2Upgrade();
   123             DEBUG_LOGGER.log(Level.DEBUG, "new Http1Exchange, try to upgrade");
    96                 return ex;
   124             return createHttp1Exchange(exchange, connection)
    97             }
   125                     .thenApply((e) -> {
    98             return c.createStream(exchange);
   126                         exchange.h2Upgrade();
       
   127                         return e;
       
   128                     });
       
   129         } else {
       
   130             DEBUG_LOGGER.log(Level.DEBUG, "creating HTTP/2 streams");
       
   131             Stream<U> s = c.createStream(exchange);
       
   132             CompletableFuture<? extends ExchangeImpl<U>> ex = MinimalFuture.completedFuture(s);
       
   133             return ex;
       
   134         }
       
   135     }
       
   136 
       
   137     private static <T> CompletableFuture<Http1Exchange<T>>
       
   138     createHttp1Exchange(Exchange<T> ex, HttpConnection as)
       
   139     {
       
   140         try {
       
   141             return MinimalFuture.completedFuture(new Http1Exchange<>(ex, as));
       
   142         } catch (Throwable e) {
       
   143             return MinimalFuture.failedFuture(e);
    99         }
   144         }
   100     }
   145     }
   101 
   146 
   102     /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
   147     /* The following methods have separate HTTP/1.1 and HTTP/2 implementations */
   103 
   148 
   104     /**
   149     abstract CompletableFuture<ExchangeImpl<T>> sendHeadersAsync();
   105      * Sends the request headers only. May block until all sent.
       
   106      */
       
   107     abstract void sendHeadersOnly() throws IOException, InterruptedException;
       
   108 
   150 
   109     // Blocking impl but in async style
   151     /** Sends a request body, after request headers have been sent. */
   110 
   152     abstract CompletableFuture<ExchangeImpl<T>> sendBodyAsync();
   111     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
   112         // this is blocking. cf will already be completed.
       
   113         return MinimalFuture.supply(() -> {
       
   114             sendHeadersOnly();
       
   115             return this;
       
   116         });
       
   117     }
       
   118 
       
   119     /**
       
   120      * Gets response by blocking if necessary. This may be an
       
   121      * intermediate response (like 101) or a final response 200 etc. Returns
       
   122      * before body is read.
       
   123      */
       
   124     abstract Response getResponse() throws IOException;
       
   125 
       
   126     abstract T readBody(HttpResponse.BodyHandler<T> handler,
       
   127                         boolean returnConnectionToPool) throws IOException;
       
   128 
   153 
   129     abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   154     abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
   130                                                 boolean returnConnectionToPool,
   155                                                 boolean returnConnectionToPool,
   131                                                 Executor executor);
   156                                                 Executor executor);
   132 
   157 
   133     /**
   158     /** Gets the response headers. Completes before body is read. */
   134      * Async version of getResponse. Completes before body is read.
       
   135      */
       
   136     abstract CompletableFuture<Response> getResponseAsync(Executor executor);
   159     abstract CompletableFuture<Response> getResponseAsync(Executor executor);
   137 
   160 
   138     /**
       
   139      * Sends a request body after request headers.
       
   140      */
       
   141     abstract void sendBody() throws IOException, InterruptedException;
       
   142 
   161 
   143     // Async version of sendBody(). This only used when body sent separately
   162     /** Cancels a request.  Not currently exposed through API. */
   144     // to headers (100 continue)
       
   145     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
       
   146         return MinimalFuture.supply(() -> {
       
   147             sendBody();
       
   148             return this;
       
   149         });
       
   150     }
       
   151 
       
   152     /**
       
   153      * Cancels a request.  Not currently exposed through API.
       
   154      */
       
   155     abstract void cancel();
   163     abstract void cancel();
   156 
   164 
   157     /**
   165     /**
   158      * Cancels a request with a cause.  Not currently exposed through API.
   166      * Cancels a request with a cause.  Not currently exposed through API.
   159      */
   167      */
   160     abstract void cancel(IOException cause);
   168     abstract void cancel(IOException cause);
       
   169 
       
   170     /**
       
   171      * Called when the exchange is released, so that cleanup actions may be
       
   172      * performed - such as deregistering callbacks.
       
   173      * Typically released is called during upgrade, when an HTTP/2 stream
       
   174      * takes over from an Http1Exchange, or when a new exchange is created
       
   175      * during a multi exchange before the final response body was received.
       
   176      */
       
   177     abstract void released();
       
   178 
       
   179     /**
       
   180      * Called when the exchange is completed, so that cleanup actions may be
       
   181      * performed - such as deregistering callbacks.
       
   182      * Typically, completed is called at the end of the exchange, when the
       
   183      * final response body has been received (or an error has caused the
       
   184      * completion of the exchange).
       
   185      */
       
   186     abstract void completed();
       
   187 
       
   188     /**
       
   189      * Returns true if this exchange was canceled.
       
   190      * @return true if this exchange was canceled.
       
   191      */
       
   192     abstract boolean isCanceled();
       
   193 
       
   194     /**
       
   195      * Returns the cause for which this exchange was canceled, if available.
       
   196      * @return the cause for which this exchange was canceled, if available.
       
   197      */
       
   198     abstract Throwable getCancelCause();
   161 }
   199 }