37 * Call pushBody() to read the body (blocking). Data and errors are provided |
39 * Call pushBody() to read the body (blocking). Data and errors are provided |
38 * to given Consumers. After final buffer delivered, empty optional delivered |
40 * to given Consumers. After final buffer delivered, empty optional delivered |
39 */ |
41 */ |
40 class ResponseContent { |
42 class ResponseContent { |
41 |
43 |
42 final HttpResponse.BodyProcessor<?> pusher; |
44 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
43 final HttpConnection connection; |
45 |
|
46 final HttpResponse.BodySubscriber<?> pusher; |
44 final int contentLength; |
47 final int contentLength; |
45 ByteBuffer buffer; |
48 final HttpHeaders headers; |
46 //ByteBuffer lastBufferUsed; |
|
47 final ResponseHeaders headers; |
|
48 private final Consumer<Optional<ByteBuffer>> dataConsumer; |
|
49 private final Consumer<IOException> errorConsumer; |
|
50 private final HttpClientImpl client; |
|
51 // this needs to run before we complete the body |
49 // this needs to run before we complete the body |
52 // so that connection can be returned to pool |
50 // so that connection can be returned to pool |
53 private final Runnable onFinished; |
51 private final Runnable onFinished; |
|
52 private final String dbgTag; |
54 |
53 |
55 ResponseContent(HttpConnection connection, |
54 ResponseContent(HttpConnection connection, |
56 int contentLength, |
55 int contentLength, |
57 ResponseHeaders h, |
56 HttpHeaders h, |
58 HttpResponse.BodyProcessor<?> userProcessor, |
57 HttpResponse.BodySubscriber<?> userSubscriber, |
59 Consumer<Optional<ByteBuffer>> dataConsumer, |
|
60 Consumer<IOException> errorConsumer, |
|
61 Runnable onFinished) |
58 Runnable onFinished) |
62 { |
59 { |
63 this.pusher = (HttpResponse.BodyProcessor)userProcessor; |
60 this.pusher = userSubscriber; |
64 this.connection = connection; |
|
65 this.contentLength = contentLength; |
61 this.contentLength = contentLength; |
66 this.headers = h; |
62 this.headers = h; |
67 this.dataConsumer = dataConsumer; |
|
68 this.errorConsumer = errorConsumer; |
|
69 this.client = connection.client; |
|
70 this.onFinished = onFinished; |
63 this.onFinished = onFinished; |
|
64 this.dbgTag = connection.dbgString() + "/ResponseContent"; |
71 } |
65 } |
72 |
66 |
73 static final int LF = 10; |
67 static final int LF = 10; |
74 static final int CR = 13; |
68 static final int CR = 13; |
75 static final int SP = 0x20; |
69 |
76 static final int BUF_SIZE = 1024; |
70 private boolean chunkedContent, chunkedContentInitialized; |
77 |
71 |
78 boolean chunkedContent, chunkedContentInitialized; |
72 boolean contentChunked() throws IOException { |
79 |
|
80 private boolean contentChunked() throws IOException { |
|
81 if (chunkedContentInitialized) { |
73 if (chunkedContentInitialized) { |
82 return chunkedContent; |
74 return chunkedContent; |
83 } |
75 } |
84 if (contentLength == -1) { |
76 if (contentLength == -1) { |
85 String tc = headers.firstValue("Transfer-Encoding") |
77 String tc = headers.firstValue("Transfer-Encoding") |
96 } |
88 } |
97 chunkedContentInitialized = true; |
89 chunkedContentInitialized = true; |
98 return chunkedContent; |
90 return chunkedContent; |
99 } |
91 } |
100 |
92 |
101 /** |
93 interface BodyParser extends Consumer<ByteBuffer> { |
102 * Entry point for pusher. b is an initial ByteBuffer that may |
94 void onSubscribe(AbstractSubscription sub); |
103 * have some data in it. When this method returns, the body |
95 } |
104 * has been fully processed. |
96 |
105 */ |
97 // Returns a parser that will take care of parsing the received byte |
106 void pushBody(ByteBuffer b) { |
98 // buffers and forward them to the BodySubscriber. |
107 try { |
99 // When the parser is done, it will call onComplete. |
108 // TODO: check status |
100 // If parsing was successful, the throwable parameter will be null. |
109 if (contentChunked()) { |
101 // Otherwise it will be the exception that occurred |
110 pushBodyChunked(b); |
102 // Note: revisit: it might be better to use a CompletableFuture than |
|
103 // a completion handler. |
|
104 BodyParser getBodyParser(Consumer<Throwable> onComplete) |
|
105 throws IOException { |
|
106 if (contentChunked()) { |
|
107 return new ChunkedBodyParser(onComplete); |
|
108 } else { |
|
109 return new FixedLengthBodyParser(contentLength, onComplete); |
|
110 } |
|
111 } |
|
112 |
|
113 |
|
114 static enum ChunkState {READING_LENGTH, READING_DATA, DONE} |
|
115 class ChunkedBodyParser implements BodyParser { |
|
116 final ByteBuffer READMORE = Utils.EMPTY_BYTEBUFFER; |
|
117 final Consumer<Throwable> onComplete; |
|
118 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
|
119 final String dbgTag = ResponseContent.this.dbgTag + "/ChunkedBodyParser"; |
|
120 |
|
121 volatile Throwable closedExceptionally; |
|
122 volatile int partialChunklen = 0; // partially read chunk len |
|
123 volatile int chunklen = -1; // number of bytes in chunk |
|
124 volatile int bytesremaining; // number of bytes in chunk left to be read incl CRLF |
|
125 volatile boolean cr = false; // tryReadChunkLength has found CR |
|
126 volatile int bytesToConsume; // number of bytes that still need to be consumed before proceeding |
|
127 volatile ChunkState state = ChunkState.READING_LENGTH; // current state |
|
128 volatile AbstractSubscription sub; |
|
129 ChunkedBodyParser(Consumer<Throwable> onComplete) { |
|
130 this.onComplete = onComplete; |
|
131 } |
|
132 |
|
133 String dbgString() { |
|
134 return dbgTag; |
|
135 } |
|
136 |
|
137 @Override |
|
138 public void onSubscribe(AbstractSubscription sub) { |
|
139 debug.log(Level.DEBUG, () -> "onSubscribe: " |
|
140 + pusher.getClass().getName()); |
|
141 pusher.onSubscribe(this.sub = sub); |
|
142 } |
|
143 |
|
144 @Override |
|
145 public void accept(ByteBuffer b) { |
|
146 if (closedExceptionally != null) { |
|
147 debug.log(Level.DEBUG, () -> "already closed: " |
|
148 + closedExceptionally); |
|
149 return; |
|
150 } |
|
151 boolean completed = false; |
|
152 try { |
|
153 List<ByteBuffer> out = new ArrayList<>(); |
|
154 do { |
|
155 if (tryPushOneHunk(b, out)) { |
|
156 // We're done! (true if the final chunk was parsed). |
|
157 if (!out.isEmpty()) { |
|
158 // push what we have and complete |
|
159 // only reduce demand if we actually push something. |
|
160 // we would not have come here if there was no |
|
161 // demand. |
|
162 boolean hasDemand = sub.demand().tryDecrement(); |
|
163 assert hasDemand; |
|
164 pusher.onNext(out); |
|
165 } |
|
166 debug.log(Level.DEBUG, () -> "done!"); |
|
167 assert closedExceptionally == null; |
|
168 assert state == ChunkState.DONE; |
|
169 onFinished.run(); |
|
170 pusher.onComplete(); |
|
171 completed = true; |
|
172 onComplete.accept(closedExceptionally); // should be null |
|
173 break; |
|
174 } |
|
175 // the buffer may contain several hunks, and therefore |
|
176 // we must loop while it's not exhausted. |
|
177 } while (b.hasRemaining()); |
|
178 |
|
179 if (!completed && !out.isEmpty()) { |
|
180 // push what we have. |
|
181 // only reduce demand if we actually push something. |
|
182 // we would not have come here if there was no |
|
183 // demand. |
|
184 boolean hasDemand = sub.demand().tryDecrement(); |
|
185 assert hasDemand; |
|
186 pusher.onNext(out); |
|
187 } |
|
188 assert state == ChunkState.DONE || !b.hasRemaining(); |
|
189 } catch(Throwable t) { |
|
190 closedExceptionally = t; |
|
191 if (!completed) onComplete.accept(t); |
|
192 } |
|
193 } |
|
194 |
|
195 // reads and returns chunklen. Position of chunkbuf is first byte |
|
196 // of chunk on return. chunklen includes the CR LF at end of chunk |
|
197 // returns -1 if needs more bytes |
|
198 private int tryReadChunkLen(ByteBuffer chunkbuf) throws IOException { |
|
199 assert state == ChunkState.READING_LENGTH; |
|
200 while (chunkbuf.hasRemaining()) { |
|
201 int c = chunkbuf.get(); |
|
202 if (cr) { |
|
203 if (c == LF) { |
|
204 return partialChunklen; |
|
205 } else { |
|
206 throw new IOException("invalid chunk header"); |
|
207 } |
|
208 } |
|
209 if (c == CR) { |
|
210 cr = true; |
|
211 } else { |
|
212 int digit = toDigit(c); |
|
213 partialChunklen = partialChunklen * 16 + digit; |
|
214 } |
|
215 } |
|
216 return -1; |
|
217 } |
|
218 |
|
219 |
|
220 // try to consume as many bytes as specified by bytesToConsume. |
|
221 // returns the number of bytes that still need to be consumed. |
|
222 // In practice this method is only called to consume one CRLF pair |
|
223 // with bytesToConsume set to 2, so it will only return 0 (if completed), |
|
224 // 1, or 2 (if chunkbuf doesn't have the 2 chars). |
|
225 private int tryConsumeBytes(ByteBuffer chunkbuf) throws IOException { |
|
226 int n = bytesToConsume; |
|
227 if (n > 0) { |
|
228 int e = Math.min(chunkbuf.remaining(), n); |
|
229 |
|
230 // verifies some assertions |
|
231 // this methods is called only to consume CRLF |
|
232 if (Utils.ASSERTIONSENABLED) { |
|
233 assert n <= 2 && e <= 2; |
|
234 ByteBuffer tmp = chunkbuf.slice(); |
|
235 // if n == 2 assert that we will first consume CR |
|
236 assert (n == 2 && e > 0) ? tmp.get() == CR : true; |
|
237 // if n == 1 || n == 2 && e == 2 assert that we then consume LF |
|
238 assert (n == 1 || e == 2) ? tmp.get() == LF : true; |
|
239 } |
|
240 |
|
241 chunkbuf.position(chunkbuf.position() + e); |
|
242 n -= e; |
|
243 bytesToConsume = n; |
|
244 } |
|
245 assert n >= 0; |
|
246 return n; |
|
247 } |
|
248 |
|
249 /** |
|
250 * Returns a ByteBuffer containing chunk of data or a "hunk" of data |
|
251 * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). |
|
252 * If the given chunk does not have enough data this method return |
|
253 * an empty ByteBuffer (READMORE). |
|
254 * If we encounter the final chunk (an empty chunk) this method |
|
255 * returns null. |
|
256 */ |
|
257 ByteBuffer tryReadOneHunk(ByteBuffer chunk) throws IOException { |
|
258 int unfulfilled = bytesremaining; |
|
259 int toconsume = bytesToConsume; |
|
260 ChunkState st = state; |
|
261 if (st == ChunkState.READING_LENGTH && chunklen == -1) { |
|
262 debug.log(Level.DEBUG, () -> "Trying to read chunk len" |
|
263 + " (remaining in buffer:"+chunk.remaining()+")"); |
|
264 int clen = chunklen = tryReadChunkLen(chunk); |
|
265 if (clen == -1) return READMORE; |
|
266 debug.log(Level.DEBUG, "Got chunk len %d", clen); |
|
267 cr = false; partialChunklen = 0; |
|
268 unfulfilled = bytesremaining = clen; |
|
269 if (clen == 0) toconsume = bytesToConsume = 2; // that was the last chunk |
|
270 else st = state = ChunkState.READING_DATA; // read the data |
|
271 } |
|
272 |
|
273 if (toconsume > 0) { |
|
274 debug.log(Level.DEBUG, |
|
275 "Trying to consume bytes: %d (remaining in buffer: %s)", |
|
276 toconsume, chunk.remaining()); |
|
277 if (tryConsumeBytes(chunk) > 0) { |
|
278 return READMORE; |
|
279 } |
|
280 } |
|
281 |
|
282 toconsume = bytesToConsume; |
|
283 assert toconsume == 0; |
|
284 |
|
285 |
|
286 if (st == ChunkState.READING_LENGTH) { |
|
287 // we will come here only if chunklen was 0, after having |
|
288 // consumed the trailing CRLF |
|
289 int clen = chunklen; |
|
290 assert clen == 0; |
|
291 debug.log(Level.DEBUG, "No more chunks: %d", clen); |
|
292 // the DONE state is not really needed but it helps with |
|
293 // assertions... |
|
294 state = ChunkState.DONE; |
|
295 return null; |
|
296 } |
|
297 |
|
298 int clen = chunklen; |
|
299 assert clen > 0; |
|
300 assert st == ChunkState.READING_DATA; |
|
301 |
|
302 ByteBuffer returnBuffer = READMORE; // May be a hunk or a chunk |
|
303 if (unfulfilled > 0) { |
|
304 int bytesread = chunk.remaining(); |
|
305 debug.log(Level.DEBUG, "Reading chunk: available %d, needed %d", |
|
306 bytesread, unfulfilled); |
|
307 |
|
308 int bytes2return = Math.min(bytesread, unfulfilled); |
|
309 debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return); |
|
310 returnBuffer = Utils.slice(chunk, bytes2return); |
|
311 unfulfilled = bytesremaining -= bytes2return; |
|
312 if (unfulfilled == 0) bytesToConsume = 2; |
|
313 } |
|
314 |
|
315 assert unfulfilled >= 0; |
|
316 |
|
317 if (unfulfilled == 0) { |
|
318 debug.log(Level.DEBUG, |
|
319 "No more bytes to read - %d yet to consume.", |
|
320 unfulfilled); |
|
321 // check whether the trailing CRLF is consumed, try to |
|
322 // consume it if not. If tryConsumeBytes needs more bytes |
|
323 // then we will come back here later - skipping the block |
|
324 // that reads data because remaining==0, and finding |
|
325 // that the two bytes are now consumed. |
|
326 if (tryConsumeBytes(chunk) == 0) { |
|
327 // we're done for this chunk! reset all states and |
|
328 // prepare to read the next chunk. |
|
329 chunklen = -1; |
|
330 partialChunklen = 0; |
|
331 cr = false; |
|
332 state = ChunkState.READING_LENGTH; |
|
333 debug.log(Level.DEBUG, "Ready to read next chunk"); |
|
334 } |
|
335 } |
|
336 if (returnBuffer == READMORE) { |
|
337 debug.log(Level.DEBUG, "Need more data"); |
|
338 } |
|
339 return returnBuffer; |
|
340 } |
|
341 |
|
342 |
|
343 // Attempt to parse and push one hunk from the buffer. |
|
344 // Returns true if the final chunk was parsed. |
|
345 // Returns false if we need to push more chunks. |
|
346 private boolean tryPushOneHunk(ByteBuffer b, List<ByteBuffer> out) |
|
347 throws IOException { |
|
348 assert state != ChunkState.DONE; |
|
349 ByteBuffer b1 = tryReadOneHunk(b); |
|
350 if (b1 != null) { |
|
351 //assert b1.hasRemaining() || b1 == READMORE; |
|
352 if (b1.hasRemaining()) { |
|
353 debug.log(Level.DEBUG, "Sending chunk to consumer (%d)", |
|
354 b1.remaining()); |
|
355 out.add(b1); |
|
356 debug.log(Level.DEBUG, "Chunk sent."); |
|
357 } |
|
358 return false; // we haven't parsed the final chunk yet. |
111 } else { |
359 } else { |
112 pushBodyFixed(b); |
360 return true; // we're done! the final chunk was parsed. |
113 } |
361 } |
114 } catch (IOException t) { |
362 } |
115 errorConsumer.accept(t); |
363 |
116 } |
364 private int toDigit(int b) throws IOException { |
117 } |
365 if (b >= 0x30 && b <= 0x39) { |
118 |
366 return b - 0x30; |
119 // reads and returns chunklen. Position of chunkbuf is first byte |
367 } |
120 // of chunk on return. chunklen includes the CR LF at end of chunk |
368 if (b >= 0x41 && b <= 0x46) { |
121 int readChunkLen() throws IOException { |
369 return b - 0x41 + 10; |
122 chunklen = 0; |
370 } |
123 boolean cr = false; |
371 if (b >= 0x61 && b <= 0x66) { |
124 while (true) { |
372 return b - 0x61 + 10; |
125 getHunk(); |
373 } |
126 int c = chunkbuf.get(); |
374 throw new IOException("Invalid chunk header byte " + b); |
127 if (cr) { |
375 } |
128 if (c == LF) { |
376 |
129 return chunklen + 2; |
377 } |
|
378 |
|
379 class FixedLengthBodyParser implements BodyParser { |
|
380 final int contentLength; |
|
381 final Consumer<Throwable> onComplete; |
|
382 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
|
383 final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; |
|
384 volatile int remaining; |
|
385 volatile Throwable closedExceptionally; |
|
386 volatile AbstractSubscription sub; |
|
387 FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) { |
|
388 this.contentLength = this.remaining = contentLength; |
|
389 this.onComplete = onComplete; |
|
390 } |
|
391 |
|
392 String dbgString() { |
|
393 return dbgTag; |
|
394 } |
|
395 |
|
396 @Override |
|
397 public void onSubscribe(AbstractSubscription sub) { |
|
398 debug.log(Level.DEBUG, () -> "length=" |
|
399 + contentLength +", onSubscribe: " |
|
400 + pusher.getClass().getName()); |
|
401 pusher.onSubscribe(this.sub = sub); |
|
402 try { |
|
403 if (contentLength == 0) { |
|
404 pusher.onComplete(); |
|
405 onFinished.run(); |
|
406 onComplete.accept(null); |
|
407 } |
|
408 } catch (Throwable t) { |
|
409 closedExceptionally = t; |
|
410 try { |
|
411 pusher.onError(t); |
|
412 } finally { |
|
413 onComplete.accept(t); |
|
414 } |
|
415 } |
|
416 } |
|
417 |
|
418 @Override |
|
419 public void accept(ByteBuffer b) { |
|
420 if (closedExceptionally != null) { |
|
421 debug.log(Level.DEBUG, () -> "already closed: " |
|
422 + closedExceptionally); |
|
423 return; |
|
424 } |
|
425 boolean completed = false; |
|
426 try { |
|
427 int unfulfilled = remaining; |
|
428 debug.log(Level.DEBUG, "Parser got %d bytes (%d remaining / %d)", |
|
429 b.remaining(), unfulfilled, contentLength); |
|
430 assert unfulfilled != 0 || contentLength == 0 || b.remaining() == 0; |
|
431 |
|
432 if (unfulfilled == 0 && contentLength > 0) return; |
|
433 |
|
434 if (b.hasRemaining() && unfulfilled > 0) { |
|
435 // only reduce demand if we actually push something. |
|
436 // we would not have come here if there was no |
|
437 // demand. |
|
438 boolean hasDemand = sub.demand().tryDecrement(); |
|
439 assert hasDemand; |
|
440 int amount = Math.min(b.remaining(), unfulfilled); |
|
441 unfulfilled = remaining -= amount; |
|
442 ByteBuffer buffer = Utils.slice(b, amount); |
|
443 pusher.onNext(List.of(buffer)); |
|
444 } |
|
445 if (unfulfilled == 0) { |
|
446 // We're done! All data has been received. |
|
447 assert closedExceptionally == null; |
|
448 onFinished.run(); |
|
449 pusher.onComplete(); |
|
450 completed = true; |
|
451 onComplete.accept(closedExceptionally); // should be null |
130 } else { |
452 } else { |
131 throw new IOException("invalid chunk header"); |
453 assert b.remaining() == 0; |
132 } |
454 } |
133 } |
455 } catch (Throwable t) { |
134 if (c == CR) { |
456 debug.log(Level.DEBUG, "Unexpected exception", t); |
135 cr = true; |
457 closedExceptionally = t; |
136 } else { |
458 if (!completed) { |
137 int digit = toDigit(c); |
459 onComplete.accept(t); |
138 chunklen = chunklen * 16 + digit; |
460 } |
139 } |
461 } |
140 } |
462 } |
141 } |
|
142 |
|
143 int chunklen = -1; // number of bytes in chunk (fixed) |
|
144 int bytesremaining; // number of bytes in chunk left to be read incl CRLF |
|
145 int bytesread; |
|
146 ByteBuffer chunkbuf; // initialise |
|
147 |
|
148 // make sure we have at least 1 byte to look at |
|
149 private void getHunk() throws IOException { |
|
150 if (chunkbuf == null || !chunkbuf.hasRemaining()) { |
|
151 chunkbuf = connection.read(); |
|
152 } |
|
153 } |
|
154 |
|
155 private void consumeBytes(int n) throws IOException { |
|
156 getHunk(); |
|
157 while (n > 0) { |
|
158 int e = Math.min(chunkbuf.remaining(), n); |
|
159 chunkbuf.position(chunkbuf.position() + e); |
|
160 n -= e; |
|
161 if (n > 0) { |
|
162 getHunk(); |
|
163 } |
|
164 } |
|
165 } |
|
166 |
|
167 /** |
|
168 * Returns a ByteBuffer containing a chunk of data or a "hunk" of data |
|
169 * (a chunk of a chunk if the chunk size is larger than our ByteBuffers). |
|
170 * ByteBuffer returned is obtained from response processor. |
|
171 */ |
|
172 ByteBuffer readChunkedBuffer() throws IOException { |
|
173 if (chunklen == -1) { |
|
174 // new chunk |
|
175 chunklen = readChunkLen() - 2; |
|
176 bytesremaining = chunklen; |
|
177 if (chunklen == 0) { |
|
178 consumeBytes(2); |
|
179 return null; |
|
180 } |
|
181 } |
|
182 |
|
183 getHunk(); |
|
184 bytesread = chunkbuf.remaining(); |
|
185 ByteBuffer returnBuffer = Utils.getBuffer(); |
|
186 int space = returnBuffer.remaining(); |
|
187 |
|
188 int bytes2Copy = Math.min(bytesread, Math.min(bytesremaining, space)); |
|
189 Utils.copy(chunkbuf, returnBuffer, bytes2Copy); |
|
190 returnBuffer.flip(); |
|
191 bytesremaining -= bytes2Copy; |
|
192 if (bytesremaining == 0) { |
|
193 consumeBytes(2); |
|
194 chunklen = -1; |
|
195 } |
|
196 return returnBuffer; |
|
197 } |
|
198 |
|
199 ByteBuffer initialBuffer; |
|
200 int fixedBytesReturned; |
|
201 |
|
202 //ByteBuffer getResidue() { |
|
203 //return lastBufferUsed; |
|
204 //} |
|
205 |
|
206 private void compactBuffer(ByteBuffer buf) { |
|
207 buf.compact() |
|
208 .flip(); |
|
209 } |
|
210 |
|
211 /** |
|
212 * Copies inbuf (numBytes from its position) to new buffer. The returned |
|
213 * buffer's position is zero and limit is at end (numBytes) |
|
214 */ |
|
215 private ByteBuffer copyBuffer(ByteBuffer inbuf, int numBytes) { |
|
216 ByteBuffer b1 = Utils.getBuffer(); |
|
217 assert b1.remaining() >= numBytes; |
|
218 byte[] b = b1.array(); |
|
219 inbuf.get(b, 0, numBytes); |
|
220 b1.limit(numBytes); |
|
221 return b1; |
|
222 } |
|
223 |
|
224 private void pushBodyChunked(ByteBuffer b) throws IOException { |
|
225 chunkbuf = b; |
|
226 while (true) { |
|
227 ByteBuffer b1 = readChunkedBuffer(); |
|
228 if (b1 != null) { |
|
229 if (b1.hasRemaining()) { |
|
230 dataConsumer.accept(Optional.of(b1)); |
|
231 } |
|
232 } else { |
|
233 onFinished.run(); |
|
234 dataConsumer.accept(Optional.empty()); |
|
235 return; |
|
236 } |
|
237 } |
|
238 } |
|
239 |
|
240 private int toDigit(int b) throws IOException { |
|
241 if (b >= 0x30 && b <= 0x39) { |
|
242 return b - 0x30; |
|
243 } |
|
244 if (b >= 0x41 && b <= 0x46) { |
|
245 return b - 0x41 + 10; |
|
246 } |
|
247 if (b >= 0x61 && b <= 0x66) { |
|
248 return b - 0x61 + 10; |
|
249 } |
|
250 throw new IOException("Invalid chunk header byte " + b); |
|
251 } |
|
252 |
|
253 private void pushBodyFixed(ByteBuffer b) throws IOException { |
|
254 int remaining = contentLength; |
|
255 while (b.hasRemaining() && remaining > 0) { |
|
256 ByteBuffer buffer = Utils.getBuffer(); |
|
257 int amount = Math.min(b.remaining(), remaining); |
|
258 Utils.copy(b, buffer, amount); |
|
259 remaining -= amount; |
|
260 buffer.flip(); |
|
261 dataConsumer.accept(Optional.of(buffer)); |
|
262 } |
|
263 while (remaining > 0) { |
|
264 ByteBuffer buffer = connection.read(); |
|
265 if (buffer == null) |
|
266 throw new IOException("connection closed"); |
|
267 |
|
268 int bytesread = buffer.remaining(); |
|
269 // assume for now that pipelining not implemented |
|
270 if (bytesread > remaining) { |
|
271 throw new IOException("too many bytes read"); |
|
272 } |
|
273 remaining -= bytesread; |
|
274 dataConsumer.accept(Optional.of(buffer)); |
|
275 } |
|
276 onFinished.run(); |
|
277 dataConsumer.accept(Optional.empty()); |
|
278 } |
463 } |
279 } |
464 } |