src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java
changeset 51462 9d7d74c6f2cb
parent 50681 4254bed3c09d
child 52283 ef0fed0a3953
child 56868 67c7659ecda5
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java	Thu Aug 16 02:00:31 2018 +0800
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java	Thu Aug 16 10:22:48 2018 +0100
@@ -75,6 +75,12 @@
         if (chunkedContentInitialized) {
             return chunkedContent;
         }
+        if (contentLength == -2) {
+            // HTTP/1.0 content
+            chunkedContentInitialized = true;
+            chunkedContent = false;
+            return chunkedContent;
+        }
         if (contentLength == -1) {
             String tc = headers.firstValue("Transfer-Encoding")
                                .orElse("");
@@ -111,7 +117,9 @@
         if (contentChunked()) {
             return new ChunkedBodyParser(onComplete);
         } else {
-            return new FixedLengthBodyParser(contentLength, onComplete);
+            return contentLength == -2
+                ? new UnknownLengthBodyParser(onComplete)
+                : new FixedLengthBodyParser(contentLength, onComplete);
         }
     }
 
@@ -392,6 +400,79 @@
 
     }
 
+    class UnknownLengthBodyParser implements BodyParser {
+        final Consumer<Throwable> onComplete;
+        final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
+        final String dbgTag = ResponseContent.this.dbgTag + "/UnknownLengthBodyParser";
+        volatile Throwable closedExceptionally;
+        volatile AbstractSubscription sub;
+        volatile int breceived = 0;
+
+        UnknownLengthBodyParser(Consumer<Throwable> onComplete) {
+            this.onComplete = onComplete;
+        }
+
+        String dbgString() {
+            return dbgTag;
+        }
+
+        @Override
+        public void onSubscribe(AbstractSubscription sub) {
+            if (debug.on())
+                debug.log("onSubscribe: " + pusher.getClass().getName());
+            pusher.onSubscribe(this.sub = sub);
+        }
+
+        @Override
+        public String currentStateMessage() {
+            return format("http1_0 content, bytes received: %d", breceived);
+        }
+
+        @Override
+        public void accept(ByteBuffer b) {
+            if (closedExceptionally != null) {
+                if (debug.on())
+                    debug.log("already closed: " + closedExceptionally);
+                return;
+            }
+            boolean completed = false;
+            try {
+                if (debug.on())
+                    debug.log("Parser got %d bytes ", b.remaining());
+
+                if (b.hasRemaining()) {
+                    // only reduce demand if we actually push something.
+                    // we would not have come here if there was no
+                    // demand.
+                    boolean hasDemand = sub.demand().tryDecrement();
+                    assert hasDemand;
+                    breceived += b.remaining();
+                    pusher.onNext(List.of(b.asReadOnlyBuffer()));
+                }
+            } catch (Throwable t) {
+                if (debug.on()) debug.log("Unexpected exception", t);
+                closedExceptionally = t;
+                if (!completed) {
+                    onComplete.accept(t);
+                }
+            }
+        }
+
+        /**
+         * Must be called externally when connection has closed
+         * and therefore no more bytes can be read
+         */
+        public void complete() {
+            // We're done! All data has been received.
+            if (debug.on())
+                debug.log("Parser got all expected bytes: completing");
+            assert closedExceptionally == null;
+            onFinished.run();
+            pusher.onComplete();
+            onComplete.accept(closedExceptionally); // should be null
+        }
+    }
+
     class FixedLengthBodyParser implements BodyParser {
         final int contentLength;
         final Consumer<Throwable> onComplete;