src/java.net.http/share/classes/java/net/http/internal/Http1Response.java
branch http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.concurrent.CompletableFuture;
       
    32 import java.util.concurrent.CompletionStage;
       
    33 import java.util.concurrent.Executor;
       
    34 import java.util.function.BiConsumer;
       
    35 import java.util.function.Consumer;
       
    36 import java.util.function.Function;
       
    37 import java.net.http.HttpHeaders;
       
    38 import java.net.http.HttpResponse;
       
    39 import java.net.http.internal.ResponseContent.BodyParser;
       
    40 import java.net.http.internal.common.Log;
       
    41 import java.net.http.internal.common.MinimalFuture;
       
    42 import java.net.http.internal.common.Utils;
       
    43 import static java.net.http.HttpClient.Version.HTTP_1_1;
       
    44 
       
    45 /**
       
    46  * Handles a HTTP/1.1 response (headers + body).
       
    47  * There can be more than one of these per Http exchange.
       
    48  */
       
    49 class Http1Response<T> {
       
    50 
       
    51     private volatile ResponseContent content; // created by readBody() when body parsing is set up
       
    52     private final HttpRequestImpl request; // taken from exchange.request() in the constructor
       
    53     private Response response; // built by readHeadersAsync() once the headers are parsed
       
    54     private final HttpConnection connection;
       
    55     private HttpHeaders headers; // set by readHeadersAsync() from the header parser
       
    56     private int responseCode; // set by readHeadersAsync() from the header parser
       
    57     private final Http1Exchange<T> exchange;
       
    58     private boolean return2Cache; // return connection to cache when finished
       
    59     private final HeadersReader headersReader; // used to read the headers
       
    60     private final BodyReader bodyReader; // used to read the body
       
    61     private final Http1AsyncReceiver asyncReceiver; // the readers are subscribed to / unsubscribed from this
       
    62     private volatile EOFException eof; // recorded by onReadError(); consulted by onFinished()
       
    63     // max number of bytes of (fixed length) body to ignore on redirect
       
    64     private final static int MAX_IGNORE = 1024;
       
    65 
       
    66     // Revisit: can we get rid of this?
       
    67     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} // read progress; see advance()
       
    68     private volatile State readProgress = State.INITIAL; // selects the active reader in receiver(State)
       
    69     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    70     final System.Logger  debug = Utils.getDebugLogger(this.getClass()::getSimpleName, DEBUG);
       
    71 
       
    72 
       
    /**
     * Creates the handler for one HTTP/1.1 response.
     *
     * @param conn          the connection the response is read from
     * @param exchange      the owning exchange; its request is kept as well
     * @param asyncReceiver the receiver the header and body readers are
     *                      subscribed to
     */
    Http1Response(HttpConnection conn,
                  Http1Exchange<T> exchange,
                  Http1AsyncReceiver asyncReceiver) {
        this.connection = conn;
        this.exchange = exchange;
        this.request = exchange.request();
        this.asyncReceiver = asyncReceiver;
        this.readProgress = State.INITIAL;
        // both readers report their completion through advance()
        this.headersReader = new HeadersReader(this::advance);
        this.bodyReader = new BodyReader(this::advance);
    }
       
    84 
       
    85    public CompletableFuture<Response> readHeadersAsync(Executor executor) {
       
    86         debug.log(Level.DEBUG, () -> "Reading Headers: (remaining: "
       
    87                 + asyncReceiver.remaining() +") "  + readProgress);
       
    88         // with expect continue we will resume reading headers + body.
       
    89         asyncReceiver.unsubscribe(bodyReader);
       
    90         bodyReader.reset();
       
    91         Http1HeaderParser hd = new Http1HeaderParser();
       
    92         readProgress = State.READING_HEADERS;
       
    93         headersReader.start(hd);
       
    94         asyncReceiver.subscribe(headersReader);
       
    95         CompletableFuture<State> cf = headersReader.completion();
       
    96         assert cf != null : "parsing not started";
       
    97 
       
    98         Function<State, Response> lambda = (State completed) -> {
       
    99                 assert completed == State.READING_HEADERS;
       
   100                 debug.log(Level.DEBUG, () ->
       
   101                             "Reading Headers: creating Response object;"
       
   102                             + " state is now " + readProgress);
       
   103                 asyncReceiver.unsubscribe(headersReader);
       
   104                 responseCode = hd.responseCode();
       
   105                 headers = hd.headers();
       
   106 
       
   107                 response = new Response(request,
       
   108                                         exchange.getExchange(),
       
   109                                         headers,
       
   110                                         responseCode,
       
   111                                         HTTP_1_1);
       
   112                 return response;
       
   113             };
       
   114 
       
   115         if (executor != null) {
       
   116             return cf.thenApplyAsync(lambda, executor);
       
   117         } else {
       
   118             return cf.thenApply(lambda);
       
   119         }
       
   120     }
       
   121 
       
   122     private boolean finished;
       
   123 
       
   124     synchronized void completed() {
       
   125         finished = true;
       
   126     }
       
   127 
       
   128     synchronized boolean finished() {
       
   129         return finished;
       
   130     }
       
   131 
       
   132     int fixupContentLen(int clen) {
       
   133         if (request.method().equalsIgnoreCase("HEAD")) {
       
   134             return 0;
       
   135         }
       
   136         if (clen == -1) {
       
   137             if (headers.firstValue("Transfer-encoding").orElse("")
       
   138                        .equalsIgnoreCase("chunked")) {
       
   139                 return -1;
       
   140             }
       
   141             return 0;
       
   142         }
       
   143         return clen;
       
   144     }
       
   145 
       
   146     /**
       
   147      * Read up to MAX_IGNORE bytes discarding
       
   148      */
       
   149     public CompletableFuture<Void> ignoreBody(Executor executor) {
       
   150         int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   151         if (clen == -1 || clen > MAX_IGNORE) {
       
   152             connection.close();
       
   153             return MinimalFuture.completedFuture(null); // not treating as error
       
   154         } else {
       
   155             return readBody(HttpResponse.BodySubscriber.discard(), true, executor);
       
   156         }
       
   157     }
       
   158 
       
   159     public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
       
   160                                          boolean return2Cache,
       
   161                                          Executor executor) {
       
   162         this.return2Cache = return2Cache;
       
   163         final HttpResponse.BodySubscriber<U> pusher = p;
       
   164 
       
   165         final CompletableFuture<U> cf = new MinimalFuture<>();
       
   166 
       
   167         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
       
   168 
       
   169         final int clen = fixupContentLen(clen0);
       
   170 
       
   171         // expect-continue reads headers and body twice.
       
   172         // if we reach here, we must reset the headersReader state.
       
   173         asyncReceiver.unsubscribe(headersReader);
       
   174         headersReader.reset();
       
   175 
       
   176         executor.execute(() -> {
       
   177             try {
       
   178                 HttpClientImpl client = connection.client();
       
   179                 content = new ResponseContent(
       
   180                         connection, clen, headers, pusher,
       
   181                         this::onFinished
       
   182                 );
       
   183                 if (cf.isCompletedExceptionally()) {
       
   184                     // if an error occurs during subscription
       
   185                     connection.close();
       
   186                     return;
       
   187                 }
       
   188                 // increment the reference count on the HttpClientImpl
       
   189                 // to prevent the SelectorManager thread from exiting until
       
   190                 // the body is fully read.
       
   191                 client.reference();
       
   192                 bodyReader.start(content.getBodyParser(
       
   193                     (t) -> {
       
   194                         try {
       
   195                             if (t != null) {
       
   196                                 pusher.onError(t);
       
   197                                 connection.close();
       
   198                                 if (!cf.isDone())
       
   199                                     cf.completeExceptionally(t);
       
   200                             }
       
   201                         } finally {
       
   202                             // decrement the reference count on the HttpClientImpl
       
   203                             // to allow the SelectorManager thread to exit if no
       
   204                             // other operation is pending and the facade is no
       
   205                             // longer referenced.
       
   206                             client.unreference();
       
   207                             bodyReader.onComplete(t);
       
   208                         }
       
   209                     }));
       
   210                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
       
   211                 asyncReceiver.subscribe(bodyReader);
       
   212                 assert bodyReaderCF != null : "parsing not started";
       
   213                 // Make sure to keep a reference to asyncReceiver from
       
   214                 // within this
       
   215                 CompletableFuture<?> trailingOp = bodyReaderCF.whenComplete((s,t) ->  {
       
   216                     t = Utils.getCompletionCause(t);
       
   217                     try {
       
   218                         if (t != null) {
       
   219                             debug.log(Level.DEBUG, () ->
       
   220                                     "Finished reading body: " + s);
       
   221                             assert s == State.READING_BODY;
       
   222                         }
       
   223                         if (t != null && !cf.isDone()) {
       
   224                             pusher.onError(t);
       
   225                             cf.completeExceptionally(t);
       
   226                         }
       
   227                     } catch (Throwable x) {
       
   228                         // not supposed to happen
       
   229                         asyncReceiver.onReadError(x);
       
   230                     }
       
   231                 });
       
   232                 connection.addTrailingOperation(trailingOp);
       
   233             } catch (Throwable t) {
       
   234                debug.log(Level.DEBUG, () -> "Failed reading body: " + t);
       
   235                 try {
       
   236                     if (!cf.isDone()) {
       
   237                         pusher.onError(t);
       
   238                         cf.completeExceptionally(t);
       
   239                     }
       
   240                 } finally {
       
   241                     asyncReceiver.onReadError(t);
       
   242                 }
       
   243             }
       
   244         });
       
   245         p.getBody().whenComplete((U u, Throwable t) -> {
       
   246             if (t == null)
       
   247                 cf.complete(u);
       
   248             else
       
   249                 cf.completeExceptionally(t);
       
   250         });
       
   251 
       
   252         return cf;
       
   253     }
       
   254 
       
   255 
       
   256     private void onFinished() {
       
   257         asyncReceiver.clear();
       
   258         if (return2Cache) {
       
   259             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
       
   260             // TODO: need to do something here?
       
   261             // connection.setAsyncCallbacks(null, null, null);
       
   262 
       
   263             // don't return the connection to the cache if EOF happened.
       
   264             debug.log(Level.DEBUG, () -> connection.getConnectionFlow()
       
   265                                    + ": return to HTTP/1.1 pool");
       
   266             connection.closeOrReturnToCache(eof == null ? headers : null);
       
   267         }
       
   268     }
       
   269 
       
   270     HttpHeaders responseHeaders() {
       
   271         return headers;
       
   272     }
       
   273 
       
   274     int responseCode() {
       
   275         return responseCode;
       
   276     }
       
   277 
       
   278 // ================ Support for plugging into Http1Receiver   =================
       
   279 // ============================================================================
       
   280 
       
   281     // Callback: Error receiver: Consumer of Throwable.
       
   282     void onReadError(Throwable t) {
       
   283         Log.logError(t);
       
   284         Receiver<?> receiver = receiver(readProgress);
       
   285         if (t instanceof EOFException) {
       
   286             debug.log(Level.DEBUG, "onReadError: received EOF");
       
   287             eof = (EOFException) t;
       
   288         }
       
   289         CompletableFuture<?> cf = receiver == null ? null : receiver.completion();
       
   290         debug.log(Level.DEBUG, () -> "onReadError: cf is "
       
   291                 + (cf == null  ? "null"
       
   292                 : (cf.isDone() ? "already completed"
       
   293                                : "not yet completed")));
       
   294         if (cf != null && !cf.isDone()) cf.completeExceptionally(t);
       
   295         else { debug.log(Level.DEBUG, "onReadError", t); }
       
   296         debug.log(Level.DEBUG, () -> "closing connection: cause is " + t);
       
   297         connection.close();
       
   298     }
       
   299 
       
   300     // ========================================================================
       
   301 
       
   302     private State advance(State previous) {
       
   303         assert readProgress == previous;
       
   304         switch(previous) {
       
   305             case READING_HEADERS:
       
   306                 asyncReceiver.unsubscribe(headersReader);
       
   307                 return readProgress = State.READING_BODY;
       
   308             case READING_BODY:
       
   309                 asyncReceiver.unsubscribe(bodyReader);
       
   310                 return readProgress = State.DONE;
       
   311             default:
       
   312                 throw new InternalError("can't advance from " + previous);
       
   313         }
       
   314     }
       
   315 
       
   316     Receiver<?> receiver(State state) {
       
   317         switch(state) {
       
   318             case READING_HEADERS: return headersReader;
       
   319             case READING_BODY: return bodyReader;
       
   320             default: return null;
       
   321         }
       
   322 
       
   323     }
       
   324 
       
   325     static abstract class Receiver<T>
       
   326             implements Http1AsyncReceiver.Http1AsyncDelegate {
       
   327         abstract void start(T parser);
       
   328         abstract CompletableFuture<State> completion();
       
   329         // accepts a buffer from upstream.
       
   330         // this should be implemented as a simple call to
       
   331         // accept(ref, parser, cf)
       
   332         public abstract boolean tryAsyncReceive(ByteBuffer buffer);
       
   333         public abstract void onReadError(Throwable t);
       
   334         // handle a byte buffer received from upstream.
       
   335         // this method should set the value of Http1Response.buffer
       
   336         // to ref.get() before beginning parsing.
       
   337         abstract void handle(ByteBuffer buf, T parser,
       
   338                              CompletableFuture<State> cf);
       
   339         // resets this objects state so that it can be reused later on
       
   340         // typically puts the reference to parser and completion to null
       
   341         abstract void reset();
       
   342 
       
   343         // accepts a byte buffer received from upstream
       
   344         // returns true if the buffer is fully parsed and more data can
       
   345         // be accepted, false otherwise.
       
   346         final boolean accept(ByteBuffer buf, T parser,
       
   347                 CompletableFuture<State> cf) {
       
   348             if (cf == null || parser == null || cf.isDone()) return false;
       
   349             handle(buf, parser, cf);
       
   350             return !cf.isDone();
       
   351         }
       
   352         public abstract void onSubscribe(AbstractSubscription s);
       
   353         public abstract AbstractSubscription subscription();
       
   354 
       
   355     }
       
   356 
       
   357     // Invoked with each new ByteBuffer when reading headers...
       
   358     final class HeadersReader extends Receiver<Http1HeaderParser> {
       
   359         final Consumer<State> onComplete;
       
   360         volatile Http1HeaderParser parser;
       
   361         volatile CompletableFuture<State> cf;
       
   362         volatile long count; // bytes parsed (for debug)
       
   363         volatile AbstractSubscription subscription;
       
   364 
       
   365         HeadersReader(Consumer<State> onComplete) {
       
   366             this.onComplete = onComplete;
       
   367         }
       
   368 
       
   369         @Override
       
   370         public AbstractSubscription subscription() {
       
   371             return subscription;
       
   372         }
       
   373 
       
   374         @Override
       
   375         public void onSubscribe(AbstractSubscription s) {
       
   376             this.subscription = s;
       
   377             s.request(1);
       
   378         }
       
   379 
       
   380         @Override
       
   381         void reset() {
       
   382             cf = null;
       
   383             parser = null;
       
   384             count = 0;
       
   385             subscription = null;
       
   386         }
       
   387 
       
   388         // Revisit: do we need to support restarting?
       
   389         @Override
       
   390         final void start(Http1HeaderParser hp) {
       
   391             count = 0;
       
   392             cf = new MinimalFuture<>();
       
   393             parser = hp;
       
   394         }
       
   395 
       
   396         @Override
       
   397         CompletableFuture<State> completion() {
       
   398             return cf;
       
   399         }
       
   400 
       
   401         @Override
       
   402         public final boolean tryAsyncReceive(ByteBuffer ref) {
       
   403             boolean hasDemand = subscription.demand().tryDecrement();
       
   404             assert hasDemand;
       
   405             boolean needsMore = accept(ref, parser, cf);
       
   406             if (needsMore) subscription.request(1);
       
   407             return needsMore;
       
   408         }
       
   409 
       
   410         @Override
       
   411         public final void onReadError(Throwable t) {
       
   412             Http1Response.this.onReadError(t);
       
   413         }
       
   414 
       
   415         @Override
       
   416         final void handle(ByteBuffer b,
       
   417                           Http1HeaderParser parser,
       
   418                           CompletableFuture<State> cf) {
       
   419             assert cf != null : "parsing not started";
       
   420             assert parser != null : "no parser";
       
   421             try {
       
   422                 count += b.remaining();
       
   423                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   424                         + "/" + b.capacity() + " bytes to header parser");
       
   425                 if (parser.parse(b)) {
       
   426                     count -= b.remaining();
       
   427                     debug.log(Level.DEBUG, () ->
       
   428                             "Parsing headers completed. bytes=" + count);
       
   429                     onComplete.accept(State.READING_HEADERS);
       
   430                     cf.complete(State.READING_HEADERS);
       
   431                 }
       
   432             } catch (Throwable t) {
       
   433                 debug.log(Level.DEBUG,
       
   434                         () -> "Header parser failed to handle buffer: " + t);
       
   435                 cf.completeExceptionally(t);
       
   436             }
       
   437         }
       
   438     }
       
   439 
       
   440     // Invoked with each new ByteBuffer when reading bodies...
       
   441     final class BodyReader extends Receiver<BodyParser> {
       
   442         final Consumer<State> onComplete;
       
   443         volatile BodyParser parser;
       
   444         volatile CompletableFuture<State> cf;
       
   445         volatile AbstractSubscription subscription;
       
   446         BodyReader(Consumer<State> onComplete) {
       
   447             this.onComplete = onComplete;
       
   448         }
       
   449 
       
   450         @Override
       
   451         void reset() {
       
   452             parser = null;
       
   453             cf = null;
       
   454             subscription = null;
       
   455         }
       
   456 
       
   457         // Revisit: do we need to support restarting?
       
   458         @Override
       
   459         final void start(BodyParser parser) {
       
   460             cf = new MinimalFuture<>();
       
   461             this.parser = parser;
       
   462         }
       
   463 
       
   464         @Override
       
   465         CompletableFuture<State> completion() {
       
   466             return cf;
       
   467         }
       
   468 
       
   469         @Override
       
   470         public final boolean tryAsyncReceive(ByteBuffer b) {
       
   471             return accept(b, parser, cf);
       
   472         }
       
   473 
       
   474         @Override
       
   475         public final void onReadError(Throwable t) {
       
   476             Http1Response.this.onReadError(t);
       
   477         }
       
   478 
       
   479         @Override
       
   480         public AbstractSubscription subscription() {
       
   481             return subscription;
       
   482         }
       
   483 
       
   484         @Override
       
   485         public void onSubscribe(AbstractSubscription s) {
       
   486             this.subscription = s;
       
   487             parser.onSubscribe(s);
       
   488         }
       
   489 
       
   490         @Override
       
   491         final void handle(ByteBuffer b,
       
   492                           BodyParser parser,
       
   493                           CompletableFuture<State> cf) {
       
   494             assert cf != null : "parsing not started";
       
   495             assert parser != null : "no parser";
       
   496             try {
       
   497                 debug.log(Level.DEBUG, () -> "Sending " + b.remaining()
       
   498                         + "/" + b.capacity() + " bytes to body parser");
       
   499                 parser.accept(b);
       
   500             } catch (Throwable t) {
       
   501                 debug.log(Level.DEBUG,
       
   502                         () -> "Body parser failed to handle buffer: " + t);
       
   503                 if (!cf.isDone()) {
       
   504                     cf.completeExceptionally(t);
       
   505                 }
       
   506             }
       
   507         }
       
   508 
       
   509         final void onComplete(Throwable closedExceptionally) {
       
   510             if (cf.isDone()) return;
       
   511             if (closedExceptionally != null) {
       
   512                 cf.completeExceptionally(closedExceptionally);
       
   513             } else {
       
   514                 onComplete.accept(State.READING_BODY);
       
   515                 cf.complete(State.READING_BODY);
       
   516             }
       
   517         }
       
   518 
       
   519         @Override
       
   520         public String toString() {
       
   521             return super.toString() + "/parser=" + String.valueOf(parser);
       
   522         }
       
   523 
       
   524     }
       
   525 }