src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java
branchhttp-client-branch
changeset 56008 bbd688c6fbbb
parent 55973 4d9b002587db
parent 48408 4f830b447edf
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Fri Dec 15 14:26:23 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Fri Jan 05 14:11:48 2018 +0000
@@ -43,8 +43,10 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
 import java.util.function.Supplier;
 import jdk.incubator.http.HttpRequest.BodyPublisher;
 import jdk.incubator.http.internal.common.Utils;
@@ -109,7 +111,7 @@
         private volatile long contentLength;
 
         IterablePublisher(Iterable<byte[]> content) {
-            this.content = content;
+            this.content = Objects.requireNonNull(content);
         }
 
         // The ByteBufferIterator will iterate over the byte[] arrays in
@@ -323,7 +325,7 @@
         private final Supplier<? extends InputStream> streamSupplier;
 
         InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
-            this.streamSupplier = streamSupplier;
+            this.streamSupplier = Objects.requireNonNull(streamSupplier);
         }
 
         @Override
@@ -348,4 +350,26 @@
             return -1;
         }
     }
+
+    static final class PublisherAdapter implements BodyPublisher {
+
+        private final Publisher<? extends ByteBuffer> publisher;
+        private final long contentLength;
+
+        PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
+                         long contentLength) {
+            this.publisher = Objects.requireNonNull(publisher);
+            this.contentLength = contentLength;
+        }
+
+        @Override
+        public final long contentLength() {
+            return contentLength;
+        }
+
+        @Override
+        public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
+            publisher.subscribe(subscriber);
+        }
+    }
 }