src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java
equal
deleted
inserted
replaced
41 import java.util.ArrayList; |
41 import java.util.ArrayList; |
42 import java.util.Collections; |
42 import java.util.Collections; |
43 import java.util.Iterator; |
43 import java.util.Iterator; |
44 import java.util.List; |
44 import java.util.List; |
45 import java.util.NoSuchElementException; |
45 import java.util.NoSuchElementException; |
|
46 import java.util.Objects; |
46 import java.util.concurrent.ConcurrentLinkedQueue; |
47 import java.util.concurrent.ConcurrentLinkedQueue; |
47 import java.util.concurrent.Flow; |
48 import java.util.concurrent.Flow; |
|
49 import java.util.concurrent.Flow.Publisher; |
48 import java.util.function.Supplier; |
50 import java.util.function.Supplier; |
49 import jdk.incubator.http.HttpRequest.BodyPublisher; |
51 import jdk.incubator.http.HttpRequest.BodyPublisher; |
50 import jdk.incubator.http.internal.common.Utils; |
52 import jdk.incubator.http.internal.common.Utils; |
51 |
53 |
52 class RequestPublishers { |
54 class RequestPublishers { |
107 private volatile Flow.Publisher<ByteBuffer> delegate; |
109 private volatile Flow.Publisher<ByteBuffer> delegate; |
108 private final Iterable<byte[]> content; |
110 private final Iterable<byte[]> content; |
109 private volatile long contentLength; |
111 private volatile long contentLength; |
110 |
112 |
// IterablePublisher constructor, shown as old/new row pairs of the diff.
// Old rows: `content` is stored exactly as passed in.
// New rows: `content` is passed through Objects.requireNonNull first, so a
// null argument fails at construction with a NullPointerException.
111 IterablePublisher(Iterable<byte[]> content) { |
113 IterablePublisher(Iterable<byte[]> content) { |
112 this.content = content; |
114 this.content = Objects.requireNonNull(content); |
113 } |
115 } |
114 |
116 |
115 // The ByteBufferIterator will iterate over the byte[] arrays in |
117 // The ByteBufferIterator will iterate over the byte[] arrays in |
116 // the content one at a time. |
118 // the content one at a time. |
117 // |
119 // |
321 |
323 |
322 static class InputStreamPublisher implements BodyPublisher { |
324 static class InputStreamPublisher implements BodyPublisher { |
323 private final Supplier<? extends InputStream> streamSupplier; |
325 private final Supplier<? extends InputStream> streamSupplier; |
324 |
326 |
// InputStreamPublisher constructor, shown as old/new row pairs of the diff.
// Old rows: `streamSupplier` is stored exactly as passed in.
// New rows: `streamSupplier` is passed through Objects.requireNonNull first,
// so a null supplier fails at construction with a NullPointerException.
// Only the supplier reference itself is checked here.
325 InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) { |
327 InputStreamPublisher(Supplier<? extends InputStream> streamSupplier) { |
326 this.streamSupplier = streamSupplier; |
328 this.streamSupplier = Objects.requireNonNull(streamSupplier); |
327 } |
329 } |
328 |
330 |
329 @Override |
331 @Override |
330 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
332 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
331 PullPublisher<ByteBuffer> publisher; |
333 PullPublisher<ByteBuffer> publisher; |
// InputStreamPublisher.contentLength(), identical in the old and new rows.
// Always returns the constant -1.
// NOTE(review): -1 presumably signals "length not known in advance" —
// confirm against the BodyPublisher contract, which is not visible here.
346 @Override |
348 @Override |
347 public long contentLength() { |
349 public long contentLength() { |
348 return -1; |
350 return -1; |
349 } |
351 } |
350 } |
352 } |
|
353 |
|
// PublisherAdapter (inserted rows only): exposes an existing
// Publisher<? extends ByteBuffer> as a BodyPublisher, pairing it with a
// content length supplied by the caller at construction time.
354 static final class PublisherAdapter implements BodyPublisher { |

355 |

// The wrapped publisher; never null (checked in the constructor).
356 private final Publisher<? extends ByteBuffer> publisher; |

// The length reported by contentLength(); fixed at construction.
357 private final long contentLength; |

358 |

// Stores both arguments. A null `publisher` is rejected with a
// NullPointerException via Objects.requireNonNull. `contentLength` is
// stored as given; no range check is performed in this class.
359 PublisherAdapter(Publisher<? extends ByteBuffer> publisher, |

360 long contentLength) { |

361 this.publisher = Objects.requireNonNull(publisher); |

362 this.contentLength = contentLength; |

363 } |

364 |

// Returns the content length that was passed to the constructor.
365 @Override |

366 public final long contentLength() { |

367 return contentLength; |

368 } |

369 |

// Hands the subscriber straight to the wrapped publisher; nothing is
// added or intercepted in between.
370 @Override |

371 public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |

372 publisher.subscribe(subscriber); |

373 } |

374 } |
351 } |
375 } |