src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
changeset 48083 b1c1b4ef4be2
parent 47216 71c04702a3d5
child 48376 41ae5c69b09c
child 55973 4d9b002587db
equal deleted inserted replaced
48081:89829dd3cc54 48083:b1c1b4ef4be2
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
    29 import java.nio.ByteBuffer;
    30 import java.nio.ByteBuffer;
    30 import java.util.Optional;
    31 import java.util.ArrayList;
       
    32 import java.util.List;
    31 import java.util.function.Consumer;
    33 import java.util.function.Consumer;
    32 import jdk.incubator.http.internal.common.Utils;
    34 import jdk.incubator.http.internal.common.Utils;
    33 
    35 
    34 /**
    36 /**
    35  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
    37  * Implements chunked/fixed transfer encodings of HTTP/1.1 responses.
    37  * Call pushBody() to read the body (blocking). Data and errors are provided
    39  * Call pushBody() to read the body (blocking). Data and errors are provided
    38  * to given Consumers. After final buffer delivered, empty optional delivered
    40  * to given Consumers. After final buffer delivered, empty optional delivered
    39  */
    41  */
    40 class ResponseContent {
    42 class ResponseContent {
    41 
    43 
    42     final HttpResponse.BodyProcessor<?> pusher;
    44     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    43     final HttpConnection connection;
    45 
       
    46     final HttpResponse.BodySubscriber<?> pusher;
    44     final int contentLength;
    47     final int contentLength;
    45     ByteBuffer buffer;
    48     final HttpHeaders headers;
    46     //ByteBuffer lastBufferUsed;
       
    47     final ResponseHeaders headers;
       
    48     private final Consumer<Optional<ByteBuffer>> dataConsumer;
       
    49     private final Consumer<IOException> errorConsumer;
       
    50     private final HttpClientImpl client;
       
    51     // this needs to run before we complete the body
    49     // this needs to run before we complete the body
    52     // so that connection can be returned to pool
    50     // so that connection can be returned to pool
    53     private final Runnable onFinished;
    51     private final Runnable onFinished;
       
    52     private final String dbgTag;
    54 
    53 
    55     ResponseContent(HttpConnection connection,
    54     ResponseContent(HttpConnection connection,
    56                     int contentLength,
    55                     int contentLength,
    57                     ResponseHeaders h,
    56                     HttpHeaders h,
    58                     HttpResponse.BodyProcessor<?> userProcessor,
    57                     HttpResponse.BodySubscriber<?> userSubscriber,
    59                     Consumer<Optional<ByteBuffer>> dataConsumer,
       
    60                     Consumer<IOException> errorConsumer,
       
    61                     Runnable onFinished)
    58                     Runnable onFinished)
    62     {
    59     {
    63         this.pusher = (HttpResponse.BodyProcessor)userProcessor;
    60         this.pusher = userSubscriber;
    64         this.connection = connection;
       
    65         this.contentLength = contentLength;
    61         this.contentLength = contentLength;
    66         this.headers = h;
    62         this.headers = h;
    67         this.dataConsumer = dataConsumer;
       
    68         this.errorConsumer = errorConsumer;
       
    69         this.client = connection.client;
       
    70         this.onFinished = onFinished;
    63         this.onFinished = onFinished;
       
    64         this.dbgTag = connection.dbgString() + "/ResponseContent";
    71     }
    65     }
    72 
    66 
    73     static final int LF = 10;
    67     static final int LF = 10;
    74     static final int CR = 13;
    68     static final int CR = 13;
    75     static final int SP = 0x20;
    69 
    76     static final int BUF_SIZE = 1024;
    70     private boolean chunkedContent, chunkedContentInitialized;
    77 
    71 
    78     boolean chunkedContent, chunkedContentInitialized;
    72     boolean contentChunked() throws IOException {
    79 
       
    80     private boolean contentChunked() throws IOException {
       
    81         if (chunkedContentInitialized) {
    73         if (chunkedContentInitialized) {
    82             return chunkedContent;
    74             return chunkedContent;
    83         }
    75         }
    84         if (contentLength == -1) {
    76         if (contentLength == -1) {
    85             String tc = headers.firstValue("Transfer-Encoding")
    77             String tc = headers.firstValue("Transfer-Encoding")
    96         }
    88         }
    97         chunkedContentInitialized = true;
    89         chunkedContentInitialized = true;
    98         return chunkedContent;
    90         return chunkedContent;
    99     }
    91     }
   100 
    92 
   101     /**
    93     interface BodyParser extends Consumer<ByteBuffer> {
   102      * Entry point for pusher. b is an initial ByteBuffer that may
    94         void onSubscribe(AbstractSubscription sub);
   103      * have some data in it. When this method returns, the body
    95     }
   104      * has been fully processed.
    96 
   105      */
    97     // Returns a parser that will take care of parsing the received byte
   106     void pushBody(ByteBuffer b) {
    98     // buffers and forward them to the BodySubscriber.
   107         try {
    99     // When the parser is done, it will call onComplete.
   108             // TODO: check status
   100     // If parsing was successful, the throwable parameter will be null.
   109             if (contentChunked()) {
   101     // Otherwise it will be the exception that occurred
   110                 pushBodyChunked(b);
   102     // Note: revisit: it might be better to use a CompletableFuture than
       
   103     //       a completion handler.
       
   104     BodyParser getBodyParser(Consumer<Throwable> onComplete)
       
   105         throws IOException {
       
   106         if (contentChunked()) {
       
   107             return new ChunkedBodyParser(onComplete);
       
   108         } else {
       
   109             return new FixedLengthBodyParser(contentLength, onComplete);
       
   110         }
       
   111     }
       
   112 
       
   113 
       
   114     static enum ChunkState {READING_LENGTH, READING_DATA, DONE}
       
   115     class ChunkedBodyParser implements BodyParser {
       
   116         final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER;
       
   117         final Consumer<Throwable> onComplete;
       
   118         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   119         final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser";
       
   120 
       
   121         volatile Throwable closedExceptionally;
       
   122         volatile int partialChunklen = 0; // partially read chunk len
       
   123         volatile int chunklen = -1;  // number of bytes in chunk
       
   124         volatile int bytesremaining;  // number of bytes in chunk left to be read incl CRLF
       
   125         volatile boolean cr = false;  // tryReadChunkLength has found CR
       
   126         volatile int bytesToConsume;  // number of bytes that still need to be consumed before proceeding
       
   127         volatile ChunkState state = ChunkState.READING_LENGTH; // current state
       
   128         volatile AbstractSubscription sub;
       
   129         ChunkedBodyParser(Consumer<Throwable> onComplete) {
       
   130             this.onComplete = onComplete;
       
   131         }
       
   132 
       
   133         String dbgString() {
       
   134             return dbgTag;
       
   135         }
       
   136 
       
   137         @Override
       
   138         public void onSubscribe(AbstractSubscription sub) {
       
   139             debug.log(Level.DEBUG, () ->  "onSubscribe: "
       
   140                         + pusher.getClass().getName());
       
   141             pusher.onSubscribe(this.sub = sub);
       
   142         }
       
   143 
       
   144         @Override
       
   145         public void accept(ByteBuffer b) {
       
   146             if (closedExceptionally != null) {
       
   147                 debug.log(Level.DEBUG, () ->  "already closed: "
       
   148                             + closedExceptionally);
       
   149                 return;
       
   150             }
       
   151             boolean completed = false;
       
   152             try {
       
   153                 List<ByteBuffer> out = new ArrayList<>();
       
   154                 do {
       
   155                     if (tryPushOneHunk(b, out))  {
       
   156                         // We're done! (true if the final chunk was parsed).
       
   157                         if (!out.isEmpty()) {
       
   158                             // push what we have and complete
       
   159                             // only reduce demand if we actually push something.
       
   160                             // we would not have come here if there was no
       
   161                             // demand.
       
   162                             boolean hasDemand = sub.demand().tryDecrement();
       
   163                             assert hasDemand;
       
   164                             pusher.onNext(out);
       
   165                         }
       
   166                         debug.log(Level.DEBUG, () ->  "done!");
       
   167                         assert closedExceptionally == null;
       
   168                         assert state == ChunkState.DONE;
       
   169                         onFinished.run();
       
   170                         pusher.onComplete();
       
   171                         completed = true;
       
   172                         onComplete.accept(closedExceptionally); // should be null
       
   173                         break;
       
   174                     }
       
   175                     // the buffer may contain several hunks, and therefore
       
   176                     // we must loop while it's not exhausted.
       
   177                 } while (b.hasRemaining());
       
   178 
       
   179                 if (!completed && !out.isEmpty()) {
       
   180                     // push what we have.
       
   181                     // only reduce demand if we actually push something.
       
   182                     // we would not have come here if there was no
       
   183                     // demand.
       
   184                     boolean hasDemand = sub.demand().tryDecrement();
       
   185                     assert hasDemand;
       
   186                     pusher.onNext(out);
       
   187                 }
       
   188                 assert state == ChunkState.DONE || !b.hasRemaining();
       
   189             } catch(Throwable t) {
       
   190                 closedExceptionally = t;
       
   191                 if (!completed) onComplete.accept(t);
       
   192             }
       
   193         }
       
   194 
       
   195         // reads and returns chunklen. Position of chunkbuf is first byte
       
   196         // of chunk on return. chunklen includes the CR LF at end of chunk
       
   197         // returns -1 if needs more bytes
       
   198         private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException {
       
   199             assert state == ChunkState.READING_LENGTH;
       
   200             while (chunkbuf.hasRemaining()) {
       
   201                 int c = chunkbuf.get();
       
   202                 if (cr) {
       
   203                     if (c == LF) {
       
   204                         return partialChunklen;
       
   205                     } else {
       
   206                         throw new IOException("invalid chunk header");
       
   207                     }
       
   208                 }
       
   209                 if (c == CR) {
       
   210                     cr = true;
       
   211                 } else {
       
   212                     int digit = toDigit(c);
       
   213                     partialChunklen = partialChunklen * 16 + digit;
       
   214                 }
       
   215             }
       
   216             return -1;
       
   217         }
       
   218 
       
   219 
       
   220         // try to consume as many bytes as specified by bytesToConsume.
       
   221         // returns the number of bytes that still need to be consumed.
       
   222         // In practice this method is only called to consume one CRLF pair
       
   223         // with bytesToConsume set to 2, so it will only return 0 (if completed),
       
   224         // 1, or 2 (if chunkbuf doesn't have the 2 chars).
       
   225         private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException {
       
   226             int n = bytesToConsume;
       
   227             if (n > 0) {
       
   228                 int e = Math.min(chunkbuf.remaining(), n);
       
   229 
       
   230                 // verifies some assertions
       
   231                 // this methods is called only to consume CRLF
       
   232                 if (Utils.ASSERTIONSENABLED) {
       
   233                     assert n <= 2 && e <= 2;
       
   234                     ByteBuffer tmp = chunkbuf.slice();
       
   235                     // if n == 2 assert that we will first consume CR
       
   236                     assert (n == 2 && e > 0) ? tmp.get() == CR : true;
       
   237                     // if n == 1 || n == 2 && e == 2 assert that we then consume LF
       
   238                     assert (n == 1 || e == 2) ? tmp.get() == LF : true;
       
   239                 }
       
   240 
       
   241                 chunkbuf.position(chunkbuf.position() + e);
       
   242                 n -= e;
       
   243                 bytesToConsume = n;
       
   244             }
       
   245             assert n >= 0;
       
   246             return n;
       
   247         }
       
   248 
       
   249         /**
       
   250          * Returns a ByteBuffer containing chunk of data or a "hunk" of data
       
   251          * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
       
   252          * If the given chunk does not have enough data this method return
       
   253          * an empty ByteBuffer (READMORE).
       
   254          * If we encounter the final chunk (an empty chunk) this method
       
   255          * returns null.
       
   256          */
       
   257         ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException {
       
   258             int unfulfilled = bytesremaining;
       
   259             int toconsume = bytesToConsume;
       
   260             ChunkState st = state;
       
   261             if (st == ChunkState.READING_LENGTH && chunklen == -1) {
       
   262                 debug.log(Level.DEBUG, () ->  "Trying to read chunk len"
       
   263                         + " (remaining in buffer:"+chunk.remaining()+")");
       
   264                 int clen = chunklen = tryReadChunkLen(chunk);
       
   265                 if (clen == -1) return READMORE;
       
   266                 debug.log(Level.DEBUG, "Got chunk len %d", clen);
       
   267                 cr = false; partialChunklen = 0;
       
   268                 unfulfilled = bytesremaining =  clen;
       
   269                 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk
       
   270                 else st = state = ChunkState.READING_DATA; // read the data
       
   271             }
       
   272 
       
   273             if (toconsume > 0) {
       
   274                 debug.log(Level.DEBUG,
       
   275                         "Trying to consume bytes: %d (remaining in buffer: %s)",
       
   276                         toconsume, chunk.remaining());
       
   277                 if (tryConsumeBytes(chunk) > 0) {
       
   278                     return READMORE;
       
   279                 }
       
   280             }
       
   281 
       
   282             toconsume = bytesToConsume;
       
   283             assert toconsume == 0;
       
   284 
       
   285 
       
   286             if (st == ChunkState.READING_LENGTH) {
       
   287                 // we will come here only if chunklen was 0, after having
       
   288                 // consumed the trailing CRLF
       
   289                 int clen = chunklen;
       
   290                 assert clen == 0;
       
   291                 debug.log(Level.DEBUG, "No more chunks: %d", clen);
       
   292                 // the DONE state is not really needed but it helps with
       
   293                 // assertions...
       
   294                 state = ChunkState.DONE;
       
   295                 return null;
       
   296             }
       
   297 
       
   298             int clen = chunklen;
       
   299             assert clen > 0;
       
   300             assert st == ChunkState.READING_DATA;
       
   301 
       
   302             ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk
       
   303             if (unfulfilled > 0) {
       
   304                 int bytesread = chunk.remaining();
       
   305                 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d",
       
   306                           bytesread, unfulfilled);
       
   307 
       
   308                 int bytes2return = Math.min(bytesread, unfulfilled);
       
   309                 debug.log(Level.DEBUG,  "Returning chunk bytes: %d", bytes2return);
       
   310                 returnBuffer = Utils.slice(chunk, bytes2return);
       
   311                 unfulfilled = bytesremaining -= bytes2return;
       
   312                 if (unfulfilled == 0) bytesToConsume = 2;
       
   313             }
       
   314 
       
   315             assert unfulfilled >= 0;
       
   316 
       
   317             if (unfulfilled == 0) {
       
   318                 debug.log(Level.DEBUG,
       
   319                         "No more bytes to read - %d yet to consume.",
       
   320                         unfulfilled);
       
   321                 // check whether the trailing CRLF is consumed, try to
       
   322                 // consume it if not. If tryConsumeBytes needs more bytes
       
   323                 // then we will come back here later - skipping the block
       
   324                 // that reads data because remaining==0, and finding
       
   325                 // that the two bytes are now consumed.
       
   326                 if (tryConsumeBytes(chunk) == 0) {
       
   327                     // we're done for this chunk! reset all states and
       
   328                     // prepare to read the next chunk.
       
   329                     chunklen = -1;
       
   330                     partialChunklen = 0;
       
   331                     cr = false;
       
   332                     state = ChunkState.READING_LENGTH;
       
   333                     debug.log(Level.DEBUG, "Ready to read next chunk");
       
   334                 }
       
   335             }
       
   336             if (returnBuffer == READMORE) {
       
   337                 debug.log(Level.DEBUG, "Need more data");
       
   338             }
       
   339             return returnBuffer;
       
   340         }
       
   341 
       
   342 
       
   343         // Attempt to parse and push one hunk from the buffer.
       
   344         // Returns true if the final chunk was parsed.
       
   345         // Returns false if we need to push more chunks.
       
   346         private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out)
       
   347                 throws IOException {
       
   348             assert state != ChunkState.DONE;
       
   349             ByteBuffer b1 = tryReadOneHunk(b);
       
   350             if (b1 != null) {
       
   351                 //assert b1.hasRemaining() || b1 == READMORE;
       
   352                 if (b1.hasRemaining()) {
       
   353                     debug.log(Level.DEBUG, "Sending chunk to consumer (%d)",
       
   354                               b1.remaining());
       
   355                     out.add(b1);
       
   356                     debug.log(Level.DEBUG, "Chunk sent.");
       
   357                 }
       
   358                 return false; // we haven't parsed the final chunk yet.
   111             } else {
   359             } else {
   112                 pushBodyFixed(b);
   360                 return true; // we're done! the final chunk was parsed.
   113             }
   361             }
   114         } catch (IOException t) {
   362         }
   115             errorConsumer.accept(t);
   363 
   116         }
   364         private int toDigit(int b) throws IOException {
   117     }
   365             if (b >= 0x30 && b <= 0x39) {
   118 
   366                 return b - 0x30;
   119     // reads and returns chunklen. Position of chunkbuf is first byte
   367             }
   120     // of chunk on return. chunklen includes the CR LF at end of chunk
   368             if (b >= 0x41 && b <= 0x46) {
   121     int readChunkLen() throws IOException {
   369                 return b - 0x41 + 10;
   122         chunklen = 0;
   370             }
   123         boolean cr = false;
   371             if (b >= 0x61 && b <= 0x66) {
   124         while (true) {
   372                 return b - 0x61 + 10;
   125             getHunk();
   373             }
   126             int c = chunkbuf.get();
   374             throw new IOException("Invalid chunk header byte " + b);
   127             if (cr) {
   375         }
   128                 if (c == LF) {
   376 
   129                     return chunklen + 2;
   377     }
       
   378 
       
   379     class FixedLengthBodyParser implements BodyParser {
       
   380         final int contentLength;
       
   381         final Consumer<Throwable> onComplete;
       
   382         final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
   383         final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
       
   384         volatile int remaining;
       
   385         volatile Throwable closedExceptionally;
       
   386         volatile AbstractSubscription sub;
       
   387         FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) {
       
   388             this.contentLength = this.remaining = contentLength;
       
   389             this.onComplete = onComplete;
       
   390         }
       
   391 
       
   392         String dbgString() {
       
   393             return dbgTag;
       
   394         }
       
   395 
       
   396         @Override
       
   397         public void onSubscribe(AbstractSubscription sub) {
       
   398             debug.log(Level.DEBUG, () -> "length="
       
   399                         + contentLength +", onSubscribe: "
       
   400                         + pusher.getClass().getName());
       
   401             pusher.onSubscribe(this.sub = sub);
       
   402             try {
       
   403                 if (contentLength == 0) {
       
   404                     pusher.onComplete();
       
   405                     onFinished.run();
       
   406                     onComplete.accept(null);
       
   407                 }
       
   408             } catch (Throwable t) {
       
   409                 closedExceptionally = t;
       
   410                 try {
       
   411                     pusher.onError(t);
       
   412                 } finally {
       
   413                     onComplete.accept(t);
       
   414                 }
       
   415             }
       
   416         }
       
   417 
       
   418         @Override
       
   419         public void accept(ByteBuffer b) {
       
   420             if (closedExceptionally != null) {
       
   421                 debug.log(Level.DEBUG, () -> "already closed: "
       
   422                             + closedExceptionally);
       
   423                 return;
       
   424             }
       
   425             boolean completed = false;
       
   426             try {
       
   427                 int unfulfilled = remaining;
       
   428                 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)",
       
   429                         b.remaining(), unfulfilled, contentLength);
       
   430                 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0;
       
   431 
       
   432                 if (unfulfilled == 0 && contentLength > 0) return;
       
   433 
       
   434                 if (b.hasRemaining() && unfulfilled > 0) {
       
   435                     // only reduce demand if we actually push something.
       
   436                     // we would not have come here if there was no
       
   437                     // demand.
       
   438                     boolean hasDemand = sub.demand().tryDecrement();
       
   439                     assert hasDemand;
       
   440                     int amount = Math.min(b.remaining(), unfulfilled);
       
   441                     unfulfilled = remaining -= amount;
       
   442                     ByteBuffer buffer = Utils.slice(b, amount);
       
   443                     pusher.onNext(List.of(buffer));
       
   444                 }
       
   445                 if (unfulfilled == 0) {
       
   446                     // We're done! All data has been received.
       
   447                     assert closedExceptionally == null;
       
   448                     onFinished.run();
       
   449                     pusher.onComplete();
       
   450                     completed = true;
       
   451                     onComplete.accept(closedExceptionally); // should be null
   130                 } else {
   452                 } else {
   131                     throw new IOException("invalid chunk header");
   453                     assert b.remaining() == 0;
   132                 }
   454                 }
   133             }
   455             } catch (Throwable t) {
   134             if (c == CR) {
   456                 debug.log(Level.DEBUG, "Unexpected exception", t);
   135                 cr = true;
   457                 closedExceptionally = t;
   136             } else {
   458                 if (!completed) {
   137                 int digit = toDigit(c);
   459                     onComplete.accept(t);
   138                 chunklen = chunklen * 16 + digit;
   460                 }
   139             }
   461             }
   140         }
   462         }
   141     }
       
   142 
       
   143     int chunklen = -1;      // number of bytes in chunk (fixed)
       
   144     int bytesremaining;     // number of bytes in chunk left to be read incl CRLF
       
   145     int bytesread;
       
   146     ByteBuffer chunkbuf;    // initialise
       
   147 
       
   148     // make sure we have at least 1 byte to look at
       
   149     private void getHunk() throws IOException {
       
   150         if (chunkbuf == null || !chunkbuf.hasRemaining()) {
       
   151             chunkbuf = connection.read();
       
   152         }
       
   153     }
       
   154 
       
   155     private void consumeBytes(int n) throws IOException {
       
   156         getHunk();
       
   157         while (n > 0) {
       
   158             int e = Math.min(chunkbuf.remaining(), n);
       
   159             chunkbuf.position(chunkbuf.position() + e);
       
   160             n -= e;
       
   161             if (n > 0) {
       
   162                 getHunk();
       
   163             }
       
   164         }
       
   165     }
       
   166 
       
   167     /**
       
   168      * Returns a ByteBuffer containing a chunk of data or a "hunk" of data
       
   169      * (a chunk of a chunk if the chunk size is larger than our ByteBuffers).
       
   170      * ByteBuffer returned is obtained from response processor.
       
   171      */
       
   172     ByteBuffer readChunkedBuffer() throws IOException {
       
   173         if (chunklen == -1) {
       
   174             // new chunk
       
   175             chunklen = readChunkLen() - 2;
       
   176             bytesremaining =  chunklen;
       
   177             if (chunklen == 0) {
       
   178                 consumeBytes(2);
       
   179                 return null;
       
   180             }
       
   181         }
       
   182 
       
   183         getHunk();
       
   184         bytesread = chunkbuf.remaining();
       
   185         ByteBuffer returnBuffer = Utils.getBuffer();
       
   186         int space = returnBuffer.remaining();
       
   187 
       
   188         int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space));
       
   189         Utils.copy(chunkbuf, returnBuffer, bytes2Copy);
       
   190         returnBuffer.flip();
       
   191         bytesremaining -= bytes2Copy;
       
   192         if (bytesremaining == 0) {
       
   193             consumeBytes(2);
       
   194             chunklen = -1;
       
   195         }
       
   196         return returnBuffer;
       
   197     }
       
   198 
       
   199     ByteBuffer initialBuffer;
       
   200     int fixedBytesReturned;
       
   201 
       
   202     //ByteBuffer getResidue() {
       
   203         //return lastBufferUsed;
       
   204     //}
       
   205 
       
   206     private void compactBuffer(ByteBuffer buf) {
       
   207         buf.compact()
       
   208            .flip();
       
   209     }
       
   210 
       
   211     /**
       
   212      * Copies inbuf (numBytes from its position) to new buffer. The returned
       
   213      * buffer's position is zero and limit is at end (numBytes)
       
   214      */
       
   215     private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) {
       
   216         ByteBuffer b1 = Utils.getBuffer();
       
   217         assert b1.remaining() >= numBytes;
       
   218         byte[] b = b1.array();
       
   219         inbuf.get(b, 0, numBytes);
       
   220         b1.limit(numBytes);
       
   221         return b1;
       
   222     }
       
   223 
       
   224     private void pushBodyChunked(ByteBuffer b) throws IOException {
       
   225         chunkbuf = b;
       
   226         while (true) {
       
   227             ByteBuffer b1 = readChunkedBuffer();
       
   228             if (b1 != null) {
       
   229                 if (b1.hasRemaining()) {
       
   230                     dataConsumer.accept(Optional.of(b1));
       
   231                 }
       
   232             } else {
       
   233                 onFinished.run();
       
   234                 dataConsumer.accept(Optional.empty());
       
   235                 return;
       
   236             }
       
   237         }
       
   238     }
       
   239 
       
   240     private int toDigit(int b) throws IOException {
       
   241         if (b >= 0x30 && b <= 0x39) {
       
   242             return b - 0x30;
       
   243         }
       
   244         if (b >= 0x41 && b <= 0x46) {
       
   245             return b - 0x41 + 10;
       
   246         }
       
   247         if (b >= 0x61 && b <= 0x66) {
       
   248             return b - 0x61 + 10;
       
   249         }
       
   250         throw new IOException("Invalid chunk header byte " + b);
       
   251     }
       
   252 
       
   253     private void pushBodyFixed(ByteBuffer b) throws IOException {
       
   254         int remaining = contentLength;
       
   255         while (b.hasRemaining() && remaining > 0) {
       
   256             ByteBuffer buffer = Utils.getBuffer();
       
   257             int amount = Math.min(b.remaining(), remaining);
       
   258             Utils.copy(b, buffer, amount);
       
   259             remaining -= amount;
       
   260             buffer.flip();
       
   261             dataConsumer.accept(Optional.of(buffer));
       
   262         }
       
   263         while (remaining > 0) {
       
   264             ByteBuffer buffer = connection.read();
       
   265             if (buffer == null)
       
   266                 throw new IOException("connection closed");
       
   267 
       
   268             int bytesread = buffer.remaining();
       
   269             // assume for now that pipelining not implemented
       
   270             if (bytesread > remaining) {
       
   271                 throw new IOException("too many bytes read");
       
   272             }
       
   273             remaining -= bytesread;
       
   274             dataConsumer.accept(Optional.of(buffer));
       
   275         }
       
   276         onFinished.run();
       
   277         dataConsumer.accept(Optional.empty());
       
   278     }
   463     }
   279 }
   464 }