src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java
branch http-client-branch
changeset 56079 d23b02f37fce
parent 56078 6c11b48a0695
child 56080 64846522c0d5
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
     1 /*
       
     2  * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.concurrent.CompletableFuture;
       
    32 import java.util.concurrent.CompletionStage;
       
    33 import java.util.concurrent.Executor;
       
    34 import java.util.function.BiConsumer;
       
    35 import java.util.function.Consumer;
       
    36 import java.util.function.Function;
       
    37 import jdk.incubator.http.ResponseContent.BodyParser;
       
    38 import jdk.incubator.http.internal.common.Log;
       
    39 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
       
    40 import jdk.incubator.http.internal.common.MinimalFuture;
       
    41 import jdk.incubator.http.internal.common.Utils;
       
    42 
       
    43 /**
       
    44  * Handles a HTTP/1.1 response (headers + body).
       
    45  * There can be more than one of these per Http exchange.
       
    46  */
       
    47 class Http1Response<T> {
       
    48 
       
    49     private volatile ResponseContent content;
       
    50     private final HttpRequestImpl request;
       
    51     private Response response;
       
    52     private final HttpConnection connection;
       
    53     private HttpHeaders headers;
       
    54     private int responseCode;
       
    55     private final Http1Exchange<T> exchange;
       
    56     private boolean return2Cache; // return connection to cache when finished
       
    57     private final HeadersReader headersReader; // used to read the headers
       
    58     private final BodyReader bodyReader; // used to read the body
       
    59     private final Http1AsyncReceiver asyncReceiver;
       
    60     private volatile EOFException eof;
       
    61     // max number of bytes of (fixed length) body to ignore on redirect
       
    62     private final static int MAX_IGNORE = 1024;
       
    63 
       
    64     // Revisit: can we get rid of this?
       
    65     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
       
    66     private volatile State readProgress = State.INITIAL;
       
    67     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    68     final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
       
    69 
       
    70 
       
    /**
     * Creates the response handler for one HTTP/1.1 exchange.
     *
     * @param conn          connection the response is read from
     * @param exchange      owning exchange; also supplies the request
     * @param asyncReceiver receiver the header/body readers subscribe to
     */
    Http1Response(HttpConnection conn,
                  Http1Exchange<T> exchange,
                  Http1AsyncReceiver asyncReceiver) {
        this.readProgress = State.INITIAL;
        this.connection = conn;
        this.exchange = exchange;
        this.request = exchange.request();
        this.asyncReceiver = asyncReceiver;
        // Both readers report phase completion back through advance().
        this.headersReader = new HeadersReader(this::advance);
        this.bodyReader = new BodyReader(this::advance);
    }
       
    82 
       
    83    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
       
    84         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
       
    85                 + asyncReceiver.remaining() +") "  + readProgress);
       
    86         // with expect continue we will resume reading headers + body.
       
    87         asyncReceiver.unsubscribe(bodyReader);
       
    88         bodyReader.reset();
       
    89         Http1HeaderParser hd = new Http1HeaderParser();
       
    90         readProgress = State.READING_HEADERS;
       
    91         headersReader.start(hd);
       
    92         asyncReceiver.subscribe(headersReader);
       
    93         CompletableFuture<State> cf = headersReader.completion();
       
    94         assert cf != null : "parsing not started";
       
    95 
       
    96         Function<State, Response> lambda = (State completed) -> {
       
    97                 assert completed == State.READING_HEADERS;
       
    98                 debug.log(Level.DEBUG, () ->
       
    99                             "Reading Headers: creating Response object;"
       
   100                             + " state is now " + readProgress);
       
   101                 asyncReceiver.unsubscribe(headersReader);
       
   102                 responseCode = hd.responseCode();
       
   103                 headers = hd.headers();
       
   104 
       
   105                 response = new Response(request,
       
   106                                         exchange.getExchange(),
       
   107                                         headers,
       
   108                                         responseCode,
       
   109                                         HTTP_1_1);
       
   110                 return response;
       
   111             };
       
   112 
       
   113         if (executor != null) {
       
   114             return cf.thenApplyAsync(lambda, executor);
       
   115         } else {
       
   116             return cf.thenApply(lambda);
       
   117         }
       
   118     }
       
   119 
       
   120     private boolean finished;
       
   121 
       
   122     synchronized void completed() {
       
   123         finished = true;
       
   124     }
       
   125 
       
   126     synchronized boolean finished() {
       
   127         return finished;
       
   128     }
       
   129 
       
   130     int fixupContentLen(int clen) {
       
   131         if (request.method().equalsIgnoreCase("HEAD")) {
       
   132             return 0;
       
   133         }
       
   134         if (clen == -1) {
       
   135             if (headers.firstValue("Transfer-encoding").orElse("")
       
   136                        .equalsIgnoreCase("chunked")) {
       
   137                 return -1;
       
   138             }
       
   139             return 0;
       
   140         }
       
   141         return clen;
       
   142     }
       
   143 
       
   144     /**
       
   145      * Read up to MAX_IGNORE bytes discarding
       
   146      */
       
   147     public CompletableFuture<Void> ignoreBody(Executor executor) {
       
   148         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   149         if (clen == -1 || clen > MAX_IGNORE) {
       
   150             connection.close();
       
   151             return MinimalFuture.completedFuture(null); // not treating as error
       
   152         } else {
       
   153             return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
       
   154         }
       
   155     }
       
   156 
       
   157     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
       
   158                                          boolean return2Cache,
       
   159                                          Executor executor) {
       
   160         this.return2Cache = return2Cache;
       
   161         final HttpResponse.BodySubscriber<U> pusher = p;
       
   162         final CompletionStage<U> bodyCF = p.getBody();
       
   163         final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
       
   164 
       
   165         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   166 
       
   167         final int clen = fixupContentLen(clen0);
       
   168 
       
   169         // expect-continue reads headers and body twice.
       
   170         // if we reach here, we must reset the headersReader state.
       
   171         asyncReceiver.unsubscribe(headersReader);
       
   172         headersReader.reset();
       
   173 
       
   174         executor.execute(() -> {
       
   175             try {
       
   176                 HttpClientImpl client = connection.client();
       
   177                 content = new ResponseContent(
       
   178                         connection, clen, headers, pusher,
       
   179                         this::onFinished
       
   180                 );
       
   181                 if (cf.isCompletedExceptionally()) {
       
   182                     // if an error occurs during subscription
       
   183                     connection.close();
       
   184                     return;
       
   185                 }
       
   186                 // increment the reference count on the HttpClientImpl
       
   187                 // to prevent the SelectorManager thread from exiting until
       
   188                 // the body is fully read.
       
   189                 client.reference();
       
   190                 bodyReader.start(content.getBodyParser(
       
   191                     (t) -> {
       
   192                         try {
       
   193                             if (t != null) {
       
   194                                 pusher.onError(t);
       
   195                                 connection.close();
       
   196                                 if (!cf.isDone())
       
   197                                     cf.completeExceptionally(t);
       
   198                             }
       
   199                         } finally {
       
   200                             // decrement the reference count on the HttpClientImpl
       
   201                             // to allow the SelectorManager thread to exit if no
       
   202                             // other operation is pending and the facade is no
       
   203                             // longer referenced.
       
   204                             client.unreference();
       
   205                             bodyReader.onComplete(t);
       
   206                         }
       
   207                     }));
       
   208                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
       
   209                 asyncReceiver.subscribe(bodyReader);
       
   210                 assert bodyReaderCF != null : "parsing not started";
       
   211                 // Make sure to keep a reference to asyncReceiver from
       
   212                 // within this
       
   213                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
       
   214                     t = Utils.getCompletionCause(t);
       
   215                     try {
       
   216                         if (t != null) {
       
   217                             debug.log(Level.DEBUG, () ->
       
   218                                     "Finished reading body: " + s);
       
   219                             assert s == State.READING_BODY;
       
   220                         }
       
   221                         if (t != null && !cf.isDone()) {
       
   222                             pusher.onError(t);
       
   223                             cf.completeExceptionally(t);
       
   224                         }
       
   225                     } catch (Throwable x) {
       
   226                         // not supposed to happen
       
   227                         asyncReceiver.onReadError(x);
       
   228                     }
       
   229                 });
       
   230                 connection.addTrailingOperation(trailingOp);
       
   231             } catch (Throwable t) {
       
   232                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
       
   233                 try {
       
   234                     if (!cf.isDone()) {
       
   235                         pusher.onError(t);
       
   236                         cf.completeExceptionally(t);
       
   237                     }
       
   238                 } finally {
       
   239                     asyncReceiver.onReadError(t);
       
   240                 }
       
   241             }
       
   242         });
       
   243         return cf;
       
   244     }
       
   245 
       
   246 
       
   247     private void onFinished() {
       
   248         asyncReceiver.clear();
       
   249         if (return2Cache) {
       
   250             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
       
   251             // TODO: need to do something here?
       
   252             // connection.setAsyncCallbacks(null, null, null);
       
   253 
       
   254             // don't return the connection to the cache if EOF happened.
       
   255             debug.log(Level.DEBUG, () -> connection.getConnectionFlow()
       
   256                                    + ": return to HTTP/1.1 pool");
       
   257             connection.closeOrReturnToCache(eof == null ? headers : null);
       
   258         }
       
   259     }
       
   260 
       
   261     HttpHeaders responseHeaders() {
       
   262         return headers;
       
   263     }
       
   264 
       
   265     int responseCode() {
       
   266         return responseCode;
       
   267     }
       
   268 
       
   269 // ================ Support for plugging into Http1Receiver   =================
       
   270 // ============================================================================
       
   271 
       
   272     // Callback: Error receiver: Consumer of Throwable.
       
   273     void onReadError(Throwable t) {
       
   274         Log.logError(t);
       
   275         Receiver<?> receiver = receiver(readProgress);
       
   276         if (t instanceof EOFException) {
       
   277             debug.log(Level.DEBUG, "onReadError: received EOF");
       
   278             eof = (EOFException) t;
       
   279         }
       
   280         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
       
   281         debug.log(Level.DEBUG, () -> "onReadError: cf is "
       
   282                 + (cf == null  ? "null"
       
   283                 : (cf.isDone() ? "already completed"
       
   284                                : "not yet completed")));
       
   285         if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
       
   286         else { debug.log(Level.DEBUG, "onReadError", t); }
       
   287         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
       
   288         connection.close();
       
   289     }
       
   290 
       
   291     // ========================================================================
       
   292 
       
   293     private State advance(State previous) {
       
   294         assert readProgress == previous;
       
   295         switch(previous) {
       
   296             case READING_HEADERS:
       
   297                 asyncReceiver.unsubscribe(headersReader);
       
   298                 return readProgress = State.READING_BODY;
       
   299             case READING_BODY:
       
   300                 asyncReceiver.unsubscribe(bodyReader);
       
   301                 return readProgress = State.DONE;
       
   302             default:
       
   303                 throw new InternalError("can't advance from " + previous);
       
   304         }
       
   305     }
       
   306 
       
   307     Receiver<?> receiver(State state) {
       
   308         switch(state) {
       
   309             case READING_HEADERS: return headersReader;
       
   310             case READING_BODY: return bodyReader;
       
   311             default: return null;
       
   312         }
       
   313 
       
   314     }
       
   315 
       
   316     static abstract class Receiver<T>
       
   317             implements Http1AsyncReceiver.Http1AsyncDelegate {
       
   318         abstract void start(T parser);
       
   319         abstract CompletableFuture<State> completion();
       
   320         // accepts a buffer from upstream.
       
   321         // this should be implemented as a simple call to
       
   322         // accept(ref, parser, cf)
       
   323         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
       
   324         public abstract void onReadError(Throwable t);
       
   325         // handle a byte buffer received from upstream.
       
   326         // this method should set the value of Http1Response.buffer
       
   327         // to ref.get() before beginning parsing.
       
   328         abstract void handle(ByteBuffer buf, T parser,
       
   329                              CompletableFuture<State> cf);
       
   330         // resets this objects state so that it can be reused later on
       
   331         // typically puts the reference to parser and completion to null
       
   332         abstract void reset();
       
   333 
       
   334         // accepts a byte buffer received from upstream
       
   335         // returns true if the buffer is fully parsed and more data can
       
   336         // be accepted, false otherwise.
       
   337         final boolean accept(ByteBuffer buf, T parser,
       
   338                 CompletableFuture<State> cf) {
       
   339             if (cf == null || parser == null || cf.isDone()) return false;
       
   340             handle(buf, parser, cf);
       
   341             return !cf.isDone();
       
   342         }
       
   343         public abstract void onSubscribe(AbstractSubscription s);
       
   344         public abstract AbstractSubscription subscription();
       
   345 
       
   346     }
       
   347 
       
   348     // Invoked with each new ByteBuffer when reading headers...
       
   349     final class HeadersReader extends Receiver<Http1HeaderParser> {
       
   350         final Consumer<State> onComplete;
       
   351         volatile Http1HeaderParser parser;
       
   352         volatile CompletableFuture<State> cf;
       
   353         volatile long count; // bytes parsed (for debug)
       
   354         volatile AbstractSubscription subscription;
       
   355 
       
   356         HeadersReader(Consumer<State> onComplete) {
       
   357             this.onComplete = onComplete;
       
   358         }
       
   359 
       
   360         @Override
       
   361         public AbstractSubscription subscription() {
       
   362             return subscription;
       
   363         }
       
   364 
       
   365         @Override
       
   366         public void onSubscribe(AbstractSubscription s) {
       
   367             this.subscription = s;
       
   368             s.request(1);
       
   369         }
       
   370 
       
   371         @Override
       
   372         void reset() {
       
   373             cf = null;
       
   374             parser = null;
       
   375             count = 0;
       
   376             subscription = null;
       
   377         }
       
   378 
       
   379         // Revisit: do we need to support restarting?
       
   380         @Override
       
   381         final void start(Http1HeaderParser hp) {
       
   382             count = 0;
       
   383             cf = new MinimalFuture<>();
       
   384             parser = hp;
       
   385         }
       
   386 
       
   387         @Override
       
   388         CompletableFuture<State> completion() {
       
   389             return cf;
       
   390         }
       
   391 
       
   392         @Override
       
   393         public final boolean tryAsyncReceive(ByteBuffer ref) {
       
   394             boolean hasDemand = subscription.demand().tryDecrement();
       
   395             assert hasDemand;
       
   396             boolean needsMore = accept(ref, parser, cf);
       
   397             if (needsMore) subscription.request(1);
       
   398             return needsMore;
       
   399         }
       
   400 
       
   401         @Override
       
   402         public final void onReadError(Throwable t) {
       
   403             Http1Response.this.onReadError(t);
       
   404         }
       
   405 
       
   406         @Override
       
   407         final void handle(ByteBuffer b,
       
   408                           Http1HeaderParser parser,
       
   409                           CompletableFuture<State> cf) {
       
   410             assert cf != null : "parsing not started";
       
   411             assert parser != null : "no parser";
       
   412             try {
       
   413                 count += b.remaining();
       
   414                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   415                         + "/" + b.capacity() + " bytes to header parser");
       
   416                 if (parser.parse(b)) {
       
   417                     count -= b.remaining();
       
   418                     debug.log(Level.DEBUG, () ->
       
   419                             "Parsing headers completed. bytes=" + count);
       
   420                     onComplete.accept(State.READING_HEADERS);
       
   421                     cf.complete(State.READING_HEADERS);
       
   422                 }
       
   423             } catch (Throwable t) {
       
   424                 debug.log(Level.DEBUG,
       
   425                         () -> "Header parser failed to handle buffer: " + t);
       
   426                 cf.completeExceptionally(t);
       
   427             }
       
   428         }
       
   429     }
       
   430 
       
   431     // Invoked with each new ByteBuffer when reading bodies...
       
   432     final class BodyReader extends Receiver<BodyParser> {
       
   433         final Consumer<State> onComplete;
       
   434         volatile BodyParser parser;
       
   435         volatile CompletableFuture<State> cf;
       
   436         volatile AbstractSubscription subscription;
       
   437         BodyReader(Consumer<State> onComplete) {
       
   438             this.onComplete = onComplete;
       
   439         }
       
   440 
       
   441         @Override
       
   442         void reset() {
       
   443             parser = null;
       
   444             cf = null;
       
   445             subscription = null;
       
   446         }
       
   447 
       
   448         // Revisit: do we need to support restarting?
       
   449         @Override
       
   450         final void start(BodyParser parser) {
       
   451             cf = new MinimalFuture<>();
       
   452             this.parser = parser;
       
   453         }
       
   454 
       
   455         @Override
       
   456         CompletableFuture<State> completion() {
       
   457             return cf;
       
   458         }
       
   459 
       
   460         @Override
       
   461         public final boolean tryAsyncReceive(ByteBuffer b) {
       
   462             return accept(b, parser, cf);
       
   463         }
       
   464 
       
   465         @Override
       
   466         public final void onReadError(Throwable t) {
       
   467             Http1Response.this.onReadError(t);
       
   468         }
       
   469 
       
   470         @Override
       
   471         public AbstractSubscription subscription() {
       
   472             return subscription;
       
   473         }
       
   474 
       
   475         @Override
       
   476         public void onSubscribe(AbstractSubscription s) {
       
   477             this.subscription = s;
       
   478             parser.onSubscribe(s);
       
   479         }
       
   480 
       
   481         @Override
       
   482         final void handle(ByteBuffer b,
       
   483                           BodyParser parser,
       
   484                           CompletableFuture<State> cf) {
       
   485             assert cf != null : "parsing not started";
       
   486             assert parser != null : "no parser";
       
   487             try {
       
   488                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   489                         + "/" + b.capacity() + " bytes to body parser");
       
   490                 parser.accept(b);
       
   491             } catch (Throwable t) {
       
   492                 debug.log(Level.DEBUG,
       
   493                         () -> "Body parser failed to handle buffer: " + t);
       
   494                 if (!cf.isDone()) {
       
   495                     cf.completeExceptionally(t);
       
   496                 }
       
   497             }
       
   498         }
       
   499 
       
   500         final void onComplete(Throwable closedExceptionally) {
       
   501             if (cf.isDone()) return;
       
   502             if (closedExceptionally != null) {
       
   503                 cf.completeExceptionally(closedExceptionally);
       
   504             } else {
       
   505                 onComplete.accept(State.READING_BODY);
       
   506                 cf.complete(State.READING_BODY);
       
   507             }
       
   508         }
       
   509 
       
   510         @Override
       
   511         public String toString() {
       
   512             return super.toString() + "/parser=" + String.valueOf(parser);
       
   513         }
       
   514 
       
   515     }
       
   516 }