equal
deleted
inserted
replaced
57 public final class RequestPublishers { |
57 public final class RequestPublishers { |
58 |
58 |
59 private RequestPublishers() { } |
59 private RequestPublishers() { } |
60 |
60 |
61 public static class ByteArrayPublisher implements BodyPublisher { |
61 public static class ByteArrayPublisher implements BodyPublisher { |
62 private volatile Flow.Publisher<ByteBuffer> delegate; |
|
63 private final int length; |
62 private final int length; |
64 private final byte[] content; |
63 private final byte[] content; |
65 private final int offset; |
64 private final int offset; |
66 private final int bufSize; |
65 private final int bufSize; |
67 |
66 |
97 } |
96 } |
98 |
97 |
99 @Override |
98 @Override |
100 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
99 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
101 List<ByteBuffer> copy = copy(content, offset, length); |
100 List<ByteBuffer> copy = copy(content, offset, length); |
102 this.delegate = new PullPublisher<>(copy); |
101 var delegate = new PullPublisher<>(copy); |
103 delegate.subscribe(subscriber); |
102 delegate.subscribe(subscriber); |
104 } |
103 } |
105 |
104 |
106 @Override |
105 @Override |
107 public long contentLength() { |
106 public long contentLength() { |
109 } |
108 } |
110 } |
109 } |
111 |
110 |
112 // This implementation has lots of room for improvement. |
111 // This implementation has lots of room for improvement. |
113 public static class IterablePublisher implements BodyPublisher { |
112 public static class IterablePublisher implements BodyPublisher { |
114 private volatile Flow.Publisher<ByteBuffer> delegate; |
|
115 private final Iterable<byte[]> content; |
113 private final Iterable<byte[]> content; |
116 private volatile long contentLength; |
114 private volatile long contentLength; |
117 |
115 |
118 public IterablePublisher(Iterable<byte[]> content) { |
116 public IterablePublisher(Iterable<byte[]> content) { |
119 this.content = Objects.requireNonNull(content); |
117 this.content = Objects.requireNonNull(content); |
172 } |
170 } |
173 |
171 |
174 @Override |
172 @Override |
175 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
173 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
176 Iterable<ByteBuffer> iterable = this::iterator; |
174 Iterable<ByteBuffer> iterable = this::iterator; |
177 this.delegate = new PullPublisher<>(iterable); |
175 var delegate = new PullPublisher<>(iterable); |
178 delegate.subscribe(subscriber); |
176 delegate.subscribe(subscriber); |
179 } |
177 } |
180 |
178 |
181 static long computeLength(Iterable<byte[]> bytes) { |
179 static long computeLength(Iterable<byte[]> bytes) { |
182 long len = 0; |
180 long len = 0; |