diff -r aedd6133e7a0 -r fd85b2bf2b0d src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java Wed Feb 07 21:45:37 2018 +0000 @@ -0,0 +1,377 @@ +/* + * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.internal.net.http; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; +import java.util.function.Supplier; +import java.net.http.HttpRequest.BodyPublisher; +import jdk.internal.net.http.common.Utils; + +public final class RequestPublishers { + + private RequestPublishers() { } + + public static class ByteArrayPublisher implements BodyPublisher { + private volatile Flow.Publisher delegate; + private final int length; + private final byte[] content; + private final int offset; + private final int bufSize; + + public ByteArrayPublisher(byte[] content) { + this(content, 0, content.length); + } + + public ByteArrayPublisher(byte[] content, int offset, int length) { + this(content, offset, length, Utils.BUFSIZE); + } + + /* bufSize exposed for testing purposes */ + ByteArrayPublisher(byte[] content, int offset, int length, int bufSize) { + this.content = content; + this.offset = offset; + this.length = length; + this.bufSize = bufSize; + } + + List copy(byte[] content, int offset, int length) { + List bufs = new ArrayList<>(); + while (length > 0) { + ByteBuffer b = ByteBuffer.allocate(Math.min(bufSize, length)); + int max = b.capacity(); + int tocopy = Math.min(max, length); + b.put(content, offset, tocopy); + offset += tocopy; + length -= tocopy; + b.flip(); + bufs.add(b); + } + return bufs; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + List copy = copy(content, offset, length); + this.delegate = new PullPublisher<>(copy); + delegate.subscribe(subscriber); + } + + @Override + public long contentLength() { + return length; + } + } + + // This implementation has lots of room for improvement. + public static class IterablePublisher implements BodyPublisher { + private volatile Flow.Publisher delegate; + private final Iterable content; + private volatile long contentLength; + + public IterablePublisher(Iterable content) { + this.content = Objects.requireNonNull(content); + } + + // The ByteBufferIterator will iterate over the byte[] arrays in + // the content one at the time. + // + class ByteBufferIterator implements Iterator { + final ConcurrentLinkedQueue buffers = new ConcurrentLinkedQueue<>(); + final Iterator iterator = content.iterator(); + @Override + public boolean hasNext() { + return !buffers.isEmpty() || iterator.hasNext(); + } + + @Override + public ByteBuffer next() { + ByteBuffer buffer = buffers.poll(); + while (buffer == null) { + copy(); + buffer = buffers.poll(); + } + return buffer; + } + + ByteBuffer getBuffer() { + return Utils.getBuffer(); + } + + void copy() { + byte[] bytes = iterator.next(); + int length = bytes.length; + if (length == 0 && iterator.hasNext()) { + // avoid inserting empty buffers, except + // if that's the last. + return; + } + int offset = 0; + do { + ByteBuffer b = getBuffer(); + int max = b.capacity(); + + int tocopy = Math.min(max, length); + b.put(bytes, offset, tocopy); + offset += tocopy; + length -= tocopy; + b.flip(); + buffers.add(b); + } while (length > 0); + } + } + + public Iterator iterator() { + return new ByteBufferIterator(); + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + Iterable iterable = this::iterator; + this.delegate = new PullPublisher<>(iterable); + delegate.subscribe(subscriber); + } + + static long computeLength(Iterable bytes) { + long len = 0; + for (byte[] b : bytes) { + len = Math.addExact(len, (long)b.length); + } + return len; + } + + @Override + public long contentLength() { + if (contentLength == 0) { + synchronized(this) { + if (contentLength == 0) { + contentLength = computeLength(content); + } + } + } + return contentLength; + } + } + + public static class StringPublisher extends ByteArrayPublisher { + public StringPublisher(String content, Charset charset) { + super(content.getBytes(charset)); + } + } + + public static class EmptyPublisher implements BodyPublisher { + private final Flow.Publisher delegate = + new PullPublisher(Collections.emptyList(), null); + + @Override + public long contentLength() { + return 0; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + delegate.subscribe(subscriber); + } + } + + public static class FilePublisher implements BodyPublisher { + private final File file; + private volatile AccessControlContext acc; + + public FilePublisher(Path name) { + file = name.toFile(); + } + + void setAccessControlContext(AccessControlContext acc) { + this.acc = acc; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + if (System.getSecurityManager() != null && acc == null) + throw new InternalError( + "Unexpected null acc when security manager has been installed"); + + InputStream is; + try { + PrivilegedExceptionAction pa = + () -> new FileInputStream(file); + is = AccessController.doPrivileged(pa, acc); + } catch (PrivilegedActionException pae) { + throw new UncheckedIOException((IOException)pae.getCause()); + } + PullPublisher publisher = + new PullPublisher<>(() -> new StreamIterator(is)); + publisher.subscribe(subscriber); + } + + @Override + public long contentLength() { + assert System.getSecurityManager() != null ? acc != null: true; + PrivilegedAction pa = () -> file.length(); + return AccessController.doPrivileged(pa, acc); + } + } + + /** + * Reads one buffer ahead all the time, blocking in hasNext() + */ + public static class StreamIterator implements Iterator { + final InputStream is; + final Supplier bufSupplier; + volatile ByteBuffer nextBuffer; + volatile boolean need2Read = true; + volatile boolean haveNext; + + StreamIterator(InputStream is) { + this(is, Utils::getBuffer); + } + + StreamIterator(InputStream is, Supplier bufSupplier) { + this.is = is; + this.bufSupplier = bufSupplier; + } + +// Throwable error() { +// return error; +// } + + private int read() { + nextBuffer = bufSupplier.get(); + nextBuffer.clear(); + byte[] buf = nextBuffer.array(); + int offset = nextBuffer.arrayOffset(); + int cap = nextBuffer.capacity(); + try { + int n = is.read(buf, offset, cap); + if (n == -1) { + is.close(); + return -1; + } + //flip + nextBuffer.limit(n); + nextBuffer.position(0); + return n; + } catch (IOException ex) { + return -1; + } + } + + @Override + public synchronized boolean hasNext() { + if (need2Read) { + haveNext = read() != -1; + if (haveNext) { + need2Read = false; + } + return haveNext; + } + return haveNext; + } + + @Override + public synchronized ByteBuffer next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + need2Read = true; + return nextBuffer; + } + + } + + public static class InputStreamPublisher implements BodyPublisher { + private final Supplier streamSupplier; + + public InputStreamPublisher(Supplier streamSupplier) { + this.streamSupplier = Objects.requireNonNull(streamSupplier); + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + PullPublisher publisher; + InputStream is = streamSupplier.get(); + if (is == null) { + Throwable t = new IOException("streamSupplier returned null"); + publisher = new PullPublisher<>(null, t); + } else { + publisher = new PullPublisher<>(iterableOf(is), null); + } + publisher.subscribe(subscriber); + } + + protected Iterable iterableOf(InputStream is) { + return () -> new StreamIterator(is); + } + + @Override + public long contentLength() { + return -1; + } + } + + public static final class PublisherAdapter implements BodyPublisher { + + private final Publisher publisher; + private final long contentLength; + + public PublisherAdapter(Publisher publisher, + long contentLength) { + this.publisher = Objects.requireNonNull(publisher); + this.contentLength = contentLength; + } + + @Override + public final long contentLength() { + return contentLength; + } + + @Override + public final void subscribe(Flow.Subscriber subscriber) { + publisher.subscribe(subscriber); + } + } +}