73 |
73 |
74 boolean contentChunked() throws IOException { |
74 boolean contentChunked() throws IOException { |
75 if (chunkedContentInitialized) { |
75 if (chunkedContentInitialized) { |
76 return chunkedContent; |
76 return chunkedContent; |
77 } |
77 } |
|
78 if (contentLength == -2) { |
|
79 // HTTP/1.0 content |
|
80 chunkedContentInitialized = true; |
|
81 chunkedContent = false; |
|
82 return chunkedContent; |
|
83 } |
78 if (contentLength == -1) { |
84 if (contentLength == -1) { |
79 String tc = headers.firstValue("Transfer-Encoding") |
85 String tc = headers.firstValue("Transfer-Encoding") |
80 .orElse(""); |
86 .orElse(""); |
81 if (!tc.equals("")) { |
87 if (!tc.equals("")) { |
82 if (tc.equalsIgnoreCase("chunked")) { |
88 if (tc.equalsIgnoreCase("chunked")) { |
109 BodyParser getBodyParser(Consumer<Throwable> onComplete) |
115 BodyParser getBodyParser(Consumer<Throwable> onComplete) |
110 throws IOException { |
116 throws IOException { |
111 if (contentChunked()) { |
117 if (contentChunked()) { |
112 return new ChunkedBodyParser(onComplete); |
118 return new ChunkedBodyParser(onComplete); |
113 } else { |
119 } else { |
114 return new FixedLengthBodyParser(contentLength, onComplete); |
120 return contentLength == -2 |
|
121 ? new UnknownLengthBodyParser(onComplete) |
|
122 : new FixedLengthBodyParser(contentLength, onComplete); |
115 } |
123 } |
116 } |
124 } |
117 |
125 |
118 |
126 |
/** States named for reading a chunk length, reading chunk data, and being done. */
enum ChunkState {READING_LENGTH, READING_DATA, DONE}
390 throw new IOException("Invalid chunk header byte " + b); |
398 throw new IOException("Invalid chunk header byte " + b); |
391 } |
399 } |
392 |
400 |
393 } |
401 } |
394 |
402 |
|
403 class UnknownLengthBodyParser implements BodyParser { |
|
404 final Consumer<Throwable> onComplete; |
|
405 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
|
406 final String dbgTag = ResponseContent.this.dbgTag + "/UnknownLengthBodyParser"; |
|
407 volatile Throwable closedExceptionally; |
|
408 volatile AbstractSubscription sub; |
|
409 volatile int breceived = 0; |
|
410 |
|
411 UnknownLengthBodyParser(Consumer<Throwable> onComplete) { |
|
412 this.onComplete = onComplete; |
|
413 } |
|
414 |
|
415 String dbgString() { |
|
416 return dbgTag; |
|
417 } |
|
418 |
|
419 @Override |
|
420 public void onSubscribe(AbstractSubscription sub) { |
|
421 if (debug.on()) |
|
422 debug.log("onSubscribe: " + pusher.getClass().getName()); |
|
423 pusher.onSubscribe(this.sub = sub); |
|
424 } |
|
425 |
|
426 @Override |
|
427 public String currentStateMessage() { |
|
428 return format("http1_0 content, bytes received: %d", breceived); |
|
429 } |
|
430 |
|
431 @Override |
|
432 public void accept(ByteBuffer b) { |
|
433 if (closedExceptionally != null) { |
|
434 if (debug.on()) |
|
435 debug.log("already closed: " + closedExceptionally); |
|
436 return; |
|
437 } |
|
438 boolean completed = false; |
|
439 try { |
|
440 if (debug.on()) |
|
441 debug.log("Parser got %d bytes ", b.remaining()); |
|
442 |
|
443 if (b.hasRemaining()) { |
|
444 // only reduce demand if we actually push something. |
|
445 // we would not have come here if there was no |
|
446 // demand. |
|
447 boolean hasDemand = sub.demand().tryDecrement(); |
|
448 assert hasDemand; |
|
449 breceived += b.remaining(); |
|
450 pusher.onNext(List.of(b.asReadOnlyBuffer())); |
|
451 } |
|
452 } catch (Throwable t) { |
|
453 if (debug.on()) debug.log("Unexpected exception", t); |
|
454 closedExceptionally = t; |
|
455 if (!completed) { |
|
456 onComplete.accept(t); |
|
457 } |
|
458 } |
|
459 } |
|
460 |
|
461 /** |
|
462 * Must be called externally when connection has closed |
|
463 * and therefore no more bytes can be read |
|
464 */ |
|
465 public void complete() { |
|
466 // We're done! All data has been received. |
|
467 if (debug.on()) |
|
468 debug.log("Parser got all expected bytes: completing"); |
|
469 assert closedExceptionally == null; |
|
470 onFinished.run(); |
|
471 pusher.onComplete(); |
|
472 onComplete.accept(closedExceptionally); // should be null |
|
473 } |
|
474 } |
|
475 |
395 class FixedLengthBodyParser implements BodyParser { |
476 class FixedLengthBodyParser implements BodyParser { |
396 final int contentLength; |
477 final int contentLength; |
397 final Consumer<Throwable> onComplete; |
478 final Consumer<Throwable> onComplete; |
398 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
479 final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); |
399 final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; |
480 final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; |