src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java
branch http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56255 39e28481492d
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.ArrayList;
       
    32 import java.util.Collections;
       
    33 import java.util.List;
       
    34 import java.util.function.Consumer;
       
    35 import java.net.http.HttpHeaders;
       
    36 import java.net.http.HttpResponse;
       
    37 import jdk.internal.net.http.common.Utils;
       
    38 
       
    39 /**
       
    40  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
       
    41  *
       
    42  * Call pushBody() to read the body (blocking). Data and errors are provided
       
    43  * to given Consumers. After final buffer delivered, empty optional delivered
       
    44  */
       
    45 class ResponseContent {
       
    46 
       
    47     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    48 
       
    49     final HttpResponse.BodySubscriber<?> pusher;
       
    50     final int contentLength;
       
    51     final HttpHeaders headers;
       
    52     // this needs to run before we complete the body
       
    53     // so that connection can be returned to pool
       
    54     private final Runnable onFinished;
       
    55     private final String dbgTag;
       
    56 
       
    57     ResponseContent(HttpConnection connection,
       
    58                     int contentLength,
       
    59                     HttpHeaders h,
       
    60                     HttpResponse.BodySubscriber<?> userSubscriber,
       
    61                     Runnable onFinished)
       
    62     {
       
    63         this.pusher = userSubscriber;
       
    64         this.contentLength = contentLength;
       
    65         this.headers = h;
       
    66         this.onFinished = onFinished;
       
    67         this.dbgTag = connection.dbgString() + "/ResponseContent";
       
    68     }
       
    69 
       
    70     static final int LF = 10;
       
    71     static final int CR = 13;
       
    72 
       
    73     private boolean chunkedContent, chunkedContentInitialized;
       
    74 
       
    75     boolean contentChunked() throws IOException {
       
    76         if (chunkedContentInitialized) {
       
    77             return chunkedContent;
       
    78         }
       
    79         if (contentLength == -1) {
       
    80             String tc = headers.firstValue("Transfer-Encoding")
       
    81                                .orElse("");
       
    82             if (!tc.equals("")) {
       
    83                 if (tc.equalsIgnoreCase("chunked")) {
       
    84                     chunkedContent = true;
       
    85                 } else {
       
    86                     throw new IOException("invalid content");
       
    87                 }
       
    88             } else {
       
    89                 chunkedContent = false;
       
    90             }
       
    91         }
       
    92         chunkedContentInitialized = true;
       
    93         return chunkedContent;
       
    94     }
       
    95 
       
    96     interface BodyParser extends Consumer<ByteBuffer> {
       
    97         void onSubscribe(AbstractSubscription sub);
       
    98     }
       
    99 
       
   100     // Returns a parser that will take care of parsing the received byte
       
   101     // buffers and forward them to the BodySubscriber.
       
   102     // When the parser is done, it will call onComplete.
       
   103     // If parsing was successful, the throwable parameter will be null.
       
   104     // Otherwise it will be the exception that occurred
       
   105     // Note: revisit: it might be better to use a CompletableFuture than
       
   106     //       a completion handler.
       
   107     BodyParser getBodyParser(Consumer<Throwable> onComplete)
       
   108         throws IOException {
       
   109         if (contentChunked()) {
       
   110             return new ChunkedBodyParser(onComplete);
       
   111         } else {
       
   112             return new FixedLengthBodyParser(contentLength, onComplete);
       
   113         }
       
   114     }
       
   115 
       
   116 
       
   117     static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
       
   118     class ChunkedBodyParser implements BodyParser {
       
   119         final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
       
   120         final Consumer<Throwable> onComplete;
       
   121         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   122         final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
       
   123 
       
   124         volatile Throwable closedExceptionally;
       
   125         volatile int partialChunklen = 0; // partially read chunk len
       
   126         volatile int chunklen = -1;  // number of bytes in chunk
       
   127         volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
       
   128         volatile boolean cr = false;  // tryReadChunkLength has found CR
       
   129         volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
       
   130         volatile ChunkState state = ChunkState.READING_LENGTH; // current state
       
   131         volatile AbstractSubscription sub;
       
   132         ChunkedBodyParser(Consumer<Throwable> onComplete) {
       
   133             this.onComplete = onComplete;
       
   134         }
       
   135 
       
   136         String dbgString() {
       
   137             return dbgTag;
       
   138         }
       
   139 
       
   140         @Override
       
   141         public void onSubscribe(AbstractSubscription sub) {
       
   142             debug.log(Level.DEBUG, () ->  "onSubscribe: "
       
   143                         + pusher.getClass().getName());
       
   144             pusher.onSubscribe(this.sub = sub);
       
   145         }
       
   146 
       
   147         @Override
       
   148         public void accept(ByteBuffer b) {
       
   149             if (closedExceptionally != null) {
       
   150                 debug.log(Level.DEBUG, () ->  "already closed: "
       
   151                             + closedExceptionally);
       
   152                 return;
       
   153             }
       
   154             boolean completed = false;
       
   155             try {
       
   156                 List<ByteBuffer> out = new ArrayList<>();
       
   157                 do {
       
   158                     if (tryPushOneHunk(b, out))  {
       
   159                         // We're done! (true if the final chunk was parsed).
       
   160                         if (!out.isEmpty()) {
       
   161                             // push what we have and complete
       
   162                             // only reduce demand if we actually push something.
       
   163                             // we would not have come here if there was no
       
   164                             // demand.
       
   165                             boolean hasDemand = sub.demand().tryDecrement();
       
   166                             assert hasDemand;
       
   167                             pusher.onNext(Collections.unmodifiableList(out));
       
   168                         }
       
   169                         debug.log(Level.DEBUG, () ->  "done!");
       
   170                         assert closedExceptionally == null;
       
   171                         assert state == ChunkState.DONE;
       
   172                         onFinished.run();
       
   173                         pusher.onComplete();
       
   174                         completed = true;
       
   175                         onComplete.accept(closedExceptionally); // should be null
       
   176                         break;
       
   177                     }
       
   178                     // the buffer may contain several hunks, and therefore
       
   179                     // we must loop while it's not exhausted.
       
   180                 } while (b.hasRemaining());
       
   181 
       
   182                 if (!completed && !out.isEmpty()) {
       
   183                     // push what we have.
       
   184                     // only reduce demand if we actually push something.
       
   185                     // we would not have come here if there was no
       
   186                     // demand.
       
   187                     boolean hasDemand = sub.demand().tryDecrement();
       
   188                     assert hasDemand;
       
   189                     pusher.onNext(Collections.unmodifiableList(out));
       
   190                 }
       
   191                 assert state == ChunkState.DONE || !b.hasRemaining();
       
   192             } catch(Throwable t) {
       
   193                 closedExceptionally = t;
       
   194                 if (!completed) onComplete.accept(t);
       
   195             }
       
   196         }
       
   197 
       
   198         // reads and returns chunklen. Position of chunkbuf is first byte
       
   199         // of chunk on return. chunklen includes the CR LF at end of chunk
       
   200         // returns -1 if needs more bytes
       
   201         private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
       
   202             assert state == ChunkState.READING_LENGTH;
       
   203             while (chunkbuf.hasRemaining()) {
       
   204                 int c = chunkbuf.get();
       
   205                 if (cr) {
       
   206                     if (c == LF) {
       
   207                         return partialChunklen;
       
   208                     } else {
       
   209                         throw new IOException("invalid chunk header");
       
   210                     }
       
   211                 }
       
   212                 if (c == CR) {
       
   213                     cr = true;
       
   214                 } else {
       
   215                     int digit = toDigit(c);
       
   216                     partialChunklen = partialChunklen * 16 + digit;
       
   217                 }
       
   218             }
       
   219             return -1;
       
   220         }
       
   221 
       
   222 
       
   223         // try to consume as many bytes as specified by bytesToConsume.
       
   224         // returns the number of bytes that still need to be consumed.
       
   225         // In practice this method is only called to consume one CRLF pair
       
   226         // with bytesToConsume set to 2, so it will only return 0 (if completed),
       
   227         // 1, or 2 (if chunkbuf doesn't have the 2 chars).
       
   228         private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
       
   229             int n = bytesToConsume;
       
   230             if (n > 0) {
       
   231                 int e = Math.min(chunkbuf.remaining(), n);
       
   232 
       
   233                 // verifies some assertions
       
   234                 // this methods is called only to consume CRLF
       
   235                 if (Utils.ASSERTIONSENABLED) {
       
   236                     assert n <= 2 && e <= 2;
       
   237                     ByteBuffer tmp = chunkbuf.slice();
       
   238                     // if n == 2 assert that we will first consume CR
       
   239                     assert (n == 2 && e > 0) ? tmp.get() == CR : true;
       
   240                     // if n == 1 || n == 2 && e == 2 assert that we then consume LF
       
   241                     assert (n == 1 || e == 2) ? tmp.get() == LF : true;
       
   242                 }
       
   243 
       
   244                 chunkbuf.position(chunkbuf.position() + e);
       
   245                 n -= e;
       
   246                 bytesToConsume = n;
       
   247             }
       
   248             assert n >= 0;
       
   249             return n;
       
   250         }
       
   251 
       
   252         /**
       
   253          * Returns a ByteBuffer containing chunk of data or a "hunk" of data
       
   254          * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
       
   255          * If the given chunk does not have enough data this method return
       
   256          * an empty ByteBuffer (READMORE).
       
   257          * If we encounter the final chunk (an empty chunk) this method
       
   258          * returns null.
       
   259          */
       
   260         ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
       
   261             int unfulfilled = bytesremaining;
       
   262             int toconsume = bytesToConsume;
       
   263             ChunkState st = state;
       
   264             if (st == ChunkState.READING_LENGTH && chunklen == -1) {
       
   265                 debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
       
   266                         + " (remaining in buffer:"+chunk.remaining()+")");
       
   267                 int clen = chunklen = tryReadChunkLen(chunk);
       
   268                 if (clen == -1) return READMORE;
       
   269                 debug.log(Level.DEBUG, "Got chunk len %d", clen);
       
   270                 cr = false; partialChunklen = 0;
       
   271                 unfulfilled = bytesremaining =  clen;
       
   272                 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
       
   273                 else st = state = ChunkState.READING_DATA; // read the data
       
   274             }
       
   275 
       
   276             if (toconsume > 0) {
       
   277                 debug.log(Level.DEBUG,
       
   278                         "Trying to consume bytes: %d (remaining in buffer: %s)",
       
   279                         toconsume, chunk.remaining());
       
   280                 if (tryConsumeBytes(chunk) > 0) {
       
   281                     return READMORE;
       
   282                 }
       
   283             }
       
   284 
       
   285             toconsume = bytesToConsume;
       
   286             assert toconsume == 0;
       
   287 
       
   288 
       
   289             if (st == ChunkState.READING_LENGTH) {
       
   290                 // we will come here only if chunklen was 0, after having
       
   291                 // consumed the trailing CRLF
       
   292                 int clen = chunklen;
       
   293                 assert clen == 0;
       
   294                 debug.log(Level.DEBUG, "No more chunks: %d", clen);
       
   295                 // the DONE state is not really needed but it helps with
       
   296                 // assertions...
       
   297                 state = ChunkState.DONE;
       
   298                 return null;
       
   299             }
       
   300 
       
   301             int clen = chunklen;
       
   302             assert clen > 0;
       
   303             assert st == ChunkState.READING_DATA;
       
   304 
       
   305             ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
       
   306             if (unfulfilled > 0) {
       
   307                 int bytesread = chunk.remaining();
       
   308                 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
       
   309                           bytesread, unfulfilled);
       
   310 
       
   311                 int bytes2return = Math.min(bytesread, unfulfilled);
       
   312                 debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
       
   313                 returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer();
       
   314                 unfulfilled = bytesremaining -= bytes2return;
       
   315                 if (unfulfilled == 0) bytesToConsume = 2;
       
   316             }
       
   317 
       
   318             assert unfulfilled >= 0;
       
   319 
       
   320             if (unfulfilled == 0) {
       
   321                 debug.log(Level.DEBUG,
       
   322                         "No more bytes to read - %d yet to consume.",
       
   323                         unfulfilled);
       
   324                 // check whether the trailing CRLF is consumed, try to
       
   325                 // consume it if not. If tryConsumeBytes needs more bytes
       
   326                 // then we will come back here later - skipping the block
       
   327                 // that reads data because remaining==0, and finding
       
   328                 // that the two bytes are now consumed.
       
   329                 if (tryConsumeBytes(chunk) == 0) {
       
   330                     // we're done for this chunk! reset all states and
       
   331                     // prepare to read the next chunk.
       
   332                     chunklen = -1;
       
   333                     partialChunklen = 0;
       
   334                     cr = false;
       
   335                     state = ChunkState.READING_LENGTH;
       
   336                     debug.log(Level.DEBUG, "Ready to read next chunk");
       
   337                 }
       
   338             }
       
   339             if (returnBuffer == READMORE) {
       
   340                 debug.log(Level.DEBUG, "Need more data");
       
   341             }
       
   342             return returnBuffer;
       
   343         }
       
   344 
       
   345 
       
   346         // Attempt to parse and push one hunk from the buffer.
       
   347         // Returns true if the final chunk was parsed.
       
   348         // Returns false if we need to push more chunks.
       
   349         private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
       
   350                 throws IOException {
       
   351             assert state != ChunkState.DONE;
       
   352             ByteBuffer b1 = tryReadOneHunk(b);
       
   353             if (b1 != null) {
       
   354                 //assert b1.hasRemaining() || b1 == READMORE;
       
   355                 if (b1.hasRemaining()) {
       
   356                     debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
       
   357                               b1.remaining());
       
   358                     out.add(b1);
       
   359                     debug.log(Level.DEBUG, "Chunk sent.");
       
   360                 }
       
   361                 return false; // we haven't parsed the final chunk yet.
       
   362             } else {
       
   363                 return true; // we're done! the final chunk was parsed.
       
   364             }
       
   365         }
       
   366 
       
   367         private int toDigit(int b) throws IOException {
       
   368             if (b >= 0x30 && b <= 0x39) {
       
   369                 return b - 0x30;
       
   370             }
       
   371             if (b >= 0x41 && b <= 0x46) {
       
   372                 return b - 0x41 + 10;
       
   373             }
       
   374             if (b >= 0x61 && b <= 0x66) {
       
   375                 return b - 0x61 + 10;
       
   376             }
       
   377             throw new IOException("Invalid chunk header byte " + b);
       
   378         }
       
   379 
       
   380     }
       
   381 
       
   382     class FixedLengthBodyParser implements BodyParser {
       
   383         final int contentLength;
       
   384         final Consumer<Throwable> onComplete;
       
   385         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   386         final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
       
   387         volatile int remaining;
       
   388         volatile Throwable closedExceptionally;
       
   389         volatile AbstractSubscription sub;
       
   390         FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
       
   391             this.contentLength = this.remaining = contentLength;
       
   392             this.onComplete = onComplete;
       
   393         }
       
   394 
       
   395         String dbgString() {
       
   396             return dbgTag;
       
   397         }
       
   398 
       
   399         @Override
       
   400         public void onSubscribe(AbstractSubscription sub) {
       
   401             debug.log(Level.DEBUG, () -> "length="
       
   402                         + contentLength +", onSubscribe: "
       
   403                         + pusher.getClass().getName());
       
   404             pusher.onSubscribe(this.sub = sub);
       
   405             try {
       
   406                 if (contentLength == 0) {
       
   407                     onFinished.run();
       
   408                     pusher.onComplete();
       
   409                     onComplete.accept(null);
       
   410                 }
       
   411             } catch (Throwable t) {
       
   412                 closedExceptionally = t;
       
   413                 try {
       
   414                     pusher.onError(t);
       
   415                 } finally {
       
   416                     onComplete.accept(t);
       
   417                 }
       
   418             }
       
   419         }
       
   420 
       
   421         @Override
       
   422         public void accept(ByteBuffer b) {
       
   423             if (closedExceptionally != null) {
       
   424                 debug.log(Level.DEBUG, () -> "already closed: "
       
   425                             + closedExceptionally);
       
   426                 return;
       
   427             }
       
   428             boolean completed = false;
       
   429             try {
       
   430                 int unfulfilled = remaining;
       
   431                 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
       
   432                         b.remaining(), unfulfilled, contentLength);
       
   433                 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
       
   434 
       
   435                 if (unfulfilled == 0 && contentLength > 0) return;
       
   436 
       
   437                 if (b.hasRemaining() && unfulfilled > 0) {
       
   438                     // only reduce demand if we actually push something.
       
   439                     // we would not have come here if there was no
       
   440                     // demand.
       
   441                     boolean hasDemand = sub.demand().tryDecrement();
       
   442                     assert hasDemand;
       
   443                     int amount = Math.min(b.remaining(), unfulfilled);
       
   444                     unfulfilled = remaining -= amount;
       
   445                     ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
       
   446                     pusher.onNext(List.of(buffer.asReadOnlyBuffer()));
       
   447                 }
       
   448                 if (unfulfilled == 0) {
       
   449                     // We're done! All data has been received.
       
   450                     assert closedExceptionally == null;
       
   451                     onFinished.run();
       
   452                     pusher.onComplete();
       
   453                     completed = true;
       
   454                     onComplete.accept(closedExceptionally); // should be null
       
   455                 } else {
       
   456                     assert b.remaining() == 0;
       
   457                 }
       
   458             } catch (Throwable t) {
       
   459                 debug.log(Level.DEBUG, "Unexpected exception", t);
       
   460                 closedExceptionally = t;
       
   461                 if (!completed) {
       
   462                     onComplete.accept(t);
       
   463                 }
       
   464             }
       
   465         }
       
   466     }
       
   467 }