src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java
branchhttp-client-branch
changeset 56008 bbd688c6fbbb
parent 55973 4d9b002587db
parent 48408 4f830b447edf
equal deleted inserted replaced
55993:087a6b6d4955 56008:bbd688c6fbbb
    41 import java.util.ArrayList;
    41 import java.util.ArrayList;
    42 import java.util.Collections;
    42 import java.util.Collections;
    43 import java.util.Iterator;
    43 import java.util.Iterator;
    44 import java.util.List;
    44 import java.util.List;
    45 import java.util.NoSuchElementException;
    45 import java.util.NoSuchElementException;
       
    46 import java.util.Objects;
    46 import java.util.concurrent.ConcurrentLinkedQueue;
    47 import java.util.concurrent.ConcurrentLinkedQueue;
    47 import java.util.concurrent.Flow;
    48 import java.util.concurrent.Flow;
       
    49 import java.util.concurrent.Flow.Publisher;
    48 import java.util.function.Supplier;
    50 import java.util.function.Supplier;
    49 import jdk.incubator.http.HttpRequest.BodyPublisher;
    51 import jdk.incubator.http.HttpRequest.BodyPublisher;
    50 import jdk.incubator.http.internal.common.Utils;
    52 import jdk.incubator.http.internal.common.Utils;
    51 
    53 
    52 class RequestPublishers {
    54 class RequestPublishers {
   107         private volatile Flow.Publisher<ByteBuffer> delegate;
   109         private volatile Flow.Publisher<ByteBuffer> delegate;
   108         private final Iterable<byte[]> content;
   110         private final Iterable<byte[]> content;
   109         private volatile long contentLength;
   111         private volatile long contentLength;
   110 
   112 
   111         IterablePublisher(Iterable<byte[]> content) {
   113         IterablePublisher(Iterable<byte[]> content) {
   112             this.content = content;
   114             this.content = Objects.requireNonNull(content);
   113         }
   115         }
   114 
   116 
   115         // The ByteBufferIterator will iterate over the byte[] arrays in
   117         // The ByteBufferIterator will iterate over the byte[] arrays in
   116         // the content one at the time.
   118         // the content one at the time.
   117         //
   119         //
   321 
   323 
   322     static class InputStreamPublisher implements BodyPublisher {
   324     static class InputStreamPublisher implements BodyPublisher {
   323         private final Supplier<? extends InputStream> streamSupplier;
   325         private final Supplier<? extends InputStream> streamSupplier;
   324 
   326 
   325         InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
   327         InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) {
   326             this.streamSupplier = streamSupplier;
   328             this.streamSupplier = Objects.requireNonNull(streamSupplier);
   327         }
   329         }
   328 
   330 
   329         @Override
   331         @Override
   330         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
   332         public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
   331             PullPublisher<ByteBuffer> publisher;
   333             PullPublisher<ByteBuffer> publisher;
   346         @Override
   348         @Override
   347         public long contentLength() {
   349         public long contentLength() {
   348             return -1;
   350             return -1;
   349         }
   351         }
   350     }
   352     }
       
   353 
       
   354     static final class PublisherAdapter implements BodyPublisher {
       
   355 
       
   356         private final Publisher<? extends ByteBuffer> publisher;
       
   357         private final long contentLength;
       
   358 
       
   359         PublisherAdapter(Publisher<? extends ByteBuffer> publisher,
       
   360                          long contentLength) {
       
   361             this.publisher = Objects.requireNonNull(publisher);
       
   362             this.contentLength = contentLength;
       
   363         }
       
   364 
       
   365         @Override
       
   366         public final long contentLength() {
       
   367             return contentLength;
       
   368         }
       
   369 
       
   370         @Override
       
   371         public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
       
   372             publisher.subscribe(subscriber);
       
   373         }
       
   374     }
   351 }
   375 }