123 return result; |
123 return result; |
124 } |
124 } |
125 |
125 |
126 @Override |
126 @Override |
127 public void onSubscribe(Flow.Subscription subscription) { |
127 public void onSubscribe(Flow.Subscription subscription) { |
|
128 Objects.requireNonNull(subscription); |
128 if (!subscribed.compareAndSet(false, true)) { |
129 if (!subscribed.compareAndSet(false, true)) { |
129 subscription.cancel(); |
130 subscription.cancel(); |
130 } else { |
131 } else { |
131 this.subscription = subscription; |
132 this.subscription = subscription; |
132 subscription.request(1); |
133 subscription.request(1); |
133 } |
134 } |
134 } |
135 } |
135 |
136 |
136 @Override |
137 @Override |
137 public void onNext(List<ByteBuffer> items) { |
138 public void onNext(List<ByteBuffer> items) { |
|
139 Objects.requireNonNull(items); |
138 for (ByteBuffer item : items) { |
140 for (ByteBuffer item : items) { |
139 byte[] buf = new byte[item.remaining()]; |
141 byte[] buf = new byte[item.remaining()]; |
140 item.get(buf); |
142 item.get(buf); |
141 consumer.accept(Optional.of(buf)); |
143 consumer.accept(Optional.of(buf)); |
142 } |
144 } |
143 subscription.request(1); |
145 subscription.request(1); |
144 } |
146 } |
145 |
147 |
146 @Override |
148 @Override |
147 public void onError(Throwable throwable) { |
149 public void onError(Throwable throwable) { |
|
150 Objects.requireNonNull(throwable); |
148 result.completeExceptionally(throwable); |
151 result.completeExceptionally(throwable); |
149 } |
152 } |
150 |
153 |
151 @Override |
154 @Override |
152 public void onComplete() { |
155 public void onComplete() { |
170 private final Path file; |
173 private final Path file; |
171 private final OpenOption[] options; |
174 private final OpenOption[] options; |
172 private final FilePermission[] filePermissions; |
175 private final FilePermission[] filePermissions; |
173 private final CompletableFuture<Path> result = new MinimalFuture<>(); |
176 private final CompletableFuture<Path> result = new MinimalFuture<>(); |
174 |
177 |
|
178 private final AtomicBoolean subscribed = new AtomicBoolean(); |
175 private volatile Flow.Subscription subscription; |
179 private volatile Flow.Subscription subscription; |
176 private volatile FileChannel out; |
180 private volatile FileChannel out; |
177 |
181 |
178 private static final String pathForSecurityCheck(Path path) { |
182 private static final String pathForSecurityCheck(Path path) { |
179 return path.toFile().getPath(); |
183 return path.toFile().getPath(); |
209 filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions; |
213 filePermissions == null ? EMPTY_FILE_PERMISSIONS : filePermissions; |
210 } |
214 } |
211 |
215 |
212 @Override |
216 @Override |
213 public void onSubscribe(Flow.Subscription subscription) { |
217 public void onSubscribe(Flow.Subscription subscription) { |
|
218 Objects.requireNonNull(subscription); |
|
219 if (!subscribed.compareAndSet(false, true)) { |
|
220 subscription.cancel(); |
|
221 return; |
|
222 } |
|
223 |
214 this.subscription = subscription; |
224 this.subscription = subscription; |
215 if (System.getSecurityManager() == null) { |
225 if (System.getSecurityManager() == null) { |
216 try { |
226 try { |
217 out = FileChannel.open(file, options); |
227 out = FileChannel.open(file, options); |
218 } catch (IOException ioe) { |
228 } catch (IOException ioe) { |
426 return currentBuffer; |
436 return currentBuffer; |
427 } |
437 } |
428 |
438 |
429 @Override |
439 @Override |
430 public int read(byte[] bytes, int off, int len) throws IOException { |
440 public int read(byte[] bytes, int off, int len) throws IOException { |
|
441 Objects.checkFromIndexSize(off, len, bytes.length); |
|
442 if (len == 0) { |
|
443 return 0; |
|
444 } |
431 // get the buffer to read from, possibly blocking if |
445 // get the buffer to read from, possibly blocking if |
432 // none is available |
446 // none is available |
433 ByteBuffer buffer; |
447 ByteBuffer buffer; |
434 if ((buffer = current()) == LAST_BUFFER) return -1; |
448 if ((buffer = current()) == LAST_BUFFER) return -1; |
435 |
449 |
598 this.result = result; |
613 this.result = result; |
599 } |
614 } |
600 |
615 |
601 @Override |
616 @Override |
602 public void onSubscribe(Flow.Subscription subscription) { |
617 public void onSubscribe(Flow.Subscription subscription) { |
|
618 Objects.requireNonNull(subscription); |
603 if (!subscribed.compareAndSet(false, true)) { |
619 if (!subscribed.compareAndSet(false, true)) { |
604 subscription.cancel(); |
620 subscription.cancel(); |
605 } else { |
621 } else { |
606 subscription.request(Long.MAX_VALUE); |
622 subscription.request(Long.MAX_VALUE); |
607 } |
623 } |
905 subscriber.onError(new IllegalStateException( |
922 subscriber.onError(new IllegalStateException( |
906 "This publisher has already one subscriber")); |
923 "This publisher has already one subscriber")); |
907 } |
924 } |
908 } |
925 } |
909 |
926 |
|
927 private final AtomicBoolean subscribed = new AtomicBoolean(); |
|
928 |
910 @Override |
929 @Override |
911 public void onSubscribe(Flow.Subscription subscription) { |
930 public void onSubscribe(Flow.Subscription subscription) { |
912 subscriptionCF.complete(subscription); |
931 Objects.requireNonNull(subscription); |
|
932 if (!subscribed.compareAndSet(false, true)) { |
|
933 subscription.cancel(); |
|
934 } else { |
|
935 subscriptionCF.complete(subscription); |
|
936 } |
913 } |
937 } |
914 |
938 |
915 @Override |
939 @Override |
916 public void onNext(List<ByteBuffer> item) { |
940 public void onNext(List<ByteBuffer> item) { |
|
941 Objects.requireNonNull(item); |
917 try { |
942 try { |
918 // cannot be called before onSubscribe() |
943 // cannot be called before onSubscribe() |
919 assert subscriptionCF.isDone(); |
944 assert subscriptionCF.isDone(); |
920 SubscriberRef ref = subscriberRef.get(); |
945 SubscriberRef ref = subscriberRef.get(); |
921 // cannot be called before subscriber calls request(1) |
946 // cannot be called before subscriber calls request(1) |
939 "onError called before onSubscribe", |
964 "onError called before onSubscribe", |
940 throwable); |
965 throwable); |
941 // onError can be called before request(1), and therefore can |
966 // onError can be called before request(1), and therefore can |
942 // be called before subscriberRef is set. |
967 // be called before subscriberRef is set. |
943 signalError(throwable); |
968 signalError(throwable); |
|
969 Objects.requireNonNull(throwable); |
944 } |
970 } |
945 |
971 |
946 @Override |
972 @Override |
947 public void onComplete() { |
973 public void onComplete() { |
948 // cannot be called before onSubscribe() |
974 // cannot be called before onSubscribe() |