1 /* |
1 /* |
2 * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
57 public final class RequestPublishers { |
57 public final class RequestPublishers { |
58 |
58 |
59 private RequestPublishers() { } |
59 private RequestPublishers() { } |
60 |
60 |
61 public static class ByteArrayPublisher implements BodyPublisher { |
61 public static class ByteArrayPublisher implements BodyPublisher { |
62 private volatile Flow.Publisher<ByteBuffer> delegate; |
|
63 private final int length; |
62 private final int length; |
64 private final byte[] content; |
63 private final byte[] content; |
65 private final int offset; |
64 private final int offset; |
66 private final int bufSize; |
65 private final int bufSize; |
67 |
66 |
97 } |
96 } |
98 |
97 |
99 @Override |
98 @Override |
100 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
99 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
101 List<ByteBuffer> copy = copy(content, offset, length); |
100 List<ByteBuffer> copy = copy(content, offset, length); |
102 this.delegate = new PullPublisher<>(copy); |
101 var delegate = new PullPublisher<>(copy); |
103 delegate.subscribe(subscriber); |
102 delegate.subscribe(subscriber); |
104 } |
103 } |
105 |
104 |
106 @Override |
105 @Override |
107 public long contentLength() { |
106 public long contentLength() { |
109 } |
108 } |
110 } |
109 } |
111 |
110 |
112 // This implementation has lots of room for improvement. |
111 // This implementation has lots of room for improvement. |
113 public static class IterablePublisher implements BodyPublisher { |
112 public static class IterablePublisher implements BodyPublisher { |
114 private volatile Flow.Publisher<ByteBuffer> delegate; |
|
115 private final Iterable<byte[]> content; |
113 private final Iterable<byte[]> content; |
116 private volatile long contentLength; |
114 private volatile long contentLength; |
117 |
115 |
118 public IterablePublisher(Iterable<byte[]> content) { |
116 public IterablePublisher(Iterable<byte[]> content) { |
119 this.content = Objects.requireNonNull(content); |
117 this.content = Objects.requireNonNull(content); |
172 } |
170 } |
173 |
171 |
174 @Override |
172 @Override |
175 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
173 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
176 Iterable<ByteBuffer> iterable = this::iterator; |
174 Iterable<ByteBuffer> iterable = this::iterator; |
177 this.delegate = new PullPublisher<>(iterable); |
175 var delegate = new PullPublisher<>(iterable); |
178 delegate.subscribe(subscriber); |
176 delegate.subscribe(subscriber); |
179 } |
177 } |
180 |
178 |
181 static long computeLength(Iterable<byte[]> bytes) { |
179 static long computeLength(Iterable<byte[]> bytes) { |
182 long len = 0; |
180 long len = 0; |
269 : new FilePermission[] { filePermission }; |
267 : new FilePermission[] { filePermission }; |
270 } |
268 } |
271 |
269 |
272 @Override |
270 @Override |
273 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
271 public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { |
274 InputStream is; |
272 InputStream is = null; |
|
273 Throwable t = null; |
275 if (System.getSecurityManager() == null) { |
274 if (System.getSecurityManager() == null) { |
276 try { |
275 try { |
277 is = new FileInputStream(file); |
276 is = new FileInputStream(file); |
278 } catch (IOException ioe) { |
277 } catch (IOException ioe) { |
279 throw new UncheckedIOException(ioe); |
278 t = ioe; |
280 } |
279 } |
281 } else { |
280 } else { |
282 try { |
281 try { |
283 PrivilegedExceptionAction<FileInputStream> pa = |
282 PrivilegedExceptionAction<FileInputStream> pa = |
284 () -> new FileInputStream(file); |
283 () -> new FileInputStream(file); |
285 is = AccessController.doPrivileged(pa, null, filePermissions); |
284 is = AccessController.doPrivileged(pa, null, filePermissions); |
286 } catch (PrivilegedActionException pae) { |
285 } catch (PrivilegedActionException pae) { |
287 throw new UncheckedIOException((IOException) pae.getCause()); |
286 t = pae.getCause(); |
288 } |
287 } |
289 } |
288 } |
290 PullPublisher<ByteBuffer> publisher = |
289 final InputStream fis = is; |
291 new PullPublisher<>(() -> new StreamIterator(is)); |
290 PullPublisher<ByteBuffer> publisher; |
|
291 if (t == null) { |
|
292 publisher = new PullPublisher<>(() -> new StreamIterator(fis)); |
|
293 } else { |
|
294 publisher = new PullPublisher<>(null, t); |
|
295 } |
292 publisher.subscribe(subscriber); |
296 publisher.subscribe(subscriber); |
293 } |
297 } |
294 |
298 |
295 @Override |
299 @Override |
296 public long contentLength() { |
300 public long contentLength() { |