src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1Response.java
branchhttp-client-branch
changeset 56079 d23b02f37fce
parent 55973 4d9b002587db
child 56082 1da51fab3032
equal deleted inserted replaced
56078:6c11b48a0695 56079:d23b02f37fce
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.concurrent.CompletableFuture;
       
    32 import java.util.concurrent.CompletionStage;
       
    33 import java.util.concurrent.Executor;
       
    34 import java.util.function.BiConsumer;
       
    35 import java.util.function.Consumer;
       
    36 import java.util.function.Function;
       
    37 import jdk.incubator.http.HttpHeaders;
       
    38 import jdk.incubator.http.HttpResponse;
       
    39 import jdk.incubator.http.internal.ResponseContent.BodyParser;
       
    40 import jdk.incubator.http.internal.common.Log;
       
    41 import jdk.incubator.http.internal.common.MinimalFuture;
       
    42 import jdk.incubator.http.internal.common.Utils;
       
    43 import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
       
    44 
       
    45 /**
       
    46  * Handles a HTTP/1.1 response (headers + body).
       
    47  * There can be more than one of these per Http exchange.
       
    48  */
       
    49 class Http1Response<T> {
       
    50 
       
    51     private volatile ResponseContent content;
       
    52     private final HttpRequestImpl request;
       
    53     private Response response;
       
    54     private final HttpConnection connection;
       
    55     private HttpHeaders headers;
       
    56     private int responseCode;
       
    57     private final Http1Exchange<T> exchange;
       
    58     private boolean return2Cache; // return connection to cache when finished
       
    59     private final HeadersReader headersReader; // used to read the headers
       
    60     private final BodyReader bodyReader; // used to read the body
       
    61     private final Http1AsyncReceiver asyncReceiver;
       
    62     private volatile EOFException eof;
       
    63     // max number of bytes of (fixed length) body to ignore on redirect
       
    64     private final static int MAX_IGNORE = 1024;
       
    65 
       
    66     // Revisit: can we get rid of this?
       
    67     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
       
    68     private volatile State readProgress = State.INITIAL;
       
    69     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    70     final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
       
    71 
       
    72 
       
    73     Http1Response(HttpConnection conn,
       
    74                   Http1Exchange<T> exchange,
       
    75                   Http1AsyncReceiver asyncReceiver) {
       
    76         this.readProgress = State.INITIAL;
       
    77         this.request = exchange.request();
       
    78         this.exchange = exchange;
       
    79         this.connection = conn;
       
    80         this.asyncReceiver = asyncReceiver;
       
    81         headersReader = new HeadersReader(this::advance);
       
    82         bodyReader = new BodyReader(this::advance);
       
    83     }
       
    84 
       
    85    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
       
    86         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
       
    87                 + asyncReceiver.remaining() +") "  + readProgress);
       
    88         // with expect continue we will resume reading headers + body.
       
    89         asyncReceiver.unsubscribe(bodyReader);
       
    90         bodyReader.reset();
       
    91         Http1HeaderParser hd = new Http1HeaderParser();
       
    92         readProgress = State.READING_HEADERS;
       
    93         headersReader.start(hd);
       
    94         asyncReceiver.subscribe(headersReader);
       
    95         CompletableFuture<State> cf = headersReader.completion();
       
    96         assert cf != null : "parsing not started";
       
    97 
       
    98         Function<State, Response> lambda = (State completed) -> {
       
    99                 assert completed == State.READING_HEADERS;
       
   100                 debug.log(Level.DEBUG, () ->
       
   101                             "Reading Headers: creating Response object;"
       
   102                             + " state is now " + readProgress);
       
   103                 asyncReceiver.unsubscribe(headersReader);
       
   104                 responseCode = hd.responseCode();
       
   105                 headers = hd.headers();
       
   106 
       
   107                 response = new Response(request,
       
   108                                         exchange.getExchange(),
       
   109                                         headers,
       
   110                                         responseCode,
       
   111                                         HTTP_1_1);
       
   112                 return response;
       
   113             };
       
   114 
       
   115         if (executor != null) {
       
   116             return cf.thenApplyAsync(lambda, executor);
       
   117         } else {
       
   118             return cf.thenApply(lambda);
       
   119         }
       
   120     }
       
   121 
       
   122     private boolean finished;
       
   123 
       
   124     synchronized void completed() {
       
   125         finished = true;
       
   126     }
       
   127 
       
   128     synchronized boolean finished() {
       
   129         return finished;
       
   130     }
       
   131 
       
   132     int fixupContentLen(int clen) {
       
   133         if (request.method().equalsIgnoreCase("HEAD")) {
       
   134             return 0;
       
   135         }
       
   136         if (clen == -1) {
       
   137             if (headers.firstValue("Transfer-encoding").orElse("")
       
   138                        .equalsIgnoreCase("chunked")) {
       
   139                 return -1;
       
   140             }
       
   141             return 0;
       
   142         }
       
   143         return clen;
       
   144     }
       
   145 
       
   146     /**
       
   147      * Read up to MAX_IGNORE bytes discarding
       
   148      */
       
   149     public CompletableFuture<Void> ignoreBody(Executor executor) {
       
   150         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   151         if (clen == -1 || clen > MAX_IGNORE) {
       
   152             connection.close();
       
   153             return MinimalFuture.completedFuture(null); // not treating as error
       
   154         } else {
       
   155             return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
       
   156         }
       
   157     }
       
   158 
       
   159     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
       
   160                                          boolean return2Cache,
       
   161                                          Executor executor) {
       
   162         this.return2Cache = return2Cache;
       
   163         final HttpResponse.BodySubscriber<U> pusher = p;
       
   164         final CompletionStage<U> bodyCF = p.getBody();
       
   165         final CompletableFuture<U> cf = MinimalFuture.of(bodyCF);
       
   166 
       
   167         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   168 
       
   169         final int clen = fixupContentLen(clen0);
       
   170 
       
   171         // expect-continue reads headers and body twice.
       
   172         // if we reach here, we must reset the headersReader state.
       
   173         asyncReceiver.unsubscribe(headersReader);
       
   174         headersReader.reset();
       
   175 
       
   176         executor.execute(() -> {
       
   177             try {
       
   178                 HttpClientImpl client = connection.client();
       
   179                 content = new ResponseContent(
       
   180                         connection, clen, headers, pusher,
       
   181                         this::onFinished
       
   182                 );
       
   183                 if (cf.isCompletedExceptionally()) {
       
   184                     // if an error occurs during subscription
       
   185                     connection.close();
       
   186                     return;
       
   187                 }
       
   188                 // increment the reference count on the HttpClientImpl
       
   189                 // to prevent the SelectorManager thread from exiting until
       
   190                 // the body is fully read.
       
   191                 client.reference();
       
   192                 bodyReader.start(content.getBodyParser(
       
   193                     (t) -> {
       
   194                         try {
       
   195                             if (t != null) {
       
   196                                 pusher.onError(t);
       
   197                                 connection.close();
       
   198                                 if (!cf.isDone())
       
   199                                     cf.completeExceptionally(t);
       
   200                             }
       
   201                         } finally {
       
   202                             // decrement the reference count on the HttpClientImpl
       
   203                             // to allow the SelectorManager thread to exit if no
       
   204                             // other operation is pending and the facade is no
       
   205                             // longer referenced.
       
   206                             client.unreference();
       
   207                             bodyReader.onComplete(t);
       
   208                         }
       
   209                     }));
       
   210                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
       
   211                 asyncReceiver.subscribe(bodyReader);
       
   212                 assert bodyReaderCF != null : "parsing not started";
       
   213                 // Make sure to keep a reference to asyncReceiver from
       
   214                 // within this
       
   215                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
       
   216                     t = Utils.getCompletionCause(t);
       
   217                     try {
       
   218                         if (t != null) {
       
   219                             debug.log(Level.DEBUG, () ->
       
   220                                     "Finished reading body: " + s);
       
   221                             assert s == State.READING_BODY;
       
   222                         }
       
   223                         if (t != null && !cf.isDone()) {
       
   224                             pusher.onError(t);
       
   225                             cf.completeExceptionally(t);
       
   226                         }
       
   227                     } catch (Throwable x) {
       
   228                         // not supposed to happen
       
   229                         asyncReceiver.onReadError(x);
       
   230                     }
       
   231                 });
       
   232                 connection.addTrailingOperation(trailingOp);
       
   233             } catch (Throwable t) {
       
   234                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
       
   235                 try {
       
   236                     if (!cf.isDone()) {
       
   237                         pusher.onError(t);
       
   238                         cf.completeExceptionally(t);
       
   239                     }
       
   240                 } finally {
       
   241                     asyncReceiver.onReadError(t);
       
   242                 }
       
   243             }
       
   244         });
       
   245         return cf;
       
   246     }
       
   247 
       
   248 
       
   249     private void onFinished() {
       
   250         asyncReceiver.clear();
       
   251         if (return2Cache) {
       
   252             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
       
   253             // TODO: need to do something here?
       
   254             // connection.setAsyncCallbacks(null, null, null);
       
   255 
       
   256             // don't return the connection to the cache if EOF happened.
       
   257             debug.log(Level.DEBUG, () -> connection.getConnectionFlow()
       
   258                                    + ": return to HTTP/1.1 pool");
       
   259             connection.closeOrReturnToCache(eof == null ? headers : null);
       
   260         }
       
   261     }
       
   262 
       
   263     HttpHeaders responseHeaders() {
       
   264         return headers;
       
   265     }
       
   266 
       
   267     int responseCode() {
       
   268         return responseCode;
       
   269     }
       
   270 
       
   271 // ================ Support for plugging into Http1Receiver   =================
       
   272 // ============================================================================
       
   273 
       
   274     // Callback: Error receiver: Consumer of Throwable.
       
   275     void onReadError(Throwable t) {
       
   276         Log.logError(t);
       
   277         Receiver<?> receiver = receiver(readProgress);
       
   278         if (t instanceof EOFException) {
       
   279             debug.log(Level.DEBUG, "onReadError: received EOF");
       
   280             eof = (EOFException) t;
       
   281         }
       
   282         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
       
   283         debug.log(Level.DEBUG, () -> "onReadError: cf is "
       
   284                 + (cf == null  ? "null"
       
   285                 : (cf.isDone() ? "already completed"
       
   286                                : "not yet completed")));
       
   287         if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
       
   288         else { debug.log(Level.DEBUG, "onReadError", t); }
       
   289         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
       
   290         connection.close();
       
   291     }
       
   292 
       
   293     // ========================================================================
       
   294 
       
   295     private State advance(State previous) {
       
   296         assert readProgress == previous;
       
   297         switch(previous) {
       
   298             case READING_HEADERS:
       
   299                 asyncReceiver.unsubscribe(headersReader);
       
   300                 return readProgress = State.READING_BODY;
       
   301             case READING_BODY:
       
   302                 asyncReceiver.unsubscribe(bodyReader);
       
   303                 return readProgress = State.DONE;
       
   304             default:
       
   305                 throw new InternalError("can't advance from " + previous);
       
   306         }
       
   307     }
       
   308 
       
   309     Receiver<?> receiver(State state) {
       
   310         switch(state) {
       
   311             case READING_HEADERS: return headersReader;
       
   312             case READING_BODY: return bodyReader;
       
   313             default: return null;
       
   314         }
       
   315 
       
   316     }
       
   317 
       
   318     static abstract class Receiver<T>
       
   319             implements Http1AsyncReceiver.Http1AsyncDelegate {
       
   320         abstract void start(T parser);
       
   321         abstract CompletableFuture<State> completion();
       
   322         // accepts a buffer from upstream.
       
   323         // this should be implemented as a simple call to
       
   324         // accept(ref, parser, cf)
       
   325         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
       
   326         public abstract void onReadError(Throwable t);
       
   327         // handle a byte buffer received from upstream.
       
   328         // this method should set the value of Http1Response.buffer
       
   329         // to ref.get() before beginning parsing.
       
   330         abstract void handle(ByteBuffer buf, T parser,
       
   331                              CompletableFuture<State> cf);
       
   332         // resets this objects state so that it can be reused later on
       
   333         // typically puts the reference to parser and completion to null
       
   334         abstract void reset();
       
   335 
       
   336         // accepts a byte buffer received from upstream
       
   337         // returns true if the buffer is fully parsed and more data can
       
   338         // be accepted, false otherwise.
       
   339         final boolean accept(ByteBuffer buf, T parser,
       
   340                 CompletableFuture<State> cf) {
       
   341             if (cf == null || parser == null || cf.isDone()) return false;
       
   342             handle(buf, parser, cf);
       
   343             return !cf.isDone();
       
   344         }
       
   345         public abstract void onSubscribe(AbstractSubscription s);
       
   346         public abstract AbstractSubscription subscription();
       
   347 
       
   348     }
       
   349 
       
   350     // Invoked with each new ByteBuffer when reading headers...
       
   351     final class HeadersReader extends Receiver<Http1HeaderParser> {
       
   352         final Consumer<State> onComplete;
       
   353         volatile Http1HeaderParser parser;
       
   354         volatile CompletableFuture<State> cf;
       
   355         volatile long count; // bytes parsed (for debug)
       
   356         volatile AbstractSubscription subscription;
       
   357 
       
   358         HeadersReader(Consumer<State> onComplete) {
       
   359             this.onComplete = onComplete;
       
   360         }
       
   361 
       
   362         @Override
       
   363         public AbstractSubscription subscription() {
       
   364             return subscription;
       
   365         }
       
   366 
       
   367         @Override
       
   368         public void onSubscribe(AbstractSubscription s) {
       
   369             this.subscription = s;
       
   370             s.request(1);
       
   371         }
       
   372 
       
   373         @Override
       
   374         void reset() {
       
   375             cf = null;
       
   376             parser = null;
       
   377             count = 0;
       
   378             subscription = null;
       
   379         }
       
   380 
       
   381         // Revisit: do we need to support restarting?
       
   382         @Override
       
   383         final void start(Http1HeaderParser hp) {
       
   384             count = 0;
       
   385             cf = new MinimalFuture<>();
       
   386             parser = hp;
       
   387         }
       
   388 
       
   389         @Override
       
   390         CompletableFuture<State> completion() {
       
   391             return cf;
       
   392         }
       
   393 
       
   394         @Override
       
   395         public final boolean tryAsyncReceive(ByteBuffer ref) {
       
   396             boolean hasDemand = subscription.demand().tryDecrement();
       
   397             assert hasDemand;
       
   398             boolean needsMore = accept(ref, parser, cf);
       
   399             if (needsMore) subscription.request(1);
       
   400             return needsMore;
       
   401         }
       
   402 
       
   403         @Override
       
   404         public final void onReadError(Throwable t) {
       
   405             Http1Response.this.onReadError(t);
       
   406         }
       
   407 
       
   408         @Override
       
   409         final void handle(ByteBuffer b,
       
   410                           Http1HeaderParser parser,
       
   411                           CompletableFuture<State> cf) {
       
   412             assert cf != null : "parsing not started";
       
   413             assert parser != null : "no parser";
       
   414             try {
       
   415                 count += b.remaining();
       
   416                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   417                         + "/" + b.capacity() + " bytes to header parser");
       
   418                 if (parser.parse(b)) {
       
   419                     count -= b.remaining();
       
   420                     debug.log(Level.DEBUG, () ->
       
   421                             "Parsing headers completed. bytes=" + count);
       
   422                     onComplete.accept(State.READING_HEADERS);
       
   423                     cf.complete(State.READING_HEADERS);
       
   424                 }
       
   425             } catch (Throwable t) {
       
   426                 debug.log(Level.DEBUG,
       
   427                         () -> "Header parser failed to handle buffer: " + t);
       
   428                 cf.completeExceptionally(t);
       
   429             }
       
   430         }
       
   431     }
       
   432 
       
   433     // Invoked with each new ByteBuffer when reading bodies...
       
   434     final class BodyReader extends Receiver<BodyParser> {
       
   435         final Consumer<State> onComplete;
       
   436         volatile BodyParser parser;
       
   437         volatile CompletableFuture<State> cf;
       
   438         volatile AbstractSubscription subscription;
       
   439         BodyReader(Consumer<State> onComplete) {
       
   440             this.onComplete = onComplete;
       
   441         }
       
   442 
       
   443         @Override
       
   444         void reset() {
       
   445             parser = null;
       
   446             cf = null;
       
   447             subscription = null;
       
   448         }
       
   449 
       
   450         // Revisit: do we need to support restarting?
       
   451         @Override
       
   452         final void start(BodyParser parser) {
       
   453             cf = new MinimalFuture<>();
       
   454             this.parser = parser;
       
   455         }
       
   456 
       
   457         @Override
       
   458         CompletableFuture<State> completion() {
       
   459             return cf;
       
   460         }
       
   461 
       
   462         @Override
       
   463         public final boolean tryAsyncReceive(ByteBuffer b) {
       
   464             return accept(b, parser, cf);
       
   465         }
       
   466 
       
   467         @Override
       
   468         public final void onReadError(Throwable t) {
       
   469             Http1Response.this.onReadError(t);
       
   470         }
       
   471 
       
   472         @Override
       
   473         public AbstractSubscription subscription() {
       
   474             return subscription;
       
   475         }
       
   476 
       
   477         @Override
       
   478         public void onSubscribe(AbstractSubscription s) {
       
   479             this.subscription = s;
       
   480             parser.onSubscribe(s);
       
   481         }
       
   482 
       
   483         @Override
       
   484         final void handle(ByteBuffer b,
       
   485                           BodyParser parser,
       
   486                           CompletableFuture<State> cf) {
       
   487             assert cf != null : "parsing not started";
       
   488             assert parser != null : "no parser";
       
   489             try {
       
   490                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   491                         + "/" + b.capacity() + " bytes to body parser");
       
   492                 parser.accept(b);
       
   493             } catch (Throwable t) {
       
   494                 debug.log(Level.DEBUG,
       
   495                         () -> "Body parser failed to handle buffer: " + t);
       
   496                 if (!cf.isDone()) {
       
   497                     cf.completeExceptionally(t);
       
   498                 }
       
   499             }
       
   500         }
       
   501 
       
   502         final void onComplete(Throwable closedExceptionally) {
       
   503             if (cf.isDone()) return;
       
   504             if (closedExceptionally != null) {
       
   505                 cf.completeExceptionally(closedExceptionally);
       
   506             } else {
       
   507                 onComplete.accept(State.READING_BODY);
       
   508                 cf.complete(State.READING_BODY);
       
   509             }
       
   510         }
       
   511 
       
   512         @Override
       
   513         public String toString() {
       
   514             return super.toString() + "/parser=" + String.valueOf(parser);
       
   515         }
       
   516 
       
   517     }
       
   518 }