--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/AsyncConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+package java.net.http;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implemented by classes that offer an asynchronous interface.
+ *
+ * PlainHttpConnection, AsyncSSLConnection AsyncSSLDelegate.
+ *
+ * setAsyncCallbacks() is called to set the callback for reading
+ * and error notification. Reads all happen on the selector thread, which
+ * must not block.
+ *
+ * Writing uses the same write() methods as used in blocking mode.
+ * Queues are employed on the writing side to buffer data while it is waiting
+ * to be sent. This strategy relies on HTTP/2 protocol flow control to stop
+ * outgoing queue from continually growing. Writes can be initiated by the
+ * calling thread, but if socket becomes full then the queue is emptied by
+ * the selector thread
+ *
+ */
+interface AsyncConnection {
+
+ /**
+ * Enables asynchronous sending and receiving mode. The given async
+ * receiver will receive all incoming data. asyncInput() will be called
+ * to trigger reads. asyncOutput() will be called to drive writes.
+ *
+ * The errorReceiver callback must be called when any fatal exception
+ * occurs. Connection is assumed to be closed afterwards.
+ *
+ * @param asyncReceiver
+ * @param errorReceiver
+ */
+ void setAsyncCallbacks(
+ Consumer<ByteBuffer> asyncReceiver,
+ Consumer<Throwable> errorReceiver);
+
+ /**
+ * Does whatever is required to start reading. Usually registers
+ * an event with the selector thread.
+ */
+ void startReading();
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/AsyncEvent.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/AsyncEvent.java Sat Apr 30 00:30:31 2016 +0100
@@ -25,24 +25,27 @@
package java.net.http;
import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
/**
* Event handling interface from HttpClientImpl's selector.
*
- * <p> If blockingChannel is true, then the channel will be put in blocking
+ * If BLOCKING is set, then the channel will be put in blocking
* mode prior to handle() being called. If false, then it remains non-blocking.
+ *
+ * If REPEATING is set then the event is not cancelled after being posted.
*/
abstract class AsyncEvent {
- /**
- * Implement this if channel should be made blocking before calling handle()
- */
- public interface Blocking { }
+ public static final int BLOCKING = 0x1; // non blocking if not set
+ public static final int REPEATING = 0x2; // one off event if not set
- /**
- * Implement this if channel should remain non-blocking before calling handle()
- */
- public interface NonBlocking { }
+ protected final int flags;
+
+ AsyncEvent(int flags) {
+ this.flags = flags;
+ }
/** Returns the channel */
public abstract SelectableChannel channel();
@@ -55,4 +58,12 @@
/** Called when selector is shutting down. Abort all exchanges. */
public abstract void abort();
+
+ public boolean blocking() {
+ return (flags & BLOCKING) != 0;
+ }
+
+ public boolean repeating() {
+ return (flags & REPEATING) != 0;
+ }
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/AsyncSSLConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+package java.net.http;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Asynchronous version of SSLConnection.
+ */
+class AsyncSSLConnection extends HttpConnection implements AsyncConnection {
+ final AsyncSSLDelegate sslDelegate;
+ final PlainHttpConnection delegate;
+
+ AsyncSSLConnection(InetSocketAddress addr, HttpClientImpl client, String[] ap) {
+ super(addr, client);
+ delegate = new PlainHttpConnection(addr, client);
+ sslDelegate = new AsyncSSLDelegate(delegate, client, ap);
+ }
+
+ @Override
+ public void connect() throws IOException, InterruptedException {
+ delegate.connect();
+ }
+
+ @Override
+ public CompletableFuture<Void> connectAsync() {
+ return delegate.connectAsync();
+ }
+
+ @Override
+ boolean connected() {
+ return delegate.connected();
+ }
+
+ @Override
+ boolean isSecure() {
+ return true;
+ }
+
+ @Override
+ boolean isProxied() {
+ return false;
+ }
+
+ @Override
+ SocketChannel channel() {
+ return delegate.channel();
+ }
+
+ @Override
+ ConnectionPool.CacheKey cacheKey() {
+ return ConnectionPool.cacheKey(address, null);
+ }
+
+ @Override
+ synchronized long write(ByteBuffer[] buffers, int start, int number) throws IOException {
+ ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
+ long n = Utils.remaining(bufs);
+ sslDelegate.write(bufs);
+ return n;
+ }
+
+ @Override
+ long write(ByteBuffer buffer) throws IOException {
+ long n = buffer.remaining();
+ sslDelegate.write(buffer);
+ return n;
+ }
+
+ @Override
+ public void close() {
+ Utils.close(sslDelegate, delegate.channel());
+ }
+
+ @Override
+ public void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) {
+ sslDelegate.setAsyncCallbacks(asyncReceiver, errorReceiver);
+ delegate.setAsyncCallbacks(sslDelegate::lowerRead, errorReceiver);
+ }
+
+ // Blocking read functions not used here
+
+ @Override
+ protected ByteBuffer readImpl(int length) throws IOException {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ protected int readImpl(ByteBuffer buffer) throws IOException {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ CompletableFuture<Void> whenReceivingResponse() {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public void startReading() {
+ delegate.startReading();
+ sslDelegate.startReading();
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/AsyncSSLDelegate.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,598 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+package java.net.http;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import static javax.net.ssl.SSLEngineResult.Status.*;
+import javax.net.ssl.*;
+import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
+
+/**
+ * Asynchronous wrapper around SSLEngine. send and receive is fully non
+ * blocking. When handshaking is required, a thread is created to perform
+ * the handshake and application level sends do not take place during this time.
+ *
+ * Is implemented using queues and functions operating on the receiving end
+ * of each queue.
+ *
+ * Application writes to:
+ * ||
+ * \/
+ * appOutputQ
+ * ||
+ * \/
+ * appOutputQ read by "upperWrite" method which does SSLEngine.wrap
+ * and writes to
+ * ||
+ * \/
+ * channelOutputQ
+ * ||
+ * \/
+ * channelOutputQ is read by "lowerWrite" method which is invoked from
+ * OP_WRITE events on the socket (from selector thread)
+ *
+ * Reading side is as follows
+ * --------------------------
+ *
+ * "upperRead" method reads off channelInputQ and calls SSLEngine.unwrap and
+ * when decrypted data is returned, it is passed to the user's Consumer<ByteBuffer>
+ * /\
+ * ||
+ * channelInputQ
+ * /\
+ * ||
+ * "lowerRead" method puts buffers into channelInputQ. It is invoked from
+ * OP_READ events from the selector.
+ *
+ * Whenever handshaking is required, the doHandshaking() method is called
+ * which creates a thread to complete the handshake. It takes over the
+ * channelInputQ from upperRead, and puts outgoing packets on channelOutputQ.
+ * Selector events are delivered to lowerRead and lowerWrite as normal.
+ *
+ * Errors
+ *
+ * Any exception thrown by the engine or channel, causes all Queues to be closed
+ * the channel to be closed, and the error is reported to the user's
+ * Consumer<Throwable>
+ */
+public class AsyncSSLDelegate implements Closeable, AsyncConnection {
+
+ // outgoing buffers put in this queue first and may remain here
+ // while SSL handshaking happening.
+ final Queue<ByteBuffer> appOutputQ;
+
+ // queue of wrapped ByteBuffers waiting to be sent on socket channel
+ //final Queue<ByteBuffer> channelOutputQ;
+
+ // Bytes read into this queue before being unwrapped. Backup on this
+ // Q should only happen when the engine is stalled due to delegated tasks
+ final Queue<ByteBuffer> channelInputQ;
+
+ // input occurs through the read() method which is expected to be called
+ // when the selector signals some data is waiting to be read. All incoming
+ // handshake data is handled in this method, which means some calls to
+ // read() may return zero bytes of user data. This is not a sign of spinning,
+ // just that the handshake mechanics are being executed.
+
+ final SSLEngine engine;
+ final SSLParameters sslParameters;
+ //final SocketChannel chan;
+ final HttpConnection lowerOutput;
+ final HttpClientImpl client;
+ final ExecutorService executor;
+ final BufferHandler bufPool;
+ Consumer<ByteBuffer> receiver;
+ Consumer<Throwable> errorHandler;
+ // Locks.
+ final Object reader = new Object();
+ final Object writer = new Object();
+ // synchronizing handshake state
+ final Object handshaker = new Object();
+ // flag set when reader or writer is blocked waiting for handshake to finish
+ boolean writerBlocked;
+ boolean readerBlocked;
+
+ // some thread is currently doing the handshake
+ boolean handshaking;
+
+ // alpn[] may be null. upcall is callback which receives incoming decoded bytes off socket
+
+ AsyncSSLDelegate(HttpConnection lowerOutput, HttpClientImpl client, String[] alpn)
+ {
+ SSLContext context = client.sslContext();
+ executor = client.executorService();
+ bufPool = client;
+ appOutputQ = new Queue<>();
+ appOutputQ.registerPutCallback(this::upperWrite);
+ //channelOutputQ = new Queue<>();
+ //channelOutputQ.registerPutCallback(this::lowerWrite);
+ engine = context.createSSLEngine();
+ engine.setUseClientMode(true);
+ SSLParameters sslp = client.sslParameters().orElse(null);
+ if (sslp == null) {
+ sslp = context.getSupportedSSLParameters();
+ //sslp = context.getDefaultSSLParameters();
+ //printParams(sslp);
+ }
+ sslParameters = Utils.copySSLParameters(sslp);
+ if (alpn != null) {
+ sslParameters.setApplicationProtocols(alpn);
+ Log.logSSL("Setting application protocols: " + Arrays.toString(alpn));
+ } else {
+ Log.logSSL("No application protocols proposed");
+ }
+ engine.setSSLParameters(sslParameters);
+ engine.setEnabledCipherSuites(sslp.getCipherSuites());
+ engine.setEnabledProtocols(sslp.getProtocols());
+ this.lowerOutput = lowerOutput;
+ this.client = client;
+ this.channelInputQ = new Queue<>();
+ this.channelInputQ.registerPutCallback(this::upperRead);
+ }
+
+ /**
+ * Put buffers to appOutputQ, and call upperWrite() if q was empty.
+ *
+ * @param src
+ */
+ public void write(ByteBuffer[] src) throws IOException {
+ appOutputQ.putAll(src);
+ }
+
+ public void write(ByteBuffer buf) throws IOException {
+ ByteBuffer[] a = new ByteBuffer[1];
+ a[0] = buf;
+ write(a);
+ }
+
+ @Override
+ public void close() {
+ Utils.close(appOutputQ, channelInputQ, lowerOutput);
+ }
+
+ /**
+ * Attempts to wrap buffers from appOutputQ and place them on the
+ * channelOutputQ for writing. If handshaking is happening, then the
+ * process stalls and last buffers taken off the appOutputQ are put back
+ * into it until handshaking completes.
+ *
+ * This same method is called to try and resume output after a blocking
+ * handshaking operation has completed.
+ */
+ private void upperWrite() {
+ try {
+ EngineResult r = null;
+ ByteBuffer[] buffers = appOutputQ.pollAll(Utils.EMPTY_BB_ARRAY);
+ int bytes = Utils.remaining(buffers);
+ while (bytes > 0) {
+ synchronized (writer) {
+ r = wrapBuffers(buffers);
+ int bytesProduced = r.bytesProduced();
+ int bytesConsumed = r.bytesConsumed();
+ bytes -= bytesConsumed;
+ if (bytesProduced > 0) {
+ // pass destination buffer to channelOutputQ.
+ lowerOutput.write(r.destBuffer);
+ }
+ synchronized (handshaker) {
+ if (r.handshaking()) {
+ // handshaking is happening or is needed
+ // so we put the buffers back on Q to process again
+ // later. It's possible that some may have already
+ // been processed, which is ok.
+ appOutputQ.pushbackAll(buffers);
+ writerBlocked = true;
+ if (!handshaking()) {
+ // execute the handshake in another thread.
+ // This method will be called again to resume sending
+ // later
+ doHandshake(r);
+ }
+ return;
+ }
+ }
+ }
+ }
+ returnBuffers(buffers);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ close();
+ }
+ }
+
+ private void doHandshake(EngineResult r) {
+ handshaking = true;
+ channelInputQ.registerPutCallback(null);
+ executor.execute(() -> {
+ try {
+ doHandshakeImpl(r);
+ channelInputQ.registerPutCallback(this::upperRead);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ close();
+ }
+ });
+ }
+
+ private void returnBuffers(ByteBuffer[] bufs) {
+ for (ByteBuffer buf : bufs)
+ client.returnBuffer(buf);
+ }
+
+ /**
+ * Return true if some thread is currently doing the handshake
+ *
+ * @return
+ */
+ boolean handshaking() {
+ synchronized(handshaker) {
+ return handshaking;
+ }
+ }
+
+ /**
+ * Executes entire handshake in calling thread.
+ * Returns after handshake is completed or error occurs
+ * @param r
+ * @throws IOException
+ */
+ private void doHandshakeImpl(EngineResult r) throws IOException {
+ while (true) {
+ SSLEngineResult.HandshakeStatus status = r.handshakeStatus();
+ if (status == NEED_TASK) {
+ LinkedList<Runnable> tasks = obtainTasks();
+ for (Runnable task : tasks)
+ task.run();
+ r = handshakeWrapAndSend();
+ } else if (status == NEED_WRAP) {
+ r = handshakeWrapAndSend();
+ } else if (status == NEED_UNWRAP) {
+ r = handshakeReceiveAndUnWrap();
+ }
+ if (!r.handshaking())
+ break;
+ }
+ boolean dowrite = false;
+ boolean doread = false;
+ // Handshake is finished. Now resume reading and/or writing
+ synchronized(handshaker) {
+ handshaking = false;
+ if (writerBlocked) {
+ writerBlocked = false;
+ dowrite = true;
+ }
+ if (readerBlocked) {
+ readerBlocked = false;
+ doread = true;
+ }
+ }
+ if (dowrite)
+ upperWrite();
+ if (doread)
+ upperRead();
+ }
+
+ // acknowledge a received CLOSE request from peer
+ void doClosure() throws IOException {
+ //while (!wrapAndSend(emptyArray))
+ //;
+ }
+
+ LinkedList<Runnable> obtainTasks() {
+ LinkedList<Runnable> l = new LinkedList<>();
+ Runnable r;
+ while ((r = engine.getDelegatedTask()) != null)
+ l.add(r);
+ return l;
+ }
+
+ @Override
+ public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver, Consumer<Throwable> errorReceiver) {
+ this.receiver = asyncReceiver;
+ this.errorHandler = errorReceiver;
+ }
+
+ @Override
+ public void startReading() {
+ // maybe this class does not need to implement AsyncConnection
+ }
+
+ static class EngineResult {
+ ByteBuffer destBuffer;
+ ByteBuffer srcBuffer;
+ SSLEngineResult result;
+ Throwable t;
+
+ boolean handshaking() {
+ SSLEngineResult.HandshakeStatus s = result.getHandshakeStatus();
+ return s != FINISHED && s != NOT_HANDSHAKING;
+ }
+
+ int bytesConsumed() {
+ return result.bytesConsumed();
+ }
+
+ int bytesProduced() {
+ return result.bytesProduced();
+ }
+
+ Throwable exception() {
+ return t;
+ }
+
+ SSLEngineResult.HandshakeStatus handshakeStatus() {
+ return result.getHandshakeStatus();
+ }
+
+ SSLEngineResult.Status status() {
+ return result.getStatus();
+ }
+ }
+
+ EngineResult handshakeWrapAndSend() throws IOException {
+ EngineResult r = wrapBuffer(Utils.EMPTY_BYTEBUFFER);
+ if (r.bytesProduced() > 0) {
+ lowerOutput.write(r.destBuffer);
+ }
+ return r;
+ }
+
+ // called during handshaking. It blocks until a complete packet
+ // is available, unwraps it and returns.
+ EngineResult handshakeReceiveAndUnWrap() throws IOException {
+ ByteBuffer buf = channelInputQ.take();
+ while (true) {
+ // block waiting for input
+ EngineResult r = unwrapBuffer(buf);
+ SSLEngineResult.Status status = r.status();
+ if (status == BUFFER_UNDERFLOW) {
+ // wait for another buffer to arrive
+ ByteBuffer buf1 = channelInputQ.take();
+ buf = combine (buf, buf1);
+ continue;
+ }
+ // OK
+ // theoretically possible we could receive some user data
+ if (r.bytesProduced() > 0) {
+ receiver.accept(r.destBuffer);
+ }
+ if (!buf.hasRemaining())
+ return r;
+ }
+ }
+
+ EngineResult wrapBuffer(ByteBuffer src) throws SSLException {
+ ByteBuffer[] bufs = new ByteBuffer[1];
+ bufs[0] = src;
+ return wrapBuffers(bufs);
+ }
+
+ EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
+ EngineResult r = new EngineResult();
+ ByteBuffer dst = bufPool.getBuffer();
+ while (true) {
+ r.result = engine.wrap(src, dst);
+ switch (r.result.getStatus()) {
+ case BUFFER_OVERFLOW:
+ dst = getPacketBuffer();
+ break;
+ case CLOSED:
+ case OK:
+ dst.flip();
+ r.destBuffer = dst;
+ return r;
+ case BUFFER_UNDERFLOW:
+ // underflow handled externally
+ bufPool.returnBuffer(dst);
+ return r;
+ default:
+ assert false;
+ }
+ }
+ }
+
+ EngineResult unwrapBuffer(ByteBuffer srcbuf) throws IOException {
+ EngineResult r = new EngineResult();
+ r.srcBuffer = srcbuf;
+
+ ByteBuffer dst = bufPool.getBuffer();
+ while (true) {
+ r.result = engine.unwrap(srcbuf, dst);
+ switch (r.result.getStatus()) {
+ case BUFFER_OVERFLOW:
+ // dest buffer not big enough. Reallocate
+ int oldcap = dst.capacity();
+ dst = getApplicationBuffer();
+ assert dst.capacity() > oldcap;
+ break;
+ case CLOSED:
+ doClosure();
+ throw new IOException("Engine closed");
+ case BUFFER_UNDERFLOW:
+ bufPool.returnBuffer(dst);
+ return r;
+ case OK:
+ dst.flip();
+ r.destBuffer = dst;
+ return r;
+ }
+ }
+ }
+
+ /**
+ * Asynchronous read input. Call this when selector fires.
+ * Unwrap done in upperRead because it also happens in
+ * doHandshake() when handshake taking place
+ */
+ public void lowerRead(ByteBuffer buffer) {
+ try {
+ channelInputQ.put(buffer);
+ } catch (Throwable t) {
+ close();
+ errorHandler.accept(t);
+ }
+ }
+
+ public void upperRead() {
+ EngineResult r;
+ ByteBuffer srcbuf;
+ synchronized (reader) {
+ try {
+ srcbuf = channelInputQ.poll();
+ if (srcbuf == null) {
+ return;
+ }
+ while (true) {
+ r = unwrapBuffer(srcbuf);
+ switch (r.result.getStatus()) {
+ case BUFFER_UNDERFLOW:
+ // Buffer too small. Need to combine with next buf
+ ByteBuffer nextBuf = channelInputQ.poll();
+ if (nextBuf == null) {
+ // no data available. push buffer back until more data available
+ channelInputQ.pushback(srcbuf);
+ return;
+ } else {
+ srcbuf = combine(srcbuf, nextBuf);
+ }
+ break;
+ case OK:
+ // check for any handshaking work
+ synchronized (handshaker) {
+ if (r.handshaking()) {
+ // handshaking is happening or is needed
+ // so we put the buffer back on Q to process again
+ // later.
+ channelInputQ.pushback(srcbuf);
+ readerBlocked = true;
+ if (!handshaking()) {
+ // execute the handshake in another thread.
+ // This method will be called again to resume sending
+ // later
+ doHandshake(r);
+ }
+ return;
+ }
+ }
+ ByteBuffer dst = r.destBuffer;
+ if (dst.hasRemaining()) {
+ receiver.accept(dst);
+ }
+ }
+ if (srcbuf.hasRemaining()) {
+ continue;
+ }
+ srcbuf = channelInputQ.poll();
+ if (srcbuf == null) {
+ return;
+ }
+ }
+ } catch (Throwable t) {
+ Utils.close(lowerOutput);
+ errorHandler.accept(t);
+ }
+ }
+ }
+
+ /**
+ * Get a new buffer that is the right size for application buffers.
+ *
+ * @return
+ */
+ ByteBuffer getApplicationBuffer() {
+ SSLSession session = engine.getSession();
+ int appBufsize = session.getApplicationBufferSize();
+ bufPool.setMinBufferSize(appBufsize);
+ return bufPool.getBuffer(appBufsize);
+ }
+
+ ByteBuffer getPacketBuffer() {
+ SSLSession session = engine.getSession();
+ int packetBufSize = session.getPacketBufferSize();
+ bufPool.setMinBufferSize(packetBufSize);
+ return bufPool.getBuffer(packetBufSize);
+ }
+
+ ByteBuffer combine(ByteBuffer buf1, ByteBuffer buf2) {
+ int avail1 = buf1.capacity() - buf1.remaining();
+ if (buf2.remaining() < avail1) {
+ buf1.compact();
+ buf1.put(buf2);
+ buf1.flip();
+ return buf1;
+ }
+ int newsize = buf1.remaining() + buf2.remaining();
+ ByteBuffer newbuf = bufPool.getBuffer(newsize);
+ newbuf.put(buf1);
+ newbuf.put(buf2);
+ newbuf.flip();
+ return newbuf;
+ }
+
+ SSLParameters getSSLParameters() {
+ return sslParameters;
+ }
+
+ static void printParams(SSLParameters p) {
+ System.out.println("SSLParameters:");
+ if (p == null) {
+ System.out.println("Null params");
+ return;
+ }
+ for (String cipher : p.getCipherSuites()) {
+ System.out.printf("cipher: %s\n", cipher);
+ }
+ for (String approto : p.getApplicationProtocols()) {
+ System.out.printf("application protocol: %s\n", approto);
+ }
+ for (String protocol : p.getProtocols()) {
+ System.out.printf("protocol: %s\n", protocol);
+ }
+ if (p.getServerNames() != null)
+ for (SNIServerName sname : p.getServerNames()) {
+ System.out.printf("server name: %s\n", sname.toString());
+ }
+ }
+
+ String getSessionInfo() {
+ StringBuilder sb = new StringBuilder();
+ String application = engine.getApplicationProtocol();
+ SSLSession sess = engine.getSession();
+ String cipher = sess.getCipherSuite();
+ String protocol = sess.getProtocol();
+ sb.append("Handshake complete alpn: ")
+ .append(application)
+ .append(", Cipher: ")
+ .append(cipher)
+ .append(", Protocol: ")
+ .append(protocol);
+ return sb.toString();
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/AuthenticationFilter.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/AuthenticationFilter.java Sat Apr 30 00:30:31 2016 +0100
@@ -45,7 +45,7 @@
static final int DEFAULT_RETRY_LIMIT = 3;
static final int retry_limit = Utils.getIntegerNetProperty(
- "sun.net.httpclient.auth.retrylimit", DEFAULT_RETRY_LIMIT);
+ "java.net.httpclient.auth.retrylimit", DEFAULT_RETRY_LIMIT);
static final int UNAUTHORIZED = 401;
static final int PROXY_UNAUTHORIZED = 407;
--- a/jdk/src/java.httpclient/share/classes/java/net/http/BufferHandler.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/BufferHandler.java Sat Apr 30 00:30:31 2016 +0100
@@ -27,11 +27,23 @@
import java.nio.ByteBuffer;
/**
- * Implemented by buffer pools.
+ * Implemented by buffer pools. A buffer pool has a current buffer size
+ * (number of bytes in each buffer) which may increase over time.
*/
interface BufferHandler {
- ByteBuffer getBuffer();
+ default ByteBuffer getBuffer() {
+ return getBuffer(-1);
+ }
+
+ void setMinBufferSize(int size);
+
+ /**
+ * size == -1 means return any sized buffer. Any other value means
+ * @param size
+ * @return
+ */
+ ByteBuffer getBuffer(int size);
void returnBuffer(ByteBuffer buffer);
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ByteBufferConsumer.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.function.Supplier;
+
+/**
+ * Takes a List<ByteBuffer> which is assumed to contain at least one HTTP/2
+ * frame and allows it to be processed supplying bytes, ints, shorts, byte[] etc.
+ * from the list. As each ByteBuffer is consumed it is removed from the List<>.
+ *
+ * NOTE. shorts and bytes returned are UNSIGNED ints
+ *
+ * When finished processing the frame, the List may be empty or may contain
+ * partially read or unread ByteBuffers. A new ByteBufferConsumer can be
+ * created with the List<>
+ */
+class ByteBufferConsumer {
+
+ ByteBuffer currentBuffer;
+
+ final List<ByteBuffer> buffers;
+ final ListIterator<ByteBuffer> iterator;
+ final Supplier<ByteBuffer> newBufferSupplier;
+
+ ByteBufferConsumer(List<ByteBuffer> buffers,
+ Supplier<ByteBuffer> newBufferSupplier) {
+ this.buffers = buffers;
+ this.newBufferSupplier = newBufferSupplier;
+ this.iterator = buffers.listIterator();
+ if (!iterator.hasNext()) {
+ throw new IllegalArgumentException("Empty buffer list");
+ }
+ currentBuffer = iterator.next();
+ }
+
+ private void dump() {
+ int l = 0;
+ System.err.printf("ByteBufferConsumer:\n");
+ for (ByteBuffer buf : buffers) {
+ System.err.printf("\t%s\n", buf.toString());
+ l+= buf.remaining();
+ }
+ System.err.printf("BBC contains %d bytes\n", l);
+ }
+
+ private synchronized ByteBuffer getBuffer(boolean exception) throws IOException {
+ while (currentBuffer == null || !currentBuffer.hasRemaining()) {
+ if (currentBuffer != null) {
+ iterator.remove();
+ }
+ if (!iterator.hasNext()) {
+ currentBuffer = null;
+ if (exception) {
+ throw new IOException ("Connection closed unexpectedly");
+ }
+ return null;
+ }
+ currentBuffer = iterator.next();
+ }
+ return currentBuffer;
+ }
+
+ // call this to check if the data has all been consumed
+
+ public boolean consumed() {
+ try {
+ return getBuffer(false) == null;
+ } catch (IOException e) {
+ /* CAN'T HAPPEN */
+ throw new InternalError();
+ }
+ }
+
+ public int getByte() throws IOException {
+ // TODO: what to do if connection is closed. Throw NPE?
+ ByteBuffer buf = getBuffer(true);
+ return buf.get() & 0xff;
+ }
+
+ public byte[] getBytes(int n) throws IOException {
+ return getBytes(n, null);
+ }
+
+ public byte[] getBytes(int n, byte[] buf) throws IOException {
+ if (buf == null) {
+ buf = new byte[n];
+ } else if (buf.length < n) {
+ throw new IllegalArgumentException("getBytes: buffer too small");
+ }
+ int offset = 0;
+ while (n > 0) {
+ ByteBuffer b = getBuffer(true);
+ int length = Math.min(n, b.remaining());
+ b.get(buf, offset, length);
+ offset += length;
+ n -= length;
+ }
+ return buf;
+ }
+
+ public int getShort() throws IOException {
+ ByteBuffer buf = getBuffer(true);
+ int rem = buf.remaining();
+ if (rem >= 2) {
+ return buf.getShort() & 0xffff;
+ }
+ // Slow path. Not common
+ int val = 0;
+ val = (val << 8) + getByte();
+ val = (val << 8) + getByte();
+ return val;
+ }
+
+ public int getInt() throws IOException {
+ ByteBuffer buf = getBuffer(true);
+ int rem = buf.remaining();
+ if (rem >= 4) {
+ return buf.getInt();
+ }
+ // Slow path. Not common
+ int val = 0;
+ for (int nbytes = 0; nbytes < 4; nbytes++) {
+ val = (val << 8) + getByte();
+ }
+ return val;
+ }
+
+ private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
+
+ /**
+ * Extracts whatever number of ByteBuffers from list to get required number
+ * of bytes. Any remaining buffers are 'tidied up' so reading can continue.
+ */
+ public ByteBuffer[] getBuffers(int bytecount) throws IOException {
+ LinkedList<ByteBuffer> l = new LinkedList<>();
+ while (bytecount > 0) {
+ ByteBuffer buffer = getBuffer(true);
+ int remaining = buffer.remaining();
+ if (remaining > bytecount) {
+ int difference = remaining - bytecount;
+ // split
+ ByteBuffer newb = newBufferSupplier.get();
+ newb.clear();
+ int limit = buffer.limit();
+ buffer.limit(limit - difference);
+ newb.put(buffer);
+ newb.flip();
+ buffer.limit(limit);
+ l.add(newb);
+ bytecount = 0;
+ } else {
+ l.add(buffer);
+ currentBuffer = null;
+ iterator.remove();
+ bytecount -= remaining;
+ }
+ }
+ return l.toArray(EMPTY);
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ByteBufferGenerator.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+/**
+ * Manages a ByteBuffer[] for writing frames into for output. The last
+ * ByteBuffer in the list is always unflipped (able to receive more bytes for
+ * sending) until getBufferArray() is called, which calls finish().
+ *
+ * This allows multiple frames to be written to the same BBG.
+ *
+ * Buffers added with addByteBuffer() must be already flipped.
+ */
+class ByteBufferGenerator {
+
+ ByteBuffer currentBuffer;
+ // source is assumed to always return the same sized buffer
+ final BufferHandler pool;
+ final ArrayList<ByteBuffer> buflist;
+ final int bufsize;
+ boolean finished;
+
+ ByteBufferGenerator(BufferHandler pool) {
+ this.buflist = new ArrayList<>();
+ this.pool = pool;
+ this.currentBuffer = pool.getBuffer();
+ this.bufsize = currentBuffer.capacity();
+ }
+
+ private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
+
+ public ByteBuffer[] getBufferArray() {
+ finish();
+ return buflist.toArray(EMPTY);
+ }
+
+ public ArrayList<ByteBuffer> getBufferList() {
+ finish();
+ return buflist;
+ }
+
+ private synchronized void finish() {
+ if (finished) {
+ return;
+ }
+ finished = true;
+ currentBuffer.flip();
+ if (currentBuffer.hasRemaining()) {
+ buflist.add(currentBuffer);
+ } else {
+ pool.returnBuffer(currentBuffer);
+ }
+ }
+
+ // only used for SettingsFrame: offset is number of bytes to
+ // ignore at start (we only want the payload of the settings frame)
+ public byte[] asByteArray(int offset) {
+ ByteBuffer[] bufs = getBufferArray();
+ int size = 0;
+ for (ByteBuffer buf : bufs) {
+ size += buf.remaining();
+ }
+ byte[] bytes = new byte[size-offset];
+ int pos = 0;
+ for (ByteBuffer buf : bufs) {
+ int rem = buf.remaining();
+ int ignore = Math.min(rem, offset);
+ buf.position(buf.position()+ignore);
+ rem -= ignore;
+ offset -= ignore;
+ buf.get(bytes, pos, rem);
+ pos += rem;
+ }
+ return bytes;
+ }
+
+ ByteBuffer getBuffer(long n) {
+ if (currentBuffer.remaining() < n) {
+ getNewBuffer();
+ if (n > currentBuffer.capacity()) {
+ throw new IllegalArgumentException("requested buffer too large");
+ }
+ }
+ return currentBuffer;
+ }
+
+ void getNewBuffer() {
+ currentBuffer.flip();
+ if (currentBuffer.hasRemaining()) {
+ buflist.add(currentBuffer);
+ } else {
+ pool.returnBuffer(currentBuffer);
+ }
+ currentBuffer = pool.getBuffer();
+ }
+
+ void addByteBuffer(ByteBuffer buf) {
+ getNewBuffer();
+ buflist.add(buf);
+ }
+
+ void addPadding(int length) {
+ while (length > 0) {
+ int n = Math.min(length, bufsize);
+ ByteBuffer b = getBuffer(n);
+ // TODO: currently zeroed?
+ b.position(b.position() + n);
+ length -= n;
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/CharsetToolkit.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+package java.net.http;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+// The purpose of this class is to separate charset-related tasks from the main
+// WebSocket logic, simplifying where possible.
+//
+// * Coders hide the differences between coding and flushing stages on the
+// API level
+// * Verifier abstracts the way the verification is performed
+// (spoiler: it's a decoding into a throw-away buffer)
+//
+// Coding methods throw exceptions instead of returning coding result denoting
+// errors, since any kind of handling and recovery is not expected.
+final class CharsetToolkit {
+
+ private CharsetToolkit() { }
+
+ static final class Verifier {
+
+ private final CharsetDecoder decoder = UTF_8.newDecoder();
+ // A buffer used to check validity of UTF-8 byte stream by decoding it.
+ // The contents of this buffer are never used.
+ // The size is arbitrary, though it should probably be chosen from the
+ // performance perspective since it affects the total number of calls to
+ // decoder.decode() and amount of work in each of these calls
+ private final CharBuffer blackHole = CharBuffer.allocate(1024);
+
+ void verify(ByteBuffer in, boolean endOfInput)
+ throws CharacterCodingException {
+ while (true) {
+ // Since decoder.flush() cannot produce an error, it's not
+ // helpful for verification. Therefore this step is skipped.
+ CoderResult r = decoder.decode(in, blackHole, endOfInput);
+ if (r.isOverflow()) {
+ blackHole.clear();
+ } else if (r.isUnderflow()) {
+ break;
+ } else if (r.isError()) {
+ r.throwException();
+ } else {
+ // Should not happen
+ throw new InternalError();
+ }
+ }
+ }
+
+ Verifier reset() {
+ decoder.reset();
+ return this;
+ }
+ }
+
+ static final class Encoder {
+
+ private final CharsetEncoder encoder = UTF_8.newEncoder();
+ private boolean coding = true;
+
+ CoderResult encode(CharBuffer in, ByteBuffer out, boolean endOfInput)
+ throws CharacterCodingException {
+
+ if (coding) {
+ CoderResult r = encoder.encode(in, out, endOfInput);
+ if (r.isOverflow()) {
+ return r;
+ } else if (r.isUnderflow()) {
+ if (endOfInput) {
+ coding = false;
+ } else {
+ return r;
+ }
+ } else if (r.isError()) {
+ r.throwException();
+ } else {
+ // Should not happen
+ throw new InternalError();
+ }
+ }
+ assert !coding;
+ return encoder.flush(out);
+ }
+
+ Encoder reset() {
+ coding = true;
+ encoder.reset();
+ return this;
+ }
+ }
+
+ static CharBuffer decode(ByteBuffer in) throws CharacterCodingException {
+ return UTF_8.newDecoder().decode(in);
+ }
+
+ static final class Decoder {
+
+ private final CharsetDecoder decoder = UTF_8.newDecoder();
+ private boolean coding = true; // Either coding or flushing
+
+ CoderResult decode(ByteBuffer in, CharBuffer out, boolean endOfInput)
+ throws CharacterCodingException {
+
+ if (coding) {
+ CoderResult r = decoder.decode(in, out, endOfInput);
+ if (r.isOverflow()) {
+ return r;
+ } else if (r.isUnderflow()) {
+ if (endOfInput) {
+ coding = false;
+ } else {
+ return r;
+ }
+ } else if (r.isError()) {
+ r.throwException();
+ } else {
+ // Should not happen
+ throw new InternalError();
+ }
+ }
+ assert !coding;
+ return decoder.flush(out);
+ }
+
+ Decoder reset() {
+ coding = true;
+ decoder.reset();
+ return this;
+ }
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/ConnectionPool.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ConnectionPool.java Sat Apr 30 00:30:31 2016 +0100
@@ -35,7 +35,7 @@
class ConnectionPool {
static final long KEEP_ALIVE = Utils.getIntegerNetProperty(
- "sun.net.httpclient.keepalive.timeout", 1200); // seconds
+ "java.net.httpclient.keepalive.timeout", 1200); // seconds
// Pools of idle connections
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ContinuationFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+
+class ContinuationFrame extends HeaderFrame {
+
+ public static final int TYPE = 0x9;
+
+ ContinuationFrame() {
+ type = TYPE;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ headerBlocks = bc.getBuffers(length);
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ for (int i=0; i<headerBlocks.length; i++) {
+ bg.addByteBuffer(headerBlocks[i]);
+ }
+ }
+
+ @Override
+ public boolean endHeaders() {
+ return getFlag(END_HEADERS);
+ }
+
+ @Override
+ void computeLength() {
+ length = headerLength;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/CookieFilter.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/CookieFilter.java Sat Apr 30 00:30:31 2016 +0100
@@ -44,7 +44,7 @@
@Override
public void request(HttpRequestImpl r) throws IOException {
Map<String,List<String>> userheaders, cookies;
- userheaders = r.getUserHeaders().directMap();
+ userheaders = r.getUserHeaders().map();
cookies = cookieMan.get(r.uri(), userheaders);
// add the returned cookies
HttpHeadersImpl systemHeaders = r.getSystemHeaders();
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/DataFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class DataFrame extends Http2Frame {
+
+ public final static int TYPE = 0x0;
+
+ DataFrame() {
+ type = TYPE;
+ }
+
+ // Flags
+ public static final int END_STREAM = 0x1;
+ public static final int PADDED = 0x8;
+
+ int padLength;
+ int dataLength;
+ ByteBuffer[] data;
+
+ public void setData(ByteBuffer[] data) {
+ this.data = data;
+ setDataLength();
+ }
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case END_STREAM:
+ return "END_STREAM";
+ case PADDED:
+ return "PADDED";
+ }
+ return super.flagAsString(flag);
+ }
+
+ public synchronized void setData(ByteBuffer data) {
+ ByteBuffer[] bb;
+ if (data == null) {
+ bb = new ByteBuffer[0];
+ } else {
+ bb = new ByteBuffer[1];
+ bb[0] = data;
+ }
+ setData(bb);
+ }
+
+ public synchronized ByteBuffer[] getData() {
+ return data;
+ }
+
+ private void setDataLength() {
+ int len = 0;
+ for (ByteBuffer buf : data) {
+ len += buf.remaining();
+ }
+ dataLength = len;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if ((flags & PADDED) != 0) {
+ padLength = bc.getByte();
+ dataLength = length - (padLength + 1);
+ } else {
+ dataLength = length;
+ }
+ data = bc.getBuffers(dataLength);
+ }
+
+ int getPadLength() {
+ return padLength;
+ }
+
+ int getDataLength() {
+ return dataLength;
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ if ((flags & PADDED) != 0) {
+ ByteBuffer buf = bg.getBuffer(1);
+ buf.put((byte)getPadLength());
+ }
+ for (int i=0; i<data.length; i++) {
+ bg.addByteBuffer(data[i]);
+ }
+ if ((flags & PADDED) != 0) {
+ bg.addPadding(padLength);
+ }
+ }
+
+ @Override
+ void computeLength() {
+ length = dataLength;
+ if ((flags & PADDED) != 0) {
+ length += (1 + padLength);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ErrorFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+abstract class ErrorFrame extends Http2Frame {
+
+ // error codes
+ public static final int NO_ERROR = 0x0;
+ public static final int PROTOCOL_ERROR = 0x1;
+ public static final int INTERNAL_ERROR = 0x2;
+ public static final int FLOW_CONTROL_ERROR = 0x3;
+ public static final int SETTINGS_TIMEOUT = 0x4;
+ public static final int STREAM_CLOSED = 0x5;
+ public static final int FRAME_SIZE_ERROR = 0x6;
+ public static final int REFUSED_STREAM = 0x7;
+ public static final int CANCEL = 0x8;
+ public static final int COMPRESSION_ERROR = 0x9;
+ public static final int CONNECT_ERROR = 0xa;
+ public static final int ENHANCE_YOUR_CALM = 0xb;
+ public static final int INADEQUATE_SECURITY = 0xc;
+ public static final int HTTP_1_1_REQUIRED = 0xd;
+ static final int LAST_ERROR = 0xd;
+
+ static final String[] errorStrings = {
+ "Not an error",
+ "Protocol error",
+ "Internal error",
+ "Flow control error",
+ "Settings timeout",
+ "Stream is closed",
+ "Frame size error",
+ "Stream not processed",
+ "Stream cancelled",
+ "Compression state not updated",
+ "TCP Connection error on CONNECT",
+ "Processing capacity exceeded",
+ "Negotiated TLS parameters not acceptable",
+ "Use HTTP/1.1 for request"
+ };
+
+ public static String stringForCode(int code) {
+ if (code < 0)
+ throw new IllegalArgumentException();
+
+ if (code > LAST_ERROR) {
+ return "Error: " + Integer.toString(code);
+ } else {
+ return errorStrings[code];
+ }
+ }
+
+ int errorCode;
+
+ @Override
+ public String toString() {
+ return super.toString() + " Error: " + stringForCode(errorCode);
+ }
+
+ public int getErrorCode() {
+ return this.errorCode;
+ }
+
+ public void setErrorCode(int errorCode) {
+ this.errorCode = errorCode;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Exchange.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Exchange.java Sat Apr 30 00:30:31 2016 +0100
@@ -128,7 +128,7 @@
}
}
- HttpResponseImpl responseImpl0(HttpConnection connection)
+ private HttpResponseImpl responseImpl0(HttpConnection connection)
throws IOException, InterruptedException
{
exchImpl = ExchangeImpl.get(this, connection);
@@ -136,7 +136,7 @@
request.addSystemHeader("Expect", "100-Continue");
exchImpl.sendHeadersOnly();
HttpResponseImpl resp = exchImpl.getResponse();
- logResponse(resp);
+ Utils.logResponse(resp);
if (resp.statusCode() != 100) {
return resp;
}
@@ -145,7 +145,7 @@
} else {
exchImpl.sendRequest();
HttpResponseImpl resp = exchImpl.getResponse();
- logResponse(resp);
+ Utils.logResponse(resp);
return checkForUpgrade(resp, exchImpl);
}
}
@@ -163,9 +163,7 @@
}
SecurityException e = securityCheck(acc);
if (e != null) {
- CompletableFuture<HttpResponseImpl> cf = new CompletableFuture<>();
- cf.completeExceptionally(e);
- return cf;
+ return CompletableFuture.failedFuture(e);
}
if (permissions.size() > 0) {
return AccessController.doPrivileged(
@@ -182,9 +180,7 @@
try {
exchImpl = ExchangeImpl.get(this, connection);
} catch (IOException | InterruptedException e) {
- CompletableFuture<HttpResponseImpl> cf = new CompletableFuture<>();
- cf.completeExceptionally(e);
- return cf;
+ return CompletableFuture.failedFuture(e);
}
if (request.expectContinue()) {
request.addSystemHeader("Expect", "100-Continue");
@@ -200,23 +196,19 @@
return exchImpl.sendBodyAsync()
.thenCompose(exchImpl::getResponseAsync)
.thenApply((r) -> {
- logResponse(r);
+ Utils.logResponse(r);
return r;
});
} else {
Exchange.this.response = r1;
- logResponse(r1);
+ Utils.logResponse(r1);
return CompletableFuture.completedFuture(r1);
}
});
} else {
return exchImpl
- .sendHeadersAsync()
- .thenCompose((Void v) -> {
- // send body and get response at same time
- return exchImpl.sendBodyAsync()
- .thenCompose(exchImpl::getResponseAsync);
- })
+ .sendRequestAsync()
+ .thenCompose(exchImpl::getResponseAsync)
.thenCompose((HttpResponseImpl r1) -> {
int rcode = r1.statusCode();
CompletableFuture<HttpResponseImpl> cf =
@@ -225,13 +217,13 @@
return cf;
} else {
Exchange.this.response = r1;
- logResponse(r1);
+ Utils.logResponse(r1);
return CompletableFuture.completedFuture(r1);
}
})
.thenApply((HttpResponseImpl response) -> {
this.response = response;
- logResponse(response);
+ Utils.logResponse(response);
return response;
});
}
@@ -254,9 +246,9 @@
client.client2(),
this)
.thenCompose((Http2Connection c) -> {
+ c.putConnection();
Stream s = c.getStream(1);
exchImpl = s;
- c.putConnection();
return s.getResponseAsync(null);
})
);
@@ -294,21 +286,6 @@
}
- private void logResponse(HttpResponseImpl r) {
- if (!Log.requests())
- return;
- StringBuilder sb = new StringBuilder();
- String method = r.request().method();
- URI uri = r.uri();
- String uristring = uri == null ? "" : uri.toString();
- sb.append('(')
- .append(method)
- .append(" ")
- .append(uristring)
- .append(") ")
- .append(Integer.toString(r.statusCode()));
- Log.logResponse(sb.toString());
- }
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
return exchImpl.responseBodyAsync(processor);
@@ -352,9 +329,9 @@
}
String method = request.method();
- HttpHeadersImpl userHeaders = request.getUserHeaders();
+ HttpHeaders userHeaders = request.getUserHeaders();
URI u = getURIForSecurityCheck();
- URLPermission p = Utils.getPermission(u, method, userHeaders.directMap());
+ URLPermission p = Utils.getPermission(u, method, userHeaders.map());
try {
assert acc != null;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/FrameReader.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,70 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package java.net.http;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Represents one frame. May be initialized with a leftover buffer from previous
+ * frame. Call {@code haveFrame()} to determine if buffers contains at least one
+ * frame. If false, the obtain another buffer and call {@code}input(ByteBuffer)}.
+ * There may be additional bytes at end of the frame list.
+ */
+class FrameReader {
+
+ final List<ByteBuffer> buffers;
+
+ FrameReader() {
+ buffers = new LinkedList<>();
+ }
+
+ FrameReader(FrameReader that) {
+ this.buffers = that.buffers;
+ }
+
+ FrameReader(ByteBuffer remainder) {
+ buffers = new LinkedList<>();
+ if (remainder != null) {
+ buffers.add(remainder);
+ }
+ }
+
+ public synchronized void input(ByteBuffer buffer) {
+ buffers.add(buffer);
+ }
+
+ public synchronized boolean haveFrame() {
+ //buffers = Utils.superCompact(buffers, () -> ByteBuffer.allocate(Utils.BUFSIZE));
+ int size = 0;
+ for (ByteBuffer buffer : buffers) {
+ size += buffer.remaining();
+ }
+ if (size < 3) {
+ return false; // don't have length yet
+ }
+ // we at least have length field
+ int length = 0;
+ int j = 0;
+ ByteBuffer b = buffers.get(j);
+ b.mark();
+ for (int i=0; i<3; i++) {
+ while (!b.hasRemaining()) {
+ b.reset();
+ b = buffers.get(++j);
+ b.mark();
+ }
+ length = (length << 8) + (b.get() & 0xff);
+ }
+ b.reset();
+ return (size >= length + 9); // frame length
+ }
+
+ synchronized List<ByteBuffer> frame() {
+ return buffers;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/GoAwayFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class GoAwayFrame extends ErrorFrame {
+
+ GoAwayFrame() {
+ type = TYPE;
+ }
+
+ int lastStream;
+ byte[] debugData = new byte[0];
+
+ public static final int TYPE = 0x7;
+
+ // Flags
+ public static final int ACK = 0x1;
+
+ public void setDebugData(byte[] debugData) {
+ this.debugData = debugData;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " Debugdata: " + new String(debugData);
+ }
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case ACK:
+ return "ACK";
+ }
+ return super.flagAsString(flag);
+ }
+
+ public void setLastStream(int lastStream) {
+ this.lastStream = lastStream;
+ }
+
+ public int getLastStream() {
+ return this.lastStream;
+ }
+
+ public byte[] getDebugData() {
+ return debugData;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if (length < 8) {
+ throw new IOException("Invalid GoAway frame");
+ }
+ lastStream = bc.getInt() & 0x7fffffff;
+ errorCode = bc.getInt();
+ //debugData = bc.getBytes(8);
+ int datalen = length - 8;
+ if (datalen > 0) {
+ debugData = bc.getBytes(datalen);
+ Log.logError("GoAway debugData " + new String(debugData));
+ }
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(length);
+ buf.putInt(lastStream);
+ buf.putInt(errorCode);
+ if (length > 8) {
+ buf.put(debugData);
+ }
+ }
+
+ @Override
+ void computeLength() {
+ length = 8 + debugData.length;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HeaderFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Either a HeadersFrame or a ContinuationFrame
+ */
+abstract class HeaderFrame extends Http2Frame {
+
+ int offset;
+ int number;
+ int headerLength;
+ ByteBuffer[] headerBlocks;
+
+ public static final int END_HEADERS = 0x4;
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case END_HEADERS:
+ return "END_HEADERS";
+ }
+ return super.flagAsString(flag);
+ }
+
+ /**
+ * Sets the array of hpack encoded ByteBuffers
+ */
+ public void setHeaderBlock(ByteBuffer bufs[], int offset, int number) {
+ this.headerBlocks = bufs;
+ this.offset = offset;
+ this.number = number;
+ int length = 0;
+ for (int i=offset; i<offset+number; i++) {
+ length += headerBlocks[i].remaining();
+ }
+ this.headerLength = length;
+ }
+
+ public void setHeaderBlock(ByteBuffer bufs[]) {
+ setHeaderBlock(bufs, 0, bufs.length);
+ }
+
+ public ByteBuffer[] getHeaderBlock() {
+ return headerBlocks;
+ }
+
+ /**
+ * Returns true if this block is the final block of headers
+ */
+ public abstract boolean endHeaders();
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HeadersFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,138 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class HeadersFrame extends HeaderFrame {
+
+ public final static int TYPE = 0x1;
+
+ // Flags
+ public static final int END_STREAM = 0x1;
+ public static final int PADDED = 0x8;
+ public static final int PRIORITY = 0x20;
+
+
+ int padLength;
+ int streamDependency;
+ int weight;
+ boolean exclusive;
+
+ HeadersFrame() {
+ type = TYPE;
+ }
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case END_STREAM:
+ return "END_STREAM";
+ case PADDED:
+ return "PADDED";
+ case PRIORITY:
+ return "PRIORITY";
+ }
+ return super.flagAsString(flag);
+ }
+
+ public void setPadLength(int padLength) {
+ this.padLength = padLength;
+ flags |= PADDED;
+ }
+
+ public void setPriority(int streamDependency, boolean exclusive, int weight) {
+ this.streamDependency = streamDependency;
+ this.exclusive = exclusive;
+ this.weight = weight;
+ this.flags |= PRIORITY;
+ }
+
+ public int getStreamDependency() {
+ return streamDependency;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ @Override
+ public boolean endHeaders() {
+ return getFlag(END_HEADERS);
+ }
+
+ public boolean getExclusive() {
+ return exclusive;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if ((flags & PADDED) != 0) {
+ padLength = bc.getByte();
+ }
+ if ((flags & PRIORITY) != 0) {
+ int x = bc.getInt();
+ exclusive = (x & 0x80000000) != 0;
+ streamDependency = x & 0x7fffffff;
+ weight = bc.getByte();
+ }
+ headerLength = length - padLength;
+ headerBlocks = bc.getBuffers(headerLength);
+ }
+
+ @Override
+ void computeLength() {
+ int len = 0;
+ if ((flags & PADDED) != 0) {
+ len += (1 + padLength);
+ }
+ if ((flags & PRIORITY) != 0) {
+ len += 5;
+ }
+ len += headerLength;
+ this.length = len;
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(6);
+ if ((flags & PADDED) != 0) {
+ buf.put((byte)padLength);
+ }
+ if ((flags & PRIORITY) != 0) {
+ int x = exclusive ? 1 << 31 + streamDependency : streamDependency;
+ buf.putInt(x);
+ buf.put((byte)weight);
+ }
+ for (int i=0; i<headerBlocks.length; i++) {
+ bg.addByteBuffer(headerBlocks[i]);
+ }
+ if ((flags & PADDED) != 0) {
+ bg.addPadding(padLength);
+ }
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http1Exchange.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http1Exchange.java Sat Apr 30 00:30:31 2016 +0100
@@ -64,32 +64,12 @@
if (connection != null) {
this.connection = connection;
} else {
- InetSocketAddress addr = getAddress(request);
+ InetSocketAddress addr = Utils.getAddress(request);
this.connection = HttpConnection.getConnection(addr, request);
}
this.requestAction = new Http1Request(request, this.connection);
}
- private static InetSocketAddress getAddress(HttpRequestImpl req) {
- URI uri = req.uri();
- if (uri == null) {
- return req.authority();
- }
- int port = uri.getPort();
- if (port == -1) {
- if (uri.getScheme().equalsIgnoreCase("https")) {
- port = 443;
- } else {
- port = 80;
- }
- }
- String host = uri.getHost();
- if (req.proxy() == null) {
- return new InetSocketAddress(host, port);
- } else {
- return InetSocketAddress.createUnresolved(host, port);
- }
- }
HttpConnection connection() {
return connection;
@@ -211,7 +191,7 @@
connection.close();
}
},
- () -> request.getAccessControlContext());
+ request::getAccessControlContext);
operations.add(cf);
return cf;
}
@@ -269,7 +249,7 @@
cf.completeExceptionally(e);
connection.close();
}
- }, () -> request.getAccessControlContext());
+ }, request::getAccessControlContext);
operations.add(cf);
return cf;
}
@@ -302,7 +282,7 @@
cf.completeExceptionally(e);
connection.close();
}
- }, () -> request.getAccessControlContext());
+ }, request::getAccessControlContext);
operations.add(cf);
return cf;
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http1Request.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http1Request.java Sat Apr 30 00:30:31 2016 +0100
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.net.InetSocketAddress;
+import java.net.http.HttpConnection.Mode;
import java.nio.charset.StandardCharsets;
import java.util.function.LongConsumer;
import static java.nio.charset.StandardCharsets.US_ASCII;
@@ -48,7 +49,8 @@
// See line 206 and below for description
final ByteBuffer[] buffers;
final HttpRequest.BodyProcessor requestProc;
- final HttpHeadersImpl userHeaders, systemHeaders;
+ final HttpHeaders userHeaders;
+ final HttpHeadersImpl systemHeaders;
final LongConsumer flowController;
boolean streaming;
long contentLength;
@@ -91,10 +93,10 @@
private void collectHeaders1(StringBuilder sb,
HttpRequestImpl request,
- HttpHeadersImpl headers)
+ HttpHeaders headers)
throws IOException
{
- Map<String,List<String>> h = headers.directMap();
+ Map<String,List<String>> h = headers.map();
Set<Map.Entry<String,List<String>>> entries = h.entrySet();
for (Map.Entry<String,List<String>> entry : entries) {
@@ -112,8 +114,6 @@
}
}
- private static final int BUFSIZE = 64 * 1024; // TODO: configurable?
-
private String getPathAndQuery(URI uri) {
String path = uri.getPath();
String query = uri.getQuery();
@@ -134,6 +134,25 @@
return addr.getHostString() + ":" + addr.getPort();
}
+ private String hostString() {
+ URI uri = request.uri();
+ int port = uri.getPort();
+ String host = uri.getHost();
+
+ boolean defaultPort;
+ if (port == -1)
+ defaultPort = true;
+ else if (request.secure())
+ defaultPort = port == 443;
+ else
+ defaultPort = port == 80;
+
+ if (defaultPort)
+ return host;
+ else
+ return host + ":" + Integer.toString(port);
+ }
+
private String requestURI() {
URI uri = request.uri();
String method = request.method();
@@ -161,6 +180,7 @@
void sendRequest() throws IOException {
collectHeaders();
+ chan.configureMode(Mode.BLOCKING);
if (contentLength == 0) {
chan.write(buffers, 0, 2);
} else if (contentLength > 0) {
@@ -196,7 +216,7 @@
buffers[0] = ByteBuffer.wrap(cmd.getBytes(StandardCharsets.US_ASCII));
URI uri = request.uri();
if (uri != null) {
- systemHeaders.setHeader("Host", uri.getHost());
+ systemHeaders.setHeader("Host", hostString());
}
if (request == null) {
// this is not a user request. No content
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http1Response.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http1Response.java Sat Apr 30 00:30:31 2016 +0100
@@ -24,7 +24,6 @@
package java.net.http;
import java.io.IOException;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -23,11 +23,133 @@
*/
package java.net.http;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import static java.net.http.SettingsFrame.INITIAL_WINDOW_SIZE;
+import static java.net.http.SettingsFrame.ENABLE_PUSH;
+import static java.net.http.SettingsFrame.HEADER_TABLE_SIZE;
+import static java.net.http.SettingsFrame.MAX_CONCURRENT_STREAMS;
+import static java.net.http.SettingsFrame.MAX_FRAME_SIZE;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Http2 specific aspects of HttpClientImpl
+ */
class Http2ClientImpl {
- Http2ClientImpl(HttpClientImpl t) {}
- String getSettingsString() {return "";}
- void debugPrint() {}
- Http2Connection getConnectionFor(HttpRequestImpl r) {
- return null;
+
+ final private HttpClientImpl client;
+
+ Http2ClientImpl(HttpClientImpl client) {
+ this.client = client;
+ }
+
+ /* Map key is "scheme:host:port" */
+ final private Map<String,Http2Connection> connections =
+ Collections.synchronizedMap(new HashMap<>());
+
+ final private Set<String> opening = Collections.synchronizedSet(new HashSet<>());
+
+ synchronized boolean haveConnectionFor(URI uri, InetSocketAddress proxy) {
+ return connections.containsKey(Http2Connection.keyFor(uri,proxy));
+ }
+
+ /**
+ * If a https request then blocks and waits until a connection is opened.
+ * Returns null if the request is 'http' as a different (upgrade)
+ * mechanism is used.
+ *
+ * Only one connection per destination is created. Blocks when opening
+ * connection, or when waiting for connection to be opened.
+ * First thread opens the connection and notifies the others when done.
+ *
+ * If the request is secure (https) then we open the connection here.
+ * If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
+ * In latter case, when the Http2Connection is connected, putConnection() must
+ * be called to store it.
+ */
+ Http2Connection getConnectionFor(HttpRequestImpl req)
+ throws IOException, InterruptedException {
+ URI uri = req.uri();
+ InetSocketAddress proxy = req.proxy();
+ String key = Http2Connection.keyFor(uri, proxy);
+ Http2Connection connection;
+ synchronized (opening) {
+ while ((connection = connections.get(key)) == null) {
+ if (!req.secure()) {
+ return null;
+ }
+ if (!opening.contains(key)) {
+ opening.add(key);
+ break;
+ } else {
+ opening.wait();
+ }
+ }
+ }
+ if (connection != null) {
+ return connection;
+ }
+ // we are opening the connection here blocking until it is done.
+ connection = new Http2Connection(req);
+ synchronized (opening) {
+ connections.put(key, connection);
+ opening.remove(key);
+ opening.notifyAll();
+ }
+ return connection;
+ }
+
+
+ /*
+ * TODO: If there isn't a connection to the same destination, then
+ * store it. If there is already a connection, then close it
+ */
+ synchronized void putConnection(Http2Connection c) {
+ String key = c.key();
+ connections.put(key, c);
+ }
+
+ synchronized void deleteConnection(Http2Connection c) {
+ String key = c.key();
+ connections.remove(key);
+ }
+
+ HttpClientImpl client() {
+ return client;
+ }
+
+ /** Returns the client settings as a base64 (url) encoded string */
+ String getSettingsString() {
+ SettingsFrame sf = getClientSettings();
+ ByteBufferGenerator bg = new ByteBufferGenerator(client);
+ sf.writeOutgoing(bg);
+ byte[] settings = bg.asByteArray(9); // without the header
+ Base64.Encoder encoder = Base64.getUrlEncoder()
+ .withoutPadding();
+ return encoder.encodeToString(settings);
+ }
+
+ private static final int K = 1024;
+
+ SettingsFrame getClientSettings() {
+ SettingsFrame frame = new SettingsFrame();
+ frame.setParameter(HEADER_TABLE_SIZE, Utils.getIntegerNetProperty(
+ "java.net.httpclient.hpack.maxheadertablesize", 16 * K));
+ frame.setParameter(ENABLE_PUSH, Utils.getIntegerNetProperty(
+ "java.net.httpclient.enablepush", 1));
+ frame.setParameter(MAX_CONCURRENT_STREAMS, Utils.getIntegerNetProperty(
+ "java.net.httpclient.maxstreams", 16));
+ frame.setParameter(INITIAL_WINDOW_SIZE, Utils.getIntegerNetProperty(
+ "java.net.httpclient.windowsize", 32 * K));
+ frame.setParameter(MAX_FRAME_SIZE, Utils.getIntegerNetProperty(
+ "java.net.httpclient.maxframesize", 16 * K));
+ frame.computeLength();
+ return frame;
}
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Sat Apr 30 00:30:31 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,42 +24,767 @@
package java.net.http;
import java.io.IOException;
-import java.net.Authenticator;
-import java.net.CookieManager;
-import java.net.ProxySelector;
+import java.net.InetSocketAddress;
import java.net.URI;
-import static java.net.http.Utils.BUFSIZE;
+import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import static java.nio.channels.SelectionKey.OP_CONNECT;
-import static java.nio.channels.SelectionKey.OP_READ;
-import static java.nio.channels.SelectionKey.OP_WRITE;
-import java.nio.channels.Selector;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.security.NoSuchAlgorithmException;
-import java.util.ListIterator;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import sun.net.httpclient.hpack.Encoder;
+import sun.net.httpclient.hpack.Decoder;
+import static java.net.http.SettingsFrame.*;
+import static java.net.http.Utils.BUFSIZE;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.stream.Collectors;
+import sun.net.httpclient.hpack.DecodingCallback;
+
+/**
+ * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
+ * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
+ *
+ * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
+ * to a HttpClientImpl.
+ *
+ * Creation cases:
+ * 1) upgraded HTTP/1.1 plain tcp connection
+ * 2) prior knowledge directly created plain tcp connection
+ * 3) directly created HTTP/2 SSL connection which uses ALPN.
+ *
+ * Sending is done by writing directly to underlying HttpConnection object which
+ * is operating in async mode. No flow control applies on output at this level
+ * and all writes are just executed as puts to an output Q belonging to HttpConnection
+ * Flow control is implemented by HTTP/2 protocol itself.
+ *
+ * Hpack header compression
+ * and outgoing stream creation is also done here, because these operations
+ * must be synchronized at the socket level. Stream objects send frames simply
+ * by placing them on the connection's output Queue. sendFrame() is called
+ * from a higher level (Stream) thread.
+ *
+ * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
+ * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
+ * or handles them directly itself. This thread performs hpack decompression
+ * and incoming stream creation (Server push). Incoming frames destined for a
+ * stream are provided by calling Stream.incoming().
+ */
+class Http2Connection implements BufferHandler {
+
+ final Queue<Http2Frame> outputQ;
+ volatile boolean closed;
+
+ //-------------------------------------
+ final HttpConnection connection;
+ HttpClientImpl client;
+ final Http2ClientImpl client2;
+ Map<Integer,Stream> streams;
+ int nextstreamid = 3; // stream 1 is registered separately
+ int nextPushStream = 2;
+ Encoder hpackOut;
+ Decoder hpackIn;
+ SettingsFrame clientSettings, serverSettings;
+ ByteBufferConsumer bbc;
+ final LinkedList<ByteBuffer> freeList;
+ final String key; // for HttpClientImpl.connections map
+ FrameReader reader;
+
+ // Connection level flow control windows
+ int sendWindow = INITIAL_WINDOW_SIZE;
+
+ final static int DEFAULT_FRAME_SIZE = 16 * 1024;
+ private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
+
+ final ExecutorWrapper executor;
+
+ /**
+ * This is established by the protocol spec and the peer will update it with
+ * WINDOW_UPDATEs, which affects the sendWindow.
+ */
+ final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
+
+ // TODO: need list of control frames from other threads
+ // that need to be sent
+
+ /**
+ * Case 1) Create from upgraded HTTP/1.1 connection.
+ * Is ready to use. Will not be SSL. exchange is the Exchange
+ * that initiated the connection, whose response will be delivered
+ * on a Stream.
+ */
+ Http2Connection(HttpConnection connection, Http2ClientImpl client2,
+ Exchange exchange) throws IOException, InterruptedException {
+ this.outputQ = new Queue<>();
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+
+ //this.initialExchange = exchange;
+ assert !(connection instanceof SSLConnection);
+ this.connection = connection;
+ this.client = client2.client();
+ this.client2 = client2;
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.key = keyFor(connection);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ initCommon();
+ //sendConnectionPreface();
+ Stream initialStream = createStream(exchange);
+ initialStream.registerStream(1);
+ initialStream.requestSent();
+ sendConnectionPreface();
+ connection.configureMode(Mode.ASYNC);
+ // start reading and writing
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ asyncReceive(connection.getRemaining());
+ asyncConn.startReading();
+ }
+
+ // async style but completes immediately
+ static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
+ Http2ClientImpl client2, Exchange exchange) {
+ CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+ try {
+ Http2Connection c = new Http2Connection(connection, client2, exchange);
+ cf.complete(c);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ return cf;
+ }
+
+ /**
+ * Cases 2) 3)
+ *
+ * request is request to be sent.
+ */
+ Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
+ InetSocketAddress proxy = request.proxy();
+ URI uri = request.uri();
+ InetSocketAddress addr = Utils.getAddress(request);
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+ this.key = keyFor(uri, proxy);
+ this.connection = HttpConnection.getConnection(addr, request, this);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ this.client = request.client();
+ this.client2 = client.client2();
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.outputQ = new Queue<>();
+ nextstreamid = 1;
+ initCommon();
+ connection.connect();
+ connection.configureMode(Mode.ASYNC);
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ sendConnectionPreface();
+ asyncConn.startReading();
+ }
+
+ // NEW
+ synchronized void obtainSendWindow(int amount) throws InterruptedException {
+ while (amount > 0) {
+ int n = Math.min(amount, sendWindow);
+ sendWindow -= n;
+ amount -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
+
+ synchronized void updateSendWindow(int amount) {
+ if (sendWindow == 0) {
+ sendWindow += amount;
+ notifyAll();
+ } else
+ sendWindow += amount;
+ }
+
+ synchronized int sendWindow() {
+ return sendWindow;
+ }
+
+ static String keyFor(HttpConnection connection) {
+ boolean isProxy = connection.isProxied();
+ boolean isSecure = connection.isSecure();
+ InetSocketAddress addr = connection.address();
+
+ return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
+ }
+
+ static String keyFor(URI uri, InetSocketAddress proxy) {
+ boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
+ boolean isProxy = proxy != null;
+
+ String host;
+ int port;
+
+ if (isProxy) {
+ host = proxy.getHostString();
+ port = proxy.getPort();
+ } else {
+ host = uri.getHost();
+ port = uri.getPort();
+ }
+ return keyString(isSecure, isProxy, host, port);
+ }
+
+ // {C,S}:{H:P}:host:port
+ // C indicates clear text connection "http"
+ // S indicates secure "https"
+ // H indicates host (direct) connection
+ // P indicates proxy
+ // Eg: "S:H:foo.com:80"
+ static String keyString(boolean secure, boolean proxy, String host, int port) {
+ char c1 = secure ? 'S' : 'C';
+ char c2 = proxy ? 'P' : 'H';
+
+ StringBuilder sb = new StringBuilder(128);
+ sb.append(c1).append(':').append(c2).append(':')
+ .append(host).append(':').append(port);
+ return sb.toString();
+ }
+
+ String key() {
+ return this.key;
+ }
+
+ void putConnection() {
+ client2.putConnection(this);
+ }
+
+ private static String toHexdump1(ByteBuffer bb) {
+ bb.mark();
+ StringBuilder sb = new StringBuilder(512);
+ Formatter f = new Formatter(sb);
+
+ while (bb.hasRemaining()) {
+ int i = Byte.toUnsignedInt(bb.get());
+ f.format("%02x:", i);
+ }
+ sb.deleteCharAt(sb.length()-1);
+ bb.reset();
+ return sb.toString();
+ }
+
+ private static String toHexdump(ByteBuffer bb) {
+ List<String> words = new ArrayList<>();
+ int i = 0;
+ bb.mark();
+ while (bb.hasRemaining()) {
+ if (i % 2 == 0) {
+ words.add("");
+ }
+ byte b = bb.get();
+ String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
+ words.set(i / 2, words.get(i / 2) + hex);
+ i++;
+ }
+ bb.reset();
+ return words.stream().collect(Collectors.joining(" "));
+ }
+
+ private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
+ boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
+
+ ByteBuffer[] buffers = frame.getHeaderBlock();
+ for (int i = 0; i < buffers.length; i++) {
+ hpackIn.decode(buffers[i], endOfHeaders, decoder);
+ }
+ }
+
+ int getInitialSendWindowSize() {
+ return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
+ }
+
+ void close() {
+ GoAwayFrame f = new GoAwayFrame();
+ f.setDebugData("Requested by user".getBytes());
+ // TODO: set last stream. For now zero ok.
+ sendFrame(f);
+ }
+
+ // BufferHandler methods
+
+ @Override
+ public ByteBuffer getBuffer(int n) {
+ return client.getBuffer(n);
+ }
+
+ @Override
+ public void returnBuffer(ByteBuffer buf) {
+ client.returnBuffer(buf);
+ }
-class Http2Connection {
- static CompletableFuture<Http2Connection> createAsync(
- HttpConnection connection, Http2ClientImpl client2, Exchange exchange) {
- return null;
+ @Override
+ public void setMinBufferSize(int n) {
+ client.setMinBufferSize(n);
+ }
+
+ private final Object readlock = new Object();
+
+ void asyncReceive(ByteBuffer buffer) {
+ synchronized (readlock) {
+ try {
+ if (reader == null) {
+ reader = new FrameReader(buffer);
+ } else {
+ reader.input(buffer);
+ }
+ while (true) {
+ if (reader.haveFrame()) {
+ List<ByteBuffer> buffers = reader.frame();
+
+ ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
+ processFrame(bbc);
+ if (bbc.consumed()) {
+ reader = new FrameReader();
+ return;
+ } else {
+ reader = new FrameReader(reader);
+ }
+ } else
+ return;
+ }
+ } catch (Throwable e) {
+ String msg = Utils.stackTrace(e);
+ Log.logTrace(msg);
+ shutdown(e);
+ }
+ }
+ }
+
+ void shutdown(Throwable t) {
+ System.err.println("Shutdown: " + t);
+ t.printStackTrace();
+ closed = true;
+ client2.deleteConnection(this);
+ Collection<Stream> c = streams.values();
+ for (Stream s : c) {
+ s.cancelImpl(t);
+ }
+ connection.close();
+ }
+
+ /**
+ * Handles stream 0 (common) frames that apply to whole connection and passes
+ * other stream specific frames to that Stream object.
+ *
+ * Invokes Stream.incoming() which is expected to process frame without
+ * blocking.
+ */
+ void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
+ Http2Frame frame = Http2Frame.readIncoming(bbc);
+ Log.logFrames(frame, "IN");
+ int streamid = frame.streamid();
+ if (streamid == 0) {
+ handleCommonFrame(frame);
+ } else {
+ Stream stream = getStream(streamid);
+ if (stream == null) {
+ // should never receive a frame with unknown stream id
+ resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ }
+ if (frame instanceof PushPromiseFrame) {
+ PushPromiseFrame pp = (PushPromiseFrame)frame;
+ handlePushPromise(stream, pp);
+ } else if (frame instanceof HeaderFrame) {
+ // decode headers (or continuation)
+ decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
+ stream.incoming(frame);
+ } else
+ stream.incoming(frame);
+ }
+ }
+
+ private void handlePushPromise(Stream parent, PushPromiseFrame pp)
+ throws IOException, InterruptedException {
+
+ HttpRequestImpl parentReq = parent.request;
+ int promisedStreamid = pp.getPromisedStream();
+ if (promisedStreamid != nextPushStream) {
+ resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
+ return;
+ } else {
+ nextPushStream += 2;
+ }
+ HeaderDecoder decoder = new HeaderDecoder();
+ decodeHeaders(pp, decoder);
+ HttpHeadersImpl headers = decoder.headers();
+ HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
+
+ Stream.PushedStream pushStream = createPushStream(parent, pushReq);
+ pushStream.registerStream(promisedStreamid);
+ parent.incoming_pushPromise(pushReq, pushStream);
+ }
+
+ private void handleCommonFrame(Http2Frame frame)
+ throws IOException, InterruptedException {
+
+ switch (frame.type()) {
+ case SettingsFrame.TYPE:
+ { SettingsFrame f = (SettingsFrame)frame;
+ handleSettings(f);}
+ break;
+ case PingFrame.TYPE:
+ { PingFrame f = (PingFrame)frame;
+ handlePing(f);}
+ break;
+ case GoAwayFrame.TYPE:
+ { GoAwayFrame f = (GoAwayFrame)frame;
+ handleGoAway(f);}
+ break;
+ case WindowUpdateFrame.TYPE:
+ { WindowUpdateFrame f = (WindowUpdateFrame)frame;
+ handleWindowUpdate(f);}
+ break;
+ default:
+ protocolError(ErrorFrame.PROTOCOL_ERROR);
+ }
+ }
+
+ void resetStream(int streamid, int code) throws IOException, InterruptedException {
+ Log.logError(
+ "Resetting stream {0,number,integer} with error code {1,number,integer}",
+ streamid, code);
+ ResetFrame frame = new ResetFrame();
+ frame.streamid(streamid);
+ frame.setErrorCode(code);
+ sendFrame(frame);
+ streams.remove(streamid);
+ }
+
+ private void handleWindowUpdate(WindowUpdateFrame f)
+ throws IOException, InterruptedException {
+ updateSendWindow(f.getUpdate());
+ }
+
+ private void protocolError(int errorCode)
+ throws IOException, InterruptedException {
+ GoAwayFrame frame = new GoAwayFrame();
+ frame.setErrorCode(errorCode);
+ sendFrame(frame);
+ String msg = "Error code: " + errorCode;
+ shutdown(new IOException("protocol error"));
+ }
+
+ private void handleSettings(SettingsFrame frame)
+ throws IOException, InterruptedException {
+ if (frame.getFlag(SettingsFrame.ACK)) {
+ // ignore ack frames for now.
+ return;
+ }
+ serverSettings = frame;
+ SettingsFrame ack = getAckFrame(frame.streamid());
+ sendFrame(ack);
+ }
+
+ private void handlePing(PingFrame frame)
+ throws IOException, InterruptedException {
+ frame.setFlag(PingFrame.ACK);
+ sendFrame(frame);
+ }
+
+ private void handleGoAway(GoAwayFrame frame)
+ throws IOException, InterruptedException {
+ //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
+ shutdown(new IOException("GOAWAY received"));
+ }
+
+ private void initCommon() {
+ clientSettings = client2.getClientSettings();
+
+ // serverSettings will be updated by server
+ serverSettings = SettingsFrame.getDefaultSettings();
+ hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
+ hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
+ }
+
+ /**
+ * Max frame size we are allowed to send
+ */
+ public int getMaxSendFrameSize() {
+ int param = serverSettings.getParameter(MAX_FRAME_SIZE);
+ if (param == -1) {
+ param = DEFAULT_FRAME_SIZE;
+ }
+ return param;
+ }
+
+ /**
+ * Max frame size we will receive
+ */
+ public int getMaxReceiveFrameSize() {
+ return clientSettings.getParameter(MAX_FRAME_SIZE);
+ }
+
+ // Not sure how useful this is.
+ public int getMaxHeadersSize() {
+ return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
+ }
+
+ private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
+
+ private static final byte[] PREFACE_BYTES =
+ CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
+
+ /**
+ * Sends Connection preface and Settings frame with current preferred
+ * values
+ */
+ private void sendConnectionPreface() throws IOException {
+ ByteBufferGenerator bg = new ByteBufferGenerator(this);
+ bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
+ ByteBuffer[] ba = bg.getBufferArray();
+ connection.write(ba, 0, ba.length);
+
+ bg = new ByteBufferGenerator(this);
+ SettingsFrame sf = client2.getClientSettings();
+ Log.logFrames(sf, "OUT");
+ sf.writeOutgoing(bg);
+ WindowUpdateFrame wup = new WindowUpdateFrame();
+ wup.streamid(0);
+ // send a Window update for the receive buffer we are using
+ // minus the initial 64 K specified in protocol
+ wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
+ wup.computeLength();
+ wup.writeOutgoing(bg);
+ Log.logFrames(wup, "OUT");
+ ba = bg.getBufferArray();
+ connection.write(ba, 0, ba.length);
+ }
+
+ /**
+ * Returns an existing Stream with given id, or null if doesn't exist
+ */
+ Stream getStream(int streamid) {
+ return streams.get(streamid);
+ }
+
+ /**
+ * Creates Stream with given id.
+ */
+ Stream createStream(Exchange exchange) {
+ Stream stream = new Stream(client, this, exchange);
+ return stream;
+ }
+
+ Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
+ Stream.PushGroup<?> pg = parent.request.pushGroup();
+ return new Stream.PushedStream(pg, client, this, parent, pushReq);
+ }
+
+ void putStream(Stream stream, int streamid) {
+ streams.put(streamid, stream);
+ }
+
+ void deleteStream(Stream stream) {
+ streams.remove(stream.streamid);
+ }
+
+ static final int MAX_STREAM = Integer.MAX_VALUE - 2;
+
+ // Number of header bytes in a Headers Frame
+ final static int HEADERS_HEADER_SIZE = 15;
+
+ // Number of header bytes in a Continuation frame
+ final static int CONTIN_HEADER_SIZE = 9;
+
+ /**
+ * Encode the headers into a List<ByteBuffer> and then create HEADERS
+ * and CONTINUATION frames from the list and return the List<Http2Frame>.
+ *
+ * @param frame
+ * @return
+ */
+ private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
+ LinkedList<ByteBuffer> buffers = new LinkedList<>();
+ ByteBuffer buf = getBuffer();
+ buffers.add(buf);
+ encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
+ encodeHeadersImpl(frame.getUserHeaders(), buffers);
+ encodeHeadersImpl(frame.getSystemHeaders(), buffers);
+
+ for (ByteBuffer b : buffers) {
+ b.flip();
}
- Http2Connection(HttpConnection connection, Http2ClientImpl client2,
- Exchange exchange) throws IOException, InterruptedException {
+ LinkedList<Http2Frame> frames = new LinkedList<>();
+ int maxframesize = getMaxSendFrameSize();
+
+ HeadersFrame oframe = new HeadersFrame();
+ oframe.setFlags(frame.getFlags());
+ oframe.streamid(frame.streamid());
+
+ oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
+ frames.add(oframe);
+ // Any buffers left?
+ boolean done = buffers.isEmpty();
+ if (done) {
+ oframe.setFlag(HeaderFrame.END_HEADERS);
+ } else {
+ ContinuationFrame cf = null;
+ while (!done) {
+ cf = new ContinuationFrame();
+ cf.streamid(frame.streamid());
+ cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
+ frames.add(cf);
+ done = buffers.isEmpty();
+ }
+ cf.setFlag(HeaderFrame.END_HEADERS);
+ }
+ return frames;
+ }
+
+ // should always return at least one buffer
+ private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
+ assert maxsize >= BUFSIZE;
+ LinkedList<ByteBuffer> newlist = new LinkedList<>();
+ int size = list.size();
+ int nbytes = 0;
+ for (int i=0; i<size; i++) {
+ ByteBuffer buf = list.getFirst();
+ if (nbytes + buf.remaining() <= maxsize) {
+ nbytes += buf.remaining();
+ newlist.add(buf);
+ list.remove();
+ } else {
+ break;
+ }
+ }
+ return newlist.toArray(empty);
+ }
+
+ /**
+ * Encode all the headers from the given HttpHeadersImpl into the given List.
+ */
+ private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
+ ByteBuffer buffer;
+ if (!(buffer = buffers.getLast()).hasRemaining()) {
+ buffer = getBuffer();
+ buffers.add(buffer);
+ }
+ for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
+ String key = e.getKey();
+ String lkey = key.toLowerCase();
+ List<String> values = e.getValue();
+ for (String value : values) {
+ hpackOut.header(lkey, value);
+ boolean encoded = false;
+ do {
+ encoded = hpackOut.encode(buffer);
+ if (!encoded) {
+ buffer = getBuffer();
+ buffers.add(buffer);
+ }
+ } while (!encoded);
+ }
+ }
+ }
+
+ public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
+ for (Http2Frame frame : frames) {
+ sendFrame(frame);
+ }
}
- Stream getStream(int i) {return null;}
- Stream createStream(Exchange ex) {return null;}
- void putConnection() {}
+ static Throwable getExceptionFrom(CompletableFuture<?> cf) {
+ try {
+ cf.get();
+ return null;
+ } catch (Throwable e) {
+ if (e.getCause() != null)
+ return e.getCause();
+ else
+ return e;
+ }
+ }
+
+
+ void execute(Runnable r) {
+ executor.execute(r, null);
+ }
+
+ private final Object sendlock = new Object();
+
+ /**
+ *
+ */
+ void sendFrame(Http2Frame frame) {
+ synchronized (sendlock) {
+ try {
+ if (frame instanceof OutgoingHeaders) {
+ OutgoingHeaders oh = (OutgoingHeaders) frame;
+ Stream stream = oh.getStream();
+ stream.registerStream(nextstreamid);
+ oh.streamid(nextstreamid);
+ nextstreamid += 2;
+ // set outgoing window here. This allows thread sending
+ // body to proceed.
+ stream.updateOutgoingWindow(getInitialSendWindowSize());
+ LinkedList<Http2Frame> frames = encodeHeaders(oh);
+ for (Http2Frame f : frames) {
+ sendOneFrame(f);
+ }
+ } else {
+ sendOneFrame(frame);
+ }
+
+ } catch (IOException e) {
+ if (!closed) {
+ Log.logError(e);
+ shutdown(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Send a frame.
+ *
+ * @param frame
+ * @throws IOException
+ */
+ private void sendOneFrame(Http2Frame frame) throws IOException {
+ ByteBufferGenerator bbg = new ByteBufferGenerator(this);
+ frame.computeLength();
+ Log.logFrames(frame, "OUT");
+ frame.writeOutgoing(bbg);
+ ByteBuffer[] currentBufs = bbg.getBufferArray();
+ connection.write(currentBufs, 0, currentBufs.length);
+ }
+
+
+ private SettingsFrame getAckFrame(int streamid) {
+ SettingsFrame frame = new SettingsFrame();
+ frame.setFlag(SettingsFrame.ACK);
+ frame.streamid(streamid);
+ return frame;
+ }
+
+ static class HeaderDecoder implements DecodingCallback {
+ HttpHeadersImpl headers;
+
+ HeaderDecoder() {
+ this.headers = new HttpHeadersImpl();
+ }
+
+ @Override
+ public void onDecoded(CharSequence name, CharSequence value) {
+ headers.addHeader(name.toString(), value.toString());
+ }
+
+ HttpHeadersImpl headers() {
+ return headers;
+ }
+ }
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http2Frame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * When sending a frame, the length field must be set in sub-class
+ * by calling computeLength()
+ */
+abstract class Http2Frame {
+
+ int length = -1;
+ int type;
+ int streamid;
+ int flags;
+
+ // called when reading in only
+ void initCommon(int length, int type, int streamid, int flags) {
+ this.length = length;
+ this.type = type;
+ this.streamid = streamid;
+ this.flags = flags;
+ }
+
+ public int length() {
+ return length;
+ }
+
+ public int type() {
+ return type;
+ }
+
+ public int streamid() {
+ return streamid;
+ }
+
+ public void setFlag(int flag) {
+ flags |= flag;
+ }
+
+ public void setFlags(int flags) {
+ this.flags = flags;
+ }
+
+ public int getFlags() {
+ return flags;
+ }
+
+ public boolean getFlag(int flag) {
+ return (flags & flag) != 0;
+ }
+
+ public void clearFlag(int flag) {
+ flags &= 0xffffffff ^ flag;
+ }
+
+ public void streamid(int streamid) {
+ this.streamid = streamid;
+ }
+
+ abstract void readIncomingImpl(ByteBufferConsumer bc) throws IOException;
+
+ /**
+ * assume given array contains at least one complete frame.
+ */
+ static Http2Frame readIncoming(ByteBufferConsumer bc) throws IOException {
+ int x = bc.getInt();
+ int length = x >> 8;
+ int type = x & 0xff;
+ int flags = bc.getByte();
+ int streamid = bc.getInt();
+ Http2Frame f = null;
+ switch (type) {
+ case DataFrame.TYPE:
+ f = new DataFrame();
+ break;
+ case HeadersFrame.TYPE:
+ f = new HeadersFrame();
+ break;
+ case ContinuationFrame.TYPE:
+ f = new ContinuationFrame();
+ break;
+ case ResetFrame.TYPE:
+ f = new ResetFrame();
+ break;
+ case PriorityFrame.TYPE:
+ f = new PriorityFrame();
+ break;
+ case SettingsFrame.TYPE:
+ f = new SettingsFrame();
+ break;
+ case GoAwayFrame.TYPE:
+ f = new GoAwayFrame();
+ break;
+ case PingFrame.TYPE:
+ f = new PingFrame();
+ break;
+ case PushPromiseFrame.TYPE:
+ f = new PushPromiseFrame();
+ break;
+ case WindowUpdateFrame.TYPE:
+ f = new WindowUpdateFrame();
+ break;
+ default:
+ String msg = Integer.toString(type);
+ throw new IOException("unknown frame type " + msg);
+ }
+ f.initCommon(length, type, streamid, flags);
+ f.readIncomingImpl(bc);
+ return f;
+ }
+
+ public String typeAsString() {
+ return asString(this.type);
+ }
+
+ public static String asString(int type) {
+ switch (type) {
+ case DataFrame.TYPE:
+ return "DATA";
+ case HeadersFrame.TYPE:
+ return "HEADERS";
+ case ContinuationFrame.TYPE:
+ return "CONTINUATION";
+ case ResetFrame.TYPE:
+ return "RESET";
+ case PriorityFrame.TYPE:
+ return "PRIORITY";
+ case SettingsFrame.TYPE:
+ return "SETTINGS";
+ case GoAwayFrame.TYPE:
+ return "GOAWAY";
+ case PingFrame.TYPE:
+ return "PING";
+ case PushPromiseFrame.TYPE:
+ return "PUSH_PROMISE";
+ case WindowUpdateFrame.TYPE:
+ return "WINDOW_UPDATE";
+ default:
+ return "UNKNOWN";
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(typeAsString())
+ .append(": length=")
+ .append(Integer.toString(length))
+ .append(", streamid=")
+ .append(streamid)
+ .append(", flags=");
+
+ int f = flags;
+ int i = 0;
+ if (f == 0) {
+ sb.append("0 ");
+ } else {
+ while (f != 0) {
+ if ((f & 1) == 1) {
+ sb.append(flagAsString(1 << i))
+ .append(' ');
+ }
+ f = f >> 1;
+ i++;
+ }
+ }
+ return sb.toString();
+ }
+
+ // Override
+ String flagAsString(int f) {
+ return "unknown";
+ }
+
+ abstract void computeLength();
+
+ void writeOutgoing(ByteBufferGenerator bg) {
+ if (length == -1) {
+ throw new InternalError("Length not set on outgoing frame");
+ }
+ ByteBuffer buf = bg.getBuffer(9);
+ int x = (length << 8) + type;
+ buf.putInt(x);
+ buf.put((byte)flags);
+ buf.putInt(streamid);
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -23,28 +23,32 @@
*/
package java.net.http;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
-import static java.net.http.Utils.BUFSIZE;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
-import static java.nio.channels.SelectionKey.OP_CONNECT;
-import static java.nio.channels.SelectionKey.OP_READ;
-import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
-import java.util.*;
-import java.util.stream.Stream;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
+import java.util.stream.Stream;
+
+import static java.net.http.Utils.BUFSIZE;
/**
* Client implementation. Contains all configuration information and also
@@ -53,6 +57,9 @@
*/
class HttpClientImpl extends HttpClient implements BufferHandler {
+ private static final ThreadFactory defaultFactory =
+ (r -> new Thread(null, r, "HttpClient_worker", 0, true));
+
private final CookieManager cookieManager;
private final Redirect followRedirects;
private final ProxySelector proxySelector;
@@ -67,7 +74,6 @@
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
- private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final LinkedList<TimeoutEvent> timeouts;
public static HttpClientImpl create(HttpClientBuilderImpl builder) {
@@ -115,7 +121,6 @@
throw new InternalError(e);
}
selmgr.setDaemon(true);
- selmgr.setName("HttpSelector");
filters = new FilterFactory();
initFilters();
}
@@ -135,7 +140,7 @@
* 4) - mark connection as blocking
* 5) - call AsyncEvent.handle()
*
- * If exchange needs to block again, then call registerEvent() again
+ * If exchange needs to block again, then call registerEvent() again
*/
void registerEvent(AsyncEvent exchange) throws IOException {
selmgr.register(exchange);
@@ -145,35 +150,56 @@
return client2;
}
- LinkedList<ByteBuffer> freelist = new LinkedList<>();
+ /**
+ * We keep one size of buffer on free list. That size may increase
+ * depending on demand. If that happens we dispose of free buffers
+ * that are smaller than new size.
+ */
+ private final LinkedList<ByteBuffer> freelist = new LinkedList<>();
+ int currentSize = BUFSIZE;
@Override
- public synchronized ByteBuffer getBuffer() {
- if (freelist.isEmpty()) {
- return ByteBuffer.allocate(BUFSIZE);
+ public synchronized ByteBuffer getBuffer(int size) {
+
+ ByteBuffer buf;
+ if (size == -1)
+ size = currentSize;
+
+ if (size > currentSize)
+ currentSize = size;
+
+ while (!freelist.isEmpty()) {
+ buf = freelist.removeFirst();
+ if (buf.capacity() < currentSize)
+ continue;
+ buf.clear();
+ return buf;
}
- return freelist.removeFirst();
+ return ByteBuffer.allocate(size);
}
@Override
public synchronized void returnBuffer(ByteBuffer buffer) {
- buffer.clear();
freelist.add(buffer);
}
+ @Override
+ public synchronized void setMinBufferSize(int n) {
+ currentSize = Math.max(n, currentSize);
+ }
// Main loop for this client's selector
+ private final class SelectorManager extends Thread {
- class SelectorManager extends Thread {
- final Selector selector;
- boolean closed;
-
- final List<AsyncEvent> readyList;
- final List<AsyncEvent> registrations;
+ private final Selector selector;
+ private volatile boolean closed;
+ private final List<AsyncEvent> readyList;
+ private final List<AsyncEvent> registrations;
SelectorManager() throws IOException {
- readyList = new LinkedList<>();
- registrations = new LinkedList<>();
+ super(null, null, "SelectorManager", 0, false);
+ readyList = new ArrayList<>();
+ registrations = new ArrayList<>();
selector = Selector.open();
}
@@ -193,32 +219,13 @@
closed = true;
try {
selector.close();
- } catch (IOException e) {}
- }
-
- private List<AsyncEvent> copy(List<AsyncEvent> list) {
- LinkedList<AsyncEvent> c = new LinkedList<>();
- for (AsyncEvent e : list) {
- c.add(e);
- }
- return c;
- }
-
- String opvals(int i) {
- StringBuilder sb = new StringBuilder();
- if ((i & OP_READ) != 0)
- sb.append("OP_READ ");
- if ((i & OP_CONNECT) != 0)
- sb.append("OP_CONNECT ");
- if ((i & OP_WRITE) != 0)
- sb.append("OP_WRITE ");
- return sb.toString();
+ } catch (IOException ignored) { }
}
@Override
public void run() {
try {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
for (AsyncEvent exchange : registrations) {
SelectableChannel c = exchange.channel();
@@ -229,7 +236,7 @@
if (key == null) {
sa = new SelectorAttachment(c, selector);
} else {
- sa = (SelectorAttachment)key.attachment();
+ sa = (SelectorAttachment) key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
@@ -243,6 +250,7 @@
}
long timeval = getTimeoutValue();
long now = System.currentTimeMillis();
+ //debugPrint(selector);
int n = selector.select(timeval);
if (n == 0) {
signalTimeouts(now);
@@ -251,7 +259,7 @@
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
- SelectorAttachment sa = (SelectorAttachment)key.attachment();
+ SelectorAttachment sa = (SelectorAttachment) key.attachment();
int eventsOccurred = key.readyOps();
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
@@ -260,10 +268,8 @@
selector.selectedKeys().clear();
for (AsyncEvent exchange : readyList) {
- if (exchange instanceof AsyncEvent.Blocking) {
+ if (exchange.blocking()) {
exchange.channel().configureBlocking(true);
- } else {
- assert exchange instanceof AsyncEvent.NonBlocking;
}
executor.synchronize();
handleEvent(exchange); // will be delegated to executor
@@ -272,14 +278,26 @@
}
} catch (Throwable e) {
if (!closed) {
- System.err.println("HttpClientImpl terminating on error");
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
+ } finally {
+ shutdown();
}
}
+ void debugPrint(Selector selector) {
+ System.err.println("Selector: debugprint start");
+ Set<SelectionKey> keys = selector.keys();
+ for (SelectionKey key : keys) {
+ SelectableChannel c = key.channel();
+ int ops = key.interestOps();
+ System.err.printf("selector chan:%s ops:%d\n", c, ops);
+ }
+ System.err.println("Selector: debugprint end");
+ }
+
void handleEvent(AsyncEvent e) {
if (closed) {
e.abort();
@@ -303,7 +321,7 @@
private final SelectableChannel chan;
private final Selector selector;
private final ArrayList<AsyncEvent> pending;
- private int interestops;
+ private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new ArrayList<>();
@@ -312,53 +330,53 @@
}
void register(AsyncEvent e) throws ClosedChannelException {
- int newops = e.interestOps();
- boolean reRegister = (interestops & newops) != newops;
- interestops |= newops;
+ int newOps = e.interestOps();
+ boolean reRegister = (interestOps & newOps) != newOps;
+ interestOps |= newOps;
pending.add(e);
if (reRegister) {
// first time registration happens here also
- chan.register(selector, interestops, this);
+ chan.register(selector, interestOps, this);
}
}
- int interestOps() {
- return interestops;
- }
-
/**
* Returns a Stream<AsyncEvents> containing only events that are
- * registered with the given {@code interestop}.
+ * registered with the given {@code interestOps}.
*/
- Stream<AsyncEvent> events(int interestop) {
+ Stream<AsyncEvent> events(int interestOps) {
return pending.stream()
- .filter(ev -> (ev.interestOps() & interestop) != 0);
+ .filter(ev -> (ev.interestOps() & interestOps) != 0);
}
/**
- * Removes any events with the given {@code interestop}, and if no
+ * Removes any events with the given {@code interestOps}, and if no
* events remaining, cancels the associated SelectionKey.
*/
- void resetInterestOps(int interestop) {
- int newops = 0;
+ void resetInterestOps(int interestOps) {
+ int newOps = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
- if ((evops & interestop) != 0) {
+ if (event.repeating()) {
+ newOps |= evops;
+ continue;
+ }
+ if ((evops & interestOps) != 0) {
itr.remove();
} else {
- newops |= evops;
+ newOps |= evops;
}
}
- interestops = newops;
+ this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
- if (newops == 0) {
+ if (newOps == 0) {
key.cancel();
} else {
- key.interestOps(newops);
+ key.interestOps(newOps);
}
}
}
@@ -366,7 +384,8 @@
/**
* Creates a HttpRequest associated with this group.
*
- * @throws IllegalStateException if the group has been stopped
+ * @throws IllegalStateException
+ * if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request() {
@@ -376,7 +395,8 @@
/**
* Creates a HttpRequest associated with this group.
*
- * @throws IllegalStateException if the group has been stopped
+ * @throws IllegalStateException
+ * if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request(URI uri) {
@@ -444,16 +464,12 @@
return version.equals(Version.HTTP_2);
}
- //void setHttp2NotSupported(String host) {
- //http2NotSupported.put(host, false);
- //}
-
- final void initFilters() {
+ private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
}
- final void addFilter(Class<? extends HeaderFilter> f) {
+ private void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
@@ -479,14 +495,14 @@
iter.previous();
break;
} else if (!iter.hasNext()) {
- event.delta = event.timeval - listval ;
+ event.delta = event.timeval - listval;
}
}
iter.add(event);
selmgr.wakeupSelector();
}
- synchronized void signalTimeouts(long then) {
+ private synchronized void signalTimeouts(long then) {
if (timeouts.isEmpty()) {
return;
}
@@ -532,12 +548,12 @@
// used for the connection window
int getReceiveBufferSize() {
return Utils.getIntegerNetProperty(
- "sun.net.httpclient.connectionWindowSize", 256 * 1024
+ "java.net.httpclient.connectionWindowSize", 256 * 1024
);
}
// returns 0 meaning block forever, or a number of millis to block for
- synchronized long getTimeoutValue() {
+ private synchronized long getTimeoutValue() {
if (timeouts.isEmpty()) {
return 0;
} else {
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpConnection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -23,9 +23,8 @@
*/
package java.net.http;
-import java.io.FileOutputStream;
+import java.io.Closeable;
import java.io.IOException;
-import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
@@ -42,7 +41,17 @@
* SSLConnection: TLS channel direct to server
* SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
*/
-abstract class HttpConnection implements BufferHandler {
+abstract class HttpConnection implements BufferHandler, Closeable {
+
+ protected final static ByteBuffer emptyBuf = Utils.EMPTY_BYTEBUFFER;
+
+ enum Mode {
+ BLOCKING,
+ NON_BLOCKING,
+ ASYNC
+ }
+
+ protected Mode mode;
// address we are connected to. Could be a server or a proxy
final InetSocketAddress address;
@@ -52,6 +61,7 @@
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
this.address = address;
this.client = client;
+ this.buffer = emptyBuf;
}
/**
@@ -68,7 +78,21 @@
*/
public static HttpConnection getConnection(InetSocketAddress addr,
HttpRequestImpl request) {
- return getConnectionImpl(addr, request);
+ return getConnectionImpl(addr, request, null);
+ }
+
+ /**
+ * Called specifically to get an async connection for HTTP/2 over SSL.
+ *
+ * @param addr
+ * @param request
+ * @param http2
+ * @return
+ */
+ public static HttpConnection getConnection(InetSocketAddress addr,
+ HttpRequestImpl request, Http2Connection http2) {
+
+ return getConnectionImpl(addr, request, http2);
}
public abstract void connect() throws IOException, InterruptedException;
@@ -93,7 +117,7 @@
// at beginning of response.
ByteBuffer getRemaining() {
ByteBuffer b = buffer;
- buffer = null;
+ buffer = emptyBuf;
return b;
}
@@ -123,17 +147,18 @@
}
private static HttpConnection getSSLConnection(InetSocketAddress addr,
- InetSocketAddress proxy,
- HttpRequestImpl request,
- String[] alpn) {
+ InetSocketAddress proxy, HttpRequestImpl request,
+ String[] alpn, Http2Connection http2) {
HttpClientImpl client = request.client();
if (proxy != null) {
return new SSLTunnelConnection(addr,
client,
proxy,
request.getAccessControlContext());
+ } else if (http2 == null) {
+ return new SSLConnection(addr, client, alpn);
} else {
- return new SSLConnection(addr, client, alpn);
+ return new AsyncSSLConnection(addr, client, alpn);
}
}
@@ -142,7 +167,8 @@
* none available.
*/
private static HttpConnection getConnectionImpl(InetSocketAddress addr,
- HttpRequestImpl request) {
+ HttpRequestImpl request, Http2Connection http2) {
+
HttpConnection c;
HttpClientImpl client = request.client();
InetSocketAddress proxy = request.proxy();
@@ -167,7 +193,7 @@
if (c != null) {
return c;
} else {
- return getSSLConnection(addr, proxy, request, alpn);
+ return getSSLConnection(addr, proxy, request, alpn, http2);
}
}
}
@@ -223,64 +249,16 @@
return address;
}
- void configureBlocking(boolean mode) throws IOException {
- channel().configureBlocking(mode);
+ synchronized void configureMode(Mode mode) throws IOException {
+ this.mode = mode;
+ if (mode == Mode.BLOCKING)
+ channel().configureBlocking(true);
+ else
+ channel().configureBlocking(false);
}
abstract ConnectionPool.CacheKey cacheKey();
- /*
- static PrintStream ps;
-
- static {
- try {
- String propval = Utils.getNetProperty("java.net.httpclient.showData");
- if (propval != null && propval.equalsIgnoreCase("true")) {
- ps = new PrintStream(new FileOutputStream("/tmp/httplog.txt"), false);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- synchronized final void debugPrint(String s, ByteBuffer b) {
- ByteBuffer[] bufs = new ByteBuffer[1];
- bufs[0] = b;
- debugPrint(s, bufs, 0, 1);
- }
-
- synchronized final void debugPrint(String s,
- ByteBuffer[] bufs,
- int start,
- int number) {
- if (ps == null) {
- return;
- }
-
- ps.printf("\n%s:\n", s);
-
- for (int i=start; i<start+number; i++) {
- ByteBuffer b = bufs[i].duplicate();
- while (b.hasRemaining()) {
- int c = b.get();
- if (c == 10) {
- ps.printf("LF \n");
- } else if (c == 13) {
- ps.printf(" CR ");
- } else if (c == 0x20) {
- ps.printf("_");
- } else if (c > 0x20 && c <= 0x7F) {
- ps.printf("%c", (char)c);
- } else {
- ps.printf("0x%02x ", c);
- }
- }
- }
- ps.printf("\n---------------------\n");
- }
-
- */
-
// overridden in SSL only
SSLParameters sslParameters() {
return null;
@@ -296,7 +274,8 @@
/**
* Closes this connection, by returning the socket to its connection pool.
*/
- abstract void close();
+ @Override
+ public abstract void close();
/**
* Returns a ByteBuffer with data, or null if EOF.
@@ -356,12 +335,17 @@
}
@Override
- public final ByteBuffer getBuffer() {
- return client.getBuffer();
+ public final ByteBuffer getBuffer(int n) {
+ return client.getBuffer(n);
}
@Override
public final void returnBuffer(ByteBuffer buffer) {
client.returnBuffer(buffer);
}
+
+ @Override
+ public final void setMinBufferSize(int n) {
+ client.setMinBufferSize(n);
+ }
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpHeadersImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpHeadersImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -24,44 +24,22 @@
package java.net.http;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
/**
* Implementation of HttpHeaders.
*/
-class HttpHeadersImpl implements HttpHeaders1 {
+class HttpHeadersImpl implements HttpHeaders {
- private final HashMap<String,List<String>> headers;
- private boolean isUnmodifiable = false;
+ private final TreeMap<String,List<String>> headers;
public HttpHeadersImpl() {
- headers = new HashMap<>();
- }
-
- /**
- * Replace all List<String> in headers with unmodifiable Lists. Call
- * this only after all headers are added. The headers HashMap
- * is wrapped with an unmodifiable HashMap in map()
- */
- @Override
- public void makeUnmodifiable() {
- if (isUnmodifiable)
- return;
-
- Set<String> keys = new HashSet<>(headers.keySet());
- for (String key : keys) {
- List<String> values = headers.remove(key);
- if (values != null) {
- headers.put(key, Collections.unmodifiableList(values));
- }
- }
- isUnmodifiable = true;
+ headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
}
@Override
@@ -88,7 +66,7 @@
public HttpHeadersImpl deepCopy() {
HttpHeadersImpl h1 = new HttpHeadersImpl();
- HashMap<String,List<String>> headers1 = h1.headers;
+ TreeMap<String,List<String>> headers1 = h1.headers;
Set<String> keys = headers.keySet();
for (String key : keys) {
List<String> vals = headers.get(key);
@@ -98,22 +76,13 @@
return h1;
}
- private List<String> getOrCreate(String name) {
- List<String> l = headers.get(name);
- if (l == null) {
- l = new LinkedList<>();
- headers.put(name, l);
- }
- return l;
- }
-
void addHeader(String name, String value) {
- List<String> l = getOrCreate(name);
- l.add(value);
+ headers.computeIfAbsent(name, k -> new LinkedList<>())
+ .add(value);
}
void setHeader(String name, String value) {
- List<String> l = getOrCreate(name);
+ List<String> l = headers.computeIfAbsent(name, k -> new LinkedList<>());
l.clear();
l.add(value);
}
@@ -122,7 +91,7 @@
public Optional<Long> firstValueAsLong(String name) {
List<String> l = headers.get(name);
if (l == null) {
- return Optional.ofNullable(null);
+ return Optional.empty();
} else {
String v = l.get(0);
Long lv = Long.parseLong(v);
@@ -133,4 +102,4 @@
void clear() {
headers.clear();
}
-}
+}
\ No newline at end of file
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpRequestBuilderImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpRequestBuilderImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -39,10 +39,11 @@
private HttpClient.Version version;
private final HttpClientImpl client;
private ProxySelector proxy;
- private long timeval = 0;
+ private long timeval;
public HttpRequestBuilderImpl(HttpClientImpl client, URI uri) {
this.client = client;
+ checkURI(uri);
this.uri = uri;
this.version = client.version();
this.userHeaders = new HttpHeadersImpl();
@@ -58,10 +59,17 @@
@Override
public HttpRequestBuilderImpl uri(URI uri) {
Objects.requireNonNull(uri);
+ checkURI(uri);
this.uri = uri;
return this;
}
+ private static void checkURI(URI uri) {
+ String scheme = uri.getScheme().toLowerCase();
+ if (!scheme.equals("https") && !scheme.equals("http"))
+ throw new IllegalArgumentException("invalid URI scheme");
+ }
+
@Override
public HttpRequestBuilderImpl followRedirects(HttpClient.Redirect follow) {
Objects.requireNonNull(follow);
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpRequestImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpRequestImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -30,16 +30,14 @@
import java.net.http.HttpClient.Version;
import java.net.http.HttpResponse.MultiProcessor;
import java.util.concurrent.CompletableFuture;
-import java.net.SocketPermission;
import java.security.AccessControlContext;
import java.security.AccessController;
-import java.util.Set;
import static java.net.http.HttpRedirectImpl.getRedirects;
import java.util.Locale;
class HttpRequestImpl extends HttpRequest {
- private final HttpHeadersImpl userHeaders;
+ private final ImmutableHeaders userHeaders;
private final HttpHeadersImpl systemHeaders;
private final URI uri;
private InetSocketAddress authority; // only used when URI not specified
@@ -56,6 +54,7 @@
private boolean receiving;
private AccessControlContext acc;
private final long timeval;
+ private Stream.PushGroup<?> pushGroup;
public HttpRequestImpl(HttpClientImpl client,
String method,
@@ -63,8 +62,8 @@
this.client = client;
this.method = method == null? "GET" : method;
this.userHeaders = builder.headers() == null ?
- new HttpHeadersImpl() : builder.headers();
- dropDisallowedHeaders();
+ new ImmutableHeaders() :
+ new ImmutableHeaders(builder.headers(), Utils.ALLOWED_HEADERS);
this.followRedirects = getRedirects(builder.followRedirects() == null ?
client.followRedirects() : builder.followRedirects());
this.systemHeaders = new HttpHeadersImpl();
@@ -90,15 +89,13 @@
HttpRequestImpl other) {
this.client = client;
this.method = method == null? "GET" : method;
- this.userHeaders = other.userHeaders == null ?
- new HttpHeadersImpl() : other.userHeaders;
- dropDisallowedHeaders();
+ this.userHeaders = other.userHeaders;
this.followRedirects = getRedirects(other.followRedirects() == null ?
client.followRedirects() : other.followRedirects());
this.systemHeaders = other.systemHeaders;
this.uri = uri;
this.expectContinue = other.expectContinue;
- this.secure = other.secure;
+ this.secure = uri.getScheme().toLowerCase(Locale.US).equals("https");
this.requestProcessor = other.requestProcessor;
this.proxy = other.proxy;
this.version = other.version;
@@ -115,7 +112,7 @@
this.method = method;
this.followRedirects = getRedirects(client.followRedirects());
this.systemHeaders = new HttpHeadersImpl();
- this.userHeaders = new HttpHeadersImpl();
+ this.userHeaders = new ImmutableHeaders();
this.uri = null;
this.proxy = null;
this.requestProcessor = HttpRequest.noBody();
@@ -132,16 +129,52 @@
return client;
}
+ /**
+ * Creates a HttpRequestImpl from the given set of Headers and the associated
+ * "parent" request. Fields not taken from the headers are taken from the
+ * parent.
+ */
+ static HttpRequestImpl createPushRequest(HttpRequestImpl parent,
+ HttpHeadersImpl headers) throws IOException {
+
+ return new HttpRequestImpl(parent, headers);
+ }
+
+ // only used for push requests
+ private HttpRequestImpl(HttpRequestImpl parent, HttpHeadersImpl headers) throws IOException {
+ this.method = headers.firstValue(":method")
+ .orElseThrow(() -> new IOException("No method in Push Promise"));
+ String path = headers.firstValue(":path")
+ .orElseThrow(() -> new IOException("No path in Push Promise"));
+ String scheme = headers.firstValue(":scheme")
+ .orElseThrow(() -> new IOException("No scheme in Push Promise"));
+ String authority = headers.firstValue(":authority")
+ .orElseThrow(() -> new IOException("No authority in Push Promise"));
+ StringBuilder sb = new StringBuilder();
+ sb.append(scheme).append("://").append(authority).append(path);
+ this.uri = URI.create(sb.toString());
+
+ this.client = parent.client;
+ this.userHeaders = new ImmutableHeaders(headers, Utils.ALLOWED_HEADERS);
+ this.followRedirects = parent.followRedirects;
+ this.systemHeaders = parent.systemHeaders;
+ this.expectContinue = parent.expectContinue;
+ this.secure = parent.secure;
+ this.requestProcessor = parent.requestProcessor;
+ this.proxy = parent.proxy;
+ this.version = parent.version;
+ this.acc = parent.acc;
+ this.exchange = parent.exchange;
+ this.timeval = parent.timeval;
+ }
@Override
public String toString() {
- return (uri == null ? "" : uri.toString()) + "/" + method + "("
- + hashCode() + ")";
+ return (uri == null ? "" : uri.toString()) + " " + method;
}
@Override
public HttpHeaders headers() {
- userHeaders.makeUnmodifiable();
return userHeaders;
}
@@ -154,21 +187,6 @@
systemHeaders.setHeader("HTTP2-Settings", h2client.getSettingsString());
}
- private static final Set<String> DISALLOWED_HEADERS_SET = Set.of(
- "authorization", "connection", "cookie", "content-length",
- "date", "expect", "from", "host", "origin", "proxy-authorization",
- "referer", "user-agent", "upgrade", "via", "warning");
-
-
- // we silently drop headers that are disallowed
- private void dropDisallowedHeaders() {
- Set<String> hdrnames = userHeaders.directMap().keySet();
-
- hdrnames.removeIf((s) ->
- DISALLOWED_HEADERS_SET.contains(s.toLowerCase())
- );
- }
-
private synchronized void receiving() {
if (receiving) {
throw new IllegalStateException("already receiving response");
@@ -176,6 +194,10 @@
receiving = true;
}
+ synchronized Stream.PushGroup<?> pushGroup() {
+ return pushGroup;
+ }
+
/*
* Response filters may result in a new HttpRequestImpl being created
* (but still associated with the same API HttpRequest) and the process
@@ -200,10 +222,25 @@
.thenApply((r) -> (HttpResponse)r);
}
- public <U> CompletableFuture<U>
- sendAsyncMulti(HttpResponse.MultiProcessor<U> rspproc) {
- // To change body of generated methods, choose Tools | Templates.
- throw new UnsupportedOperationException("Not supported yet.");
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public synchronized <U> CompletableFuture<U>
+ multiResponseAsync(MultiProcessor<U> rspproc) {
+ if (System.getSecurityManager() != null) {
+ acc = AccessController.getContext();
+ }
+ this.pushGroup = new Stream.PushGroup<>(rspproc, this);
+ CompletableFuture<HttpResponse> cf = pushGroup.mainResponse();
+ responseAsync()
+ .whenComplete((HttpResponse r, Throwable t) -> {
+ if (r != null)
+ cf.complete(r);
+ else
+ cf.completeExceptionally(t);
+ pushGroup.pushError(t);
+ });
+ return (CompletableFuture<U>)pushGroup.groupResult();
}
@Override
@@ -255,7 +292,7 @@
@Override
public URI uri() { return uri; }
- HttpHeadersImpl getUserHeaders() { return userHeaders; }
+ HttpHeaders getUserHeaders() { return userHeaders; }
HttpHeadersImpl getSystemHeaders() { return systemHeaders; }
@@ -275,11 +312,4 @@
}
long timeval() { return timeval; }
-
- @Override
- public <U> CompletableFuture<U>
- multiResponseAsync(MultiProcessor<U> rspproc) {
- //To change body of generated methods, choose Tools | Templates.
- throw new UnsupportedOperationException("Not supported yet.");
- }
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpResponse.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpResponse.java Sat Apr 30 00:30:31 2016 +0100
@@ -200,8 +200,12 @@
if (n == -1) {
throw new IOException("Bad Content-Disposition type");
}
- String disposition = dispoHeader.substring(n + 9,
- dispoHeader.lastIndexOf(';'));
+ int lastsemi = dispoHeader.lastIndexOf(';');
+ String disposition;
+ if (lastsemi < n)
+ disposition = dispoHeader.substring(n + 9);
+ else
+ disposition = dispoHeader.substring(n + 9, lastsemi);
file = Paths.get(directory.toString(), disposition);
fc = FileChannel.open(file, openOptions);
return null;
@@ -727,11 +731,14 @@
}
private CompletableFuture<Path> getBody(HttpRequest req,
- CompletableFuture<HttpResponse> cf) {
+ CompletableFuture<? extends HttpResponse> cf) {
URI u = req.uri();
String path = u.getPath();
+ if (path.startsWith("/"))
+ path = path.substring(1);
+ final String fpath = path;
return cf.thenCompose((HttpResponse resp) -> {
- return resp.bodyAsync(HttpResponse.asFile(destination.resolve(path)));
+ return resp.bodyAsync(HttpResponse.asFile(destination.resolve(fpath)));
});
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpResponseImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpResponseImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -26,6 +26,7 @@
package java.net.http;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.AccessControlContext;
@@ -42,17 +43,18 @@
int responseCode;
Exchange exchange;
HttpRequestImpl request;
- HttpHeaders1 headers;
- HttpHeaders1 trailers;
+ HttpHeaders headers;
+ HttpHeaders trailers;
SSLParameters sslParameters;
URI uri;
HttpClient.Version version;
AccessControlContext acc;
RawChannel rawchan;
HttpConnection connection;
+ final Stream stream;
- public HttpResponseImpl(int responseCode, Exchange exch, HttpHeaders1 headers,
- HttpHeaders1 trailers, SSLParameters sslParameters,
+ public HttpResponseImpl(int responseCode, Exchange exch, HttpHeaders headers,
+ HttpHeaders trailers, SSLParameters sslParameters,
HttpClient.Version version, HttpConnection connection) {
this.responseCode = responseCode;
this.exchange = exch;
@@ -63,6 +65,23 @@
this.uri = request.uri();
this.version = version;
this.connection = connection;
+ this.stream = null;
+ }
+
+ // A response to a PUSH_PROMISE
+ public HttpResponseImpl(int responseCode, HttpRequestImpl pushRequest,
+ ImmutableHeaders headers,
+ Stream stream, SSLParameters sslParameters) {
+ this.responseCode = responseCode;
+ this.exchange = null;
+ this.request = pushRequest;
+ this.headers = headers;
+ this.trailers = null;
+ this.sslParameters = sslParameters;
+ this.uri = request.uri(); // TODO: take from headers
+ this.version = HttpClient.Version.HTTP_2;
+ this.connection = null;
+ this.stream = stream;
}
@Override
@@ -77,26 +96,35 @@
@Override
public HttpHeaders headers() {
- headers.makeUnmodifiable();
return headers;
}
@Override
public HttpHeaders trailers() {
- trailers.makeUnmodifiable();
return trailers;
}
@Override
public <T> T body(java.net.http.HttpResponse.BodyProcessor<T> processor) {
- return exchange.responseBody(processor);
+ try {
+ if (exchange != null) {
+ return exchange.responseBody(processor);
+ } else {
+ return stream.responseBody(processor);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
@Override
public <T> CompletableFuture<T> bodyAsync(java.net.http.HttpResponse.BodyProcessor<T> processor) {
acc = AccessController.getContext();
- return exchange.responseBodyAsync(processor);
+ if (exchange != null)
+ return exchange.responseBodyAsync(processor);
+ else
+ return stream.responseBodyAsync(processor);
}
@Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ImmutableHeaders.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+
+/**
+ * Immutable HttpHeaders constructed from mutable HttpHeadersImpl.
+ */
+
+class ImmutableHeaders implements HttpHeaders {
+
+ private final Map<String,List<String>> map;
+
+ @SuppressWarnings("unchecked")
+ ImmutableHeaders() {
+ map = (Map<String,List<String>>)Collections.EMPTY_MAP;
+ }
+ // TODO: fix lower case issue. Must be lc for http/2 compares ignoreCase for http/1
+ ImmutableHeaders(HttpHeadersImpl h, Predicate<String> keyAllowed) {
+ Map<String,List<String>> src = h.directMap();
+ Map<String,List<String>> m = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+
+ src.forEach((key, value) -> {
+ if (keyAllowed.test(key))
+ m.put(key, Collections.unmodifiableList(value));
+ });
+ map = Collections.unmodifiableMap(m);
+ }
+
+ @Override
+ public Optional<String> firstValue(String name) {
+ List<String> l = map.get(name);
+ String v = l == null ? null : l.get(0);
+ return Optional.ofNullable(v);
+ }
+
+ @Override
+ public Optional<Long> firstValueAsLong(String name) {
+ return firstValue(name).map((v -> Long.parseLong(v)));
+ }
+
+ @Override
+ public List<String> allValues(String name) {
+ return map.get(name);
+ }
+
+ @Override
+ public Map<String, List<String>> map() {
+ return map;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Log.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Log.java Sat Apr 30 00:30:31 2016 +0100
@@ -26,7 +26,9 @@
import java.util.Locale;
/**
- * -Djava.net.HttpClient.log=errors,requests,headers,frames[:type:type2:..],content
+ * -Djava.net.HttpClient.log=
+ * errors,requests,headers,
+ * frames[:type:type2:..],content,ssl,trace
*
* Any of errors, requests, headers or content are optional.
*
@@ -47,6 +49,7 @@
public static final int CONTENT = 0x8;
public static final int FRAMES = 0x10;
public static final int SSL = 0x20;
+ public static final int TRACE = 0x40;
static int logging;
// Frame types: "control", "data", "window", "all"
@@ -81,8 +84,11 @@
case "ssl":
logging |= SSL;
break;
+ case "trace":
+ logging |= TRACE;
+ break;
case "all":
- logging |= CONTENT|HEADERS|REQUESTS|FRAMES|ERRORS;
+ logging |= CONTENT|HEADERS|REQUESTS|FRAMES|ERRORS|TRACE;
break;
}
if (val.startsWith("frames")) {
@@ -130,6 +136,10 @@
return (logging & HEADERS) != 0;
}
+ static boolean trace() {
+ return (logging & TRACE) != 0;
+ }
+
static boolean ssl() {
return (logging & SSL) != 0;
}
@@ -138,9 +148,9 @@
return (logging & FRAMES) != 0;
}
- static void logError(String s) {
+ static void logError(String s, Object... s1) {
if (errors())
- logger.log(Level.INFO, "ERROR: " + s);
+ logger.log(Level.INFO, "ERROR: " + s, s1);
}
static void logError(Throwable t) {
@@ -150,24 +160,50 @@
}
}
- static void logSSL(String s) {
+ static void logSSL(String s, Object... s1) {
if (ssl())
- logger.log(Level.INFO, "SSL: " + s);
+ logger.log(Level.INFO, "SSL: " + s, s1);
+ }
+
+ static void logTrace(String s, Object... s1) {
+ if (trace()) {
+ String format = "TRACE: " + s;
+ logger.log(Level.INFO, format, s1);
+ }
+ }
+
+ static void logRequest(String s, Object... s1) {
+ if (requests())
+ logger.log(Level.INFO, "REQUEST: " + s, s1);
+ }
+
+ static void logResponse(String s, Object... s1) {
+ if (requests())
+ logger.log(Level.INFO, "RESPONSE: " + s, s1);
}
- static void logRequest(String s) {
- if (requests())
- logger.log(Level.INFO, "REQUEST: " + s);
+ static void logHeaders(String s, Object... s1) {
+ if (headers())
+ logger.log(Level.INFO, "HEADERS: " + s, s1);
+ }
+// START HTTP2
+ static boolean loggingFrame(Class<? extends Http2Frame> clazz) {
+ if (frametypes == ALL) {
+ return true;
+ }
+ if (clazz == DataFrame.class) {
+ return (frametypes & DATA) != 0;
+ } else if (clazz == WindowUpdateFrame.class) {
+ return (frametypes & WINDOW_UPDATES) != 0;
+ } else {
+ return (frametypes & CONTROL) != 0;
+ }
}
- static void logResponse(String s) {
- if (requests())
- logger.log(Level.INFO, "RESPONSE: " + s);
- }
-
- static void logHeaders(String s) {
- if (headers())
- logger.log(Level.INFO, "HEADERS: " + s);
+ static void logFrames(Http2Frame f, String direction) {
+ if (frames() && loggingFrame(f.getClass())) {
+ logger.log(Level.INFO, "FRAME: " + direction + ": " + f.toString());
+ }
}
// not instantiable
--- a/jdk/src/java.httpclient/share/classes/java/net/http/MultiExchange.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/MultiExchange.java Sat Apr 30 00:30:31 2016 +0100
@@ -54,7 +54,7 @@
final static int DEFAULT_MAX_ATTEMPTS = 5;
final static int max_attempts = Utils.getIntegerNetProperty(
- "sun.net.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
+ "java.net.httpclient.redirects.retrylimit", DEFAULT_MAX_ATTEMPTS
);
private final List<HeaderFilter> filters;
@@ -187,8 +187,7 @@
public CompletableFuture<HttpResponseImpl> responseAsync(Void v) {
CompletableFuture<HttpResponseImpl> cf;
if (++attempts > max_attempts) {
- cf = new CompletableFuture<>();
- cf.completeExceptionally(new IOException("Too many retries"));
+ cf = CompletableFuture.failedFuture(new IOException("Too many retries"));
} else {
if (currentreq.timeval() != 0) {
// set timer
@@ -241,7 +240,6 @@
* completed exceptionally.
*/
private CompletableFuture<HttpResponseImpl> getExceptionalCF(Throwable t) {
- CompletableFuture<HttpResponseImpl> error = new CompletableFuture<>();
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
if (t.getCause() != null) {
t = t.getCause();
@@ -250,8 +248,7 @@
if (cancelled && t instanceof IOException) {
t = new HttpTimeoutException("request timed out");
}
- error.completeExceptionally(t);
- return error;
+ return CompletableFuture.failedFuture(t);
}
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/OutgoingHeaders.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+
+/**
+ * Contains all parameters for outgoing headers. Is converted to
+ * HeadersFrame and ContinuationFrames by Http2Connection.
+ */
+class OutgoingHeaders extends Http2Frame {
+
+ int streamDependency;
+ int weight;
+ boolean exclusive;
+ Stream stream;
+
+ public static final int PRIORITY = 0x20;
+
+ HttpHeaders user, system;
+
+ OutgoingHeaders(HttpHeaders hdrs1, HttpHeaders hdrs2, Stream stream) {
+ this.user = hdrs2;
+ this.system = hdrs1;
+ this.stream = stream;
+ }
+
+ public void setPriority(int streamDependency, boolean exclusive, int weight) {
+ this.streamDependency = streamDependency;
+ this.exclusive = exclusive;
+ this.weight = weight;
+ this.flags |= PRIORITY;
+ }
+
+ public int getStreamDependency() {
+ return streamDependency;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ public boolean getExclusive() {
+ return exclusive;
+ }
+
+ public Stream getStream() {
+ return stream;
+ }
+
+ public HttpHeaders getUserHeaders() {
+ return user;
+ }
+
+ public HttpHeaders getSystemHeaders() {
+ return system;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ void computeLength() {
+ //To change body of generated methods, choose Tools | Templates.
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Pair.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Pair.java Sat Apr 30 00:30:31 2016 +0100
@@ -43,4 +43,9 @@
static <T, U> Pair<T, U> pair(T first, U second) {
return new Pair<>(first, second);
}
+
+ @Override
+ public String toString() {
+ return "(" + first + ", " + second + ")";
+ }
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/PingFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class PingFrame extends Http2Frame {
+
+ PingFrame() {
+ type = TYPE;
+ }
+
+ byte[] data;
+
+ public final static int TYPE = 0x6;
+
+ // Flags
+ public static final int ACK = 0x1;
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case ACK:
+ return "ACK";
+ }
+ return super.flagAsString(flag);
+ }
+
+ public void setData(byte[] data) {
+ if (data.length != 8) {
+ throw new IllegalArgumentException("Ping data not 8 bytes");
+ }
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if (length != 8) {
+ throw new IOException("Invalid Ping frame");
+ }
+ data = bc.getBytes(8);
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ if (data == null) {
+ data = new byte[] {0, 0, 0, 0, 0 ,0, 0, 0};
+ }
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(8);
+ buf.put(data);
+ }
+
+ @Override
+ void computeLength() {
+ length = 8;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/PlainHttpConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -31,20 +31,43 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
/**
- * Plain raw TCP connection direct to destination
+ * Plain raw TCP connection direct to destination. 2 modes
+ * 1) Blocking used by http/1. In this case the connect is actually non
+ * blocking but the request is sent blocking. The first byte of a response
+ * is received non-blocking and the remainder of the response is received
+ * blocking
+ * 2) Non-blocking. In this case (for http/2) the connection is actually opened
+ * blocking but all reads and writes are done non-blocking under the
+ * control of a Http2Connection object.
*/
-class PlainHttpConnection extends HttpConnection {
+class PlainHttpConnection extends HttpConnection implements AsyncConnection {
protected SocketChannel chan;
private volatile boolean connected;
private boolean closed;
+ Consumer<ByteBuffer> asyncReceiver;
+ Consumer<Throwable> errorReceiver;
+ Queue<ByteBuffer> asyncOutputQ;
+ final Object reading = new Object();
+ final Object writing = new Object();
- class ConnectEvent extends AsyncEvent implements AsyncEvent.Blocking {
+ @Override
+ public void startReading() {
+ try {
+ client.registerEvent(new ReadEvent());
+ } catch (IOException e) {
+ shutdown();
+ }
+ }
+
+ class ConnectEvent extends AsyncEvent {
CompletableFuture<Void> cf;
ConnectEvent(CompletableFuture<Void> cf) {
+ super(AsyncEvent.BLOCKING);
this.cf = cf;
}
@@ -112,14 +135,62 @@
@Override
long write(ByteBuffer[] buffers, int start, int number) throws IOException {
- //debugPrint("Send", buffers, start, number);
- return chan.write(buffers, start, number);
+ if (mode != Mode.ASYNC)
+ return chan.write(buffers, start, number);
+ // async
+ synchronized(writing) {
+ int qlen = asyncOutputQ.size();
+ ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
+ long n = Utils.remaining(bufs);
+ asyncOutputQ.putAll(bufs);
+ if (qlen == 0)
+ asyncOutput();
+ return n;
+ }
+ }
+
+ ByteBuffer asyncBuffer = null;
+
+ void asyncOutput() {
+ synchronized (writing) {
+ try {
+ while (true) {
+ if (asyncBuffer == null) {
+ asyncBuffer = asyncOutputQ.poll();
+ if (asyncBuffer == null) {
+ return;
+ }
+ }
+ if (!asyncBuffer.hasRemaining()) {
+ asyncBuffer = null;
+ continue;
+ }
+ int n = chan.write(asyncBuffer);
+ //System.err.printf("Written %d bytes to chan\n", n);
+ if (n == 0) {
+ client.registerEvent(new WriteEvent());
+ return;
+ }
+ }
+ } catch (IOException e) {
+ shutdown();
+ }
+ }
}
@Override
long write(ByteBuffer buffer) throws IOException {
- //debugPrint("Send", buffer);
- return chan.write(buffer);
+ if (mode != Mode.ASYNC)
+ return chan.write(buffer);
+ // async
+ synchronized(writing) {
+ int qlen = asyncOutputQ.size();
+ long n = buffer.remaining();
+ asyncOutputQ.put(buffer);
+ if (qlen == 0)
+ asyncOutput();
+ return n;
+ }
}
@Override
@@ -131,7 +202,7 @@
* Close this connection
*/
@Override
- synchronized void close() {
+ public synchronized void close() {
if (closed)
return;
closed = true;
@@ -155,14 +226,49 @@
return buf;
}
+ void shutdown() {
+ close();
+ errorReceiver.accept(new IOException("Connection aborted"));
+ }
+
+ void asyncRead() {
+ synchronized (reading) {
+ try {
+ while (true) {
+ ByteBuffer buf = getBuffer();
+ int n = chan.read(buf);
+ //System.err.printf("Read %d bytes from chan\n", n);
+ if (n == -1) {
+ throw new IOException();
+ }
+ if (n == 0) {
+ returnBuffer(buf);
+ return;
+ }
+ buf.flip();
+ asyncReceiver.accept(buf);
+ }
+ } catch (IOException e) {
+ shutdown();
+ }
+ }
+ }
+
@Override
protected int readImpl(ByteBuffer buf) throws IOException {
int mark = buf.position();
- int n = chan.read(buf);
+ int n;
+ // FIXME: this hack works in conjunction with the corresponding change
+ // in java.net.http.RawChannel.registerEvent
+ if ((n = buffer.remaining()) != 0) {
+ buf.put(buffer);
+ } else {
+ n = chan.read(buf);
+ }
if (n == -1) {
return -1;
}
- Utils.flipToMark(buffer, mark);
+ Utils.flipToMark(buf, mark);
String s = "Receive (" + n + " bytes) ";
//debugPrint(s, buf);
return n;
@@ -178,10 +284,67 @@
return connected;
}
- class ReceiveResponseEvent extends AsyncEvent implements AsyncEvent.Blocking {
+ // used for all output in HTTP/2
+ class WriteEvent extends AsyncEvent {
+ WriteEvent() {
+ super(0);
+ }
+
+ @Override
+ public SelectableChannel channel() {
+ return chan;
+ }
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_WRITE;
+ }
+
+ @Override
+ public void handle() {
+ asyncOutput();
+ }
+
+ @Override
+ public void abort() {
+ shutdown();
+ }
+ }
+
+ // used for all input in HTTP/2
+ class ReadEvent extends AsyncEvent {
+ ReadEvent() {
+ super(AsyncEvent.REPEATING); // && !BLOCKING
+ }
+
+ @Override
+ public SelectableChannel channel() {
+ return chan;
+ }
+
+ @Override
+ public int interestOps() {
+ return SelectionKey.OP_READ;
+ }
+
+ @Override
+ public void handle() {
+ asyncRead();
+ }
+
+ @Override
+ public void abort() {
+ shutdown();
+ }
+
+ }
+
+ // used in blocking channels only
+ class ReceiveResponseEvent extends AsyncEvent {
CompletableFuture<Void> cf;
ReceiveResponseEvent(CompletableFuture<Void> cf) {
+ super(AsyncEvent.BLOCKING);
this.cf = cf;
}
@Override
@@ -216,6 +379,15 @@
}
@Override
+ public synchronized void setAsyncCallbacks(Consumer<ByteBuffer> asyncReceiver,
+ Consumer<Throwable> errorReceiver) {
+ this.asyncReceiver = asyncReceiver;
+ this.errorReceiver = errorReceiver;
+ asyncOutputQ = new Queue<>();
+ asyncOutputQ.registerPutCallback(this::asyncOutput);
+ }
+
+ @Override
CompletableFuture<Void> whenReceivingResponse() {
CompletableFuture<Void> cf = new CompletableFuture<>();
try {
--- a/jdk/src/java.httpclient/share/classes/java/net/http/PlainTunnelingConnection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/PlainTunnelingConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -111,7 +111,7 @@
}
@Override
- void close() {
+ public void close() {
delegate.close();
connected = false;
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/PriorityFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class PriorityFrame extends Http2Frame {
+
+ int streamDependency;
+ int weight;
+ boolean exclusive;
+
+ public final static int TYPE = 0x2;
+
+ PriorityFrame() {
+ type = TYPE;
+ }
+
+ public PriorityFrame(int streamDependency, boolean exclusive, int weight) {
+ this.streamDependency = streamDependency;
+ this.exclusive = exclusive;
+ this.weight = weight;
+ this.type = TYPE;
+ }
+
+ int streamDependency() {
+ return streamDependency;
+ }
+
+ int weight() {
+ return weight;
+ }
+
+ boolean exclusive() {
+ return exclusive;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ int x = bc.getInt();
+ exclusive = (x & 0x80000000) != 0;
+ streamDependency = x & 0x7fffffff;
+ weight = bc.getByte();
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(5);
+ int x = exclusive ? (1 << 31) + streamDependency : streamDependency;
+ buf.putInt(x);
+ buf.put((byte)weight);
+ }
+
+ @Override
+ void computeLength() {
+ length = 5;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/PushPromiseFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class PushPromiseFrame extends HeaderFrame {
+
+ int padLength;
+ int promisedStream;
+
+ PushPromiseFrame() {
+ type = TYPE;
+ }
+
+ public static final int TYPE = 0x5;
+
+ // Flags
+ public static final int END_HEADERS = 0x4;
+ public static final int PADDED = 0x8;
+
+ @Override
+ public String toString() {
+ return super.toString() + " promisedStreamid: " + promisedStream
+ + " headerLength: " + headerLength;
+ }
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case PADDED:
+ return "PADDED";
+ case END_HEADERS:
+ return "END_HEADERS";
+ }
+ return super.flagAsString(flag);
+ }
+
+ public void setPadLength(int padLength) {
+ this.padLength = padLength;
+ flags |= PADDED;
+ }
+
+ public void setPromisedStream(int stream) {
+ this.promisedStream = stream;
+ }
+
+ public int getPromisedStream() {
+ return promisedStream;
+ }
+
+ /**
+ */
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if ((flags & PADDED) != 0) {
+ padLength = bc.getByte();
+ headerLength = length - (padLength + 5);
+ } else
+ headerLength = length - 4;
+
+ promisedStream = bc.getInt() & 0x7fffffff;
+ headerBlocks = bc.getBuffers(headerLength);
+ }
+
+ @Override
+ void computeLength() {
+ int len = 0;
+ if ((flags & PADDED) != 0) {
+ len += (1 + padLength);
+ }
+ len += (4 + headerLength);
+ this.length = len;
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(length);
+ if ((flags & PADDED) != 0) {
+ buf.put((byte)padLength);
+ }
+ buf.putInt(promisedStream);
+ for (int i=0; i<headerBlocks.length; i++) {
+ bg.addByteBuffer(headerBlocks[i]);
+ }
+ if ((flags & PADDED) != 0) {
+ bg.addPadding(padLength);
+ }
+ }
+
+ @Override
+ public boolean endHeaders() {
+ return getFlag(END_HEADERS);
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Queue.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+
+// Each stream has one of these for input. Each Http2Connection has one
+// for output. Can be used blocking or asynchronously.
+
+class Queue<T> implements Closeable {
+
+ private final LinkedList<T> q = new LinkedList<>();
+ private volatile boolean closed = false;
+ private Runnable callback;
+ private boolean forceCallback;
+ private int waiters; // true if someone waiting
+
+ synchronized void putAll(T[] objs) throws IOException {
+ if (closed) {
+ throw new IOException("stream closed");
+ }
+ boolean wasEmpty = q.isEmpty();
+
+ for (T obj : objs) {
+ q.add(obj);
+ }
+
+ if (waiters > 0)
+ notifyAll();
+
+ if (wasEmpty || forceCallback) {
+ forceCallback = false;
+ if (callback != null) {
+ callback.run();
+ }
+ }
+ }
+
+ synchronized int size() {
+ return q.size();
+ }
+
+ synchronized void put(T obj) throws IOException {
+ if (closed) {
+ throw new IOException("stream closed");
+ }
+
+ q.add(obj);
+ if (waiters > 0)
+ notifyAll();
+
+ if (q.size() == 1 || forceCallback) {
+ forceCallback = false;
+ if (callback != null) {
+ callback.run();
+ }
+ }
+ }
+
+ /**
+ * callback is invoked any time put is called where
+ * the Queue was empty.
+ */
+ synchronized void registerPutCallback(Runnable callback) {
+ this.callback = callback;
+ if (callback != null && q.size() > 0)
+ callback.run();
+ }
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+ notifyAll();
+ }
+
+ synchronized T take() throws IOException {
+ if (closed) {
+ throw new IOException("stream closed");
+ }
+ try {
+ while (q.size() == 0) {
+ waiters++;
+ wait();
+ waiters--;
+ }
+ return q.removeFirst();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ public synchronized T poll() throws IOException {
+ if (closed)
+ throw new IOException("stream closed");
+
+ if (q.isEmpty())
+ return null;
+ T res = q.removeFirst();
+ return res;
+ }
+
+ public synchronized T[] pollAll(T[] type) throws IOException {
+ T[] ret = q.toArray(type);
+ q.clear();
+ return ret;
+ }
+
+ public synchronized void pushback(T v) {
+ forceCallback = true;
+ q.addFirst(v);
+ }
+
+ public synchronized void pushbackAll(T[] v) {
+ forceCallback = true;
+ for (int i=v.length-1; i>=0; i--) {
+ q.addFirst(v[i]);
+ }
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/RawChannel.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/RawChannel.java Sat Apr 30 00:30:31 2016 +0100
@@ -28,18 +28,18 @@
import java.nio.channels.ByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
-/**
- * Used to implement WebSocket. Each RawChannel corresponds to
- * a TCP connection (SocketChannel) but is connected to a Selector
- * and an ExecutorService for invoking the send and receive callbacks
- * Also includes SSL processing.
- */
-class RawChannel implements ByteChannel, GatheringByteChannel {
+//
+// Used to implement WebSocket. Each RawChannel corresponds to a TCP connection
+// (SocketChannel) but is connected to a Selector and an ExecutorService for
+// invoking the send and receive callbacks. Also includes SSL processing.
+//
+final class RawChannel implements ByteChannel, GatheringByteChannel {
private final HttpClientImpl client;
private final HttpConnection connection;
- private boolean closed;
+ private volatile boolean closed;
private interface RawEvent {
@@ -50,8 +50,6 @@
void handle();
}
- interface BlockingEvent extends RawEvent { }
-
interface NonBlockingEvent extends RawEvent { }
RawChannel(HttpClientImpl client, HttpConnection connection) {
@@ -64,39 +62,40 @@
private final RawEvent re;
RawAsyncEvent(RawEvent re) {
+ super(AsyncEvent.BLOCKING); // BLOCKING & !REPEATING
this.re = re;
}
+ RawAsyncEvent(RawEvent re, int flags) {
+ super(flags);
+ this.re = re;
+ }
+
+ @Override
public SelectableChannel channel() {
return connection.channel();
}
// must return the selector interest op flags OR'd
+ @Override
public int interestOps() {
return re.interestOps();
}
// called when event occurs
+ @Override
public void handle() {
re.handle();
}
- public void abort() {}
+ @Override
+ public void abort() { }
}
- private class BlockingRawAsyncEvent extends RawAsyncEvent
- implements AsyncEvent.Blocking {
-
- BlockingRawAsyncEvent(RawEvent re) {
- super(re);
- }
- }
-
- private class NonBlockingRawAsyncEvent extends RawAsyncEvent
- implements AsyncEvent.NonBlocking {
+ private class NonBlockingRawAsyncEvent extends RawAsyncEvent {
NonBlockingRawAsyncEvent(RawEvent re) {
- super(re);
+ super(re, 0); // !BLOCKING & !REPEATING
}
}
@@ -105,17 +104,24 @@
* (i.e. register new event for each callback)
*/
public void registerEvent(RawEvent event) throws IOException {
- if (event instanceof BlockingEvent) {
- client.registerEvent(new BlockingRawAsyncEvent(event));
- } else if (event instanceof NonBlockingEvent) {
+ if (!(event instanceof NonBlockingEvent)) {
+ throw new InternalError();
+ }
+ if ((event.interestOps() & SelectionKey.OP_READ) != 0
+ && connection.buffer.hasRemaining()) {
+ // FIXME: a hack to deal with leftovers from previous reads into an
+ // internal buffer (works in conjunction with change in
+ // java.net.http.PlainHttpConnection.readImpl(java.nio.ByteBuffer)
+ connection.channel().configureBlocking(false);
+ event.handle();
+ } else {
client.registerEvent(new NonBlockingRawAsyncEvent(event));
- } else {
- throw new InternalError();
}
}
@Override
public int read(ByteBuffer dst) throws IOException {
+ assert !connection.channel().isBlocking();
return connection.read(dst);
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/RedirectFilter.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/RedirectFilter.java Sat Apr 30 00:30:31 2016 +0100
@@ -33,17 +33,19 @@
HttpRequestImpl requestImpl;
HttpRequest request;
HttpClientImpl client;
+ HttpClient.Redirect policy;
String method;
final static int DEFAULT_MAX_REDIRECTS = 5;
URI uri;
final static int max_redirects = Utils.getIntegerNetProperty(
- "sun.net.httpclient.redirects.retrylimit", DEFAULT_MAX_REDIRECTS
+ "java.net.httpclient.redirects.retrylimit", DEFAULT_MAX_REDIRECTS
);
@Override
public void request(HttpRequestImpl r) throws IOException {
this.request = r;
+ this.policy = request.followRedirects();
this.client = r.getClient();
this.method = r.method();
this.requestImpl = r;
@@ -61,7 +63,7 @@
*/
private HttpRequestImpl handleResponse(HttpResponseImpl r) {
int rcode = r.statusCode();
- if (rcode == 200) {
+ if (rcode == 200 || policy == HttpClient.Redirect.NEVER) {
return null;
}
if (rcode >= 300 && rcode <= 399) {
@@ -79,6 +81,7 @@
private URI getRedirectedURI(HttpHeaders headers) {
URI redirectedURI;
+ String ss = headers.firstValue("Location").orElse("Not present");
redirectedURI = headers.firstValue("Location")
.map((s) -> URI.create(s))
.orElseThrow(() -> new UncheckedIOException(
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ResetFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class ResetFrame extends ErrorFrame {
+
+ public final static int TYPE = 0x3;
+
+ // See ErrorFrame for error values
+
+ ResetFrame() {
+ type = TYPE;
+ }
+
+ public ResetFrame(int errorCode) {
+ this.errorCode = errorCode;
+ this.type = TYPE;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ errorCode = bc.getInt();
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(4);
+ buf.putInt(errorCode);
+ }
+
+ @Override
+ void computeLength() {
+ length = 4;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/ResponseHeaders.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/ResponseHeaders.java Sat Apr 30 00:30:31 2016 +0100
@@ -48,7 +48,7 @@
*
* This class is not thread-safe
*/
-class ResponseHeaders implements HttpHeaders1 {
+class ResponseHeaders implements HttpHeaders {
static final int DATA_SIZE = 16 * 1024; // initial space for headers
static final int NUM_HEADERS = 50; // initial expected max number of headers
@@ -368,10 +368,6 @@
return Collections.unmodifiableList(l);
}
- @Override
- public void makeUnmodifiable() {
- }
-
// Delegates map to HashMap but converts keys to lower case
static class HeaderMap implements Map<String,List<String>> {
--- a/jdk/src/java.httpclient/share/classes/java/net/http/SSLConnection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/SSLConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -121,13 +121,8 @@
}
@Override
- void close() {
- try {
- //System.err.println ("Closing: " + this);
- delegate.channel().close(); // TODO: proper close
- } catch (IOException ex) {
- Log.logError(ex.toString());
- }
+ public void close() {
+ Utils.close(delegate.channel());
}
@Override
--- a/jdk/src/java.httpclient/share/classes/java/net/http/SSLDelegate.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/SSLDelegate.java Sat Apr 30 00:30:31 2016 +0100
@@ -29,13 +29,9 @@
import java.util.Arrays;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLSession;
+import javax.net.ssl.*;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
/**
@@ -60,16 +56,18 @@
engine.setUseClientMode(true);
SSLParameters sslp = client.sslParameters().orElse(null);
if (sslp == null) {
- sslp = context.getDefaultSSLParameters();
+ sslp = context.getSupportedSSLParameters();
}
sslParameters = Utils.copySSLParameters(sslp);
if (alpn != null) {
sslParameters.setApplicationProtocols(alpn);
Log.logSSL("Setting application protocols: " + Arrays.toString(alpn));
} else {
- Log.logSSL("Warning no application protocols proposed!");
+ Log.logSSL("No application protocols proposed");
}
engine.setSSLParameters(sslParameters);
+ engine.setEnabledCipherSuites(sslp.getCipherSuites());
+ engine.setEnabledProtocols(sslp.getProtocols());
wrapper = new EngineWrapper(chan, engine);
this.chan = chan;
this.client = client;
@@ -268,7 +266,7 @@
do {
if (needData) {
do {
- x = chan.read (unwrap_src);
+ x = chan.read (unwrap_src);
} while (x == 0);
if (x == -1) {
throw new IOException ("connection closed for reading");
@@ -440,6 +438,27 @@
}
}
+ static void printParams(SSLParameters p) {
+ System.out.println("SSLParameters:");
+ if (p == null) {
+ System.out.println("Null params");
+ return;
+ }
+ for (String cipher : p.getCipherSuites()) {
+ System.out.printf("cipher: %s\n", cipher);
+ }
+ for (String approto : p.getApplicationProtocols()) {
+ System.out.printf("application protocol: %s\n", approto);
+ }
+ for (String protocol : p.getProtocols()) {
+ System.out.printf("protocol: %s\n", protocol);
+ }
+ if (p.getServerNames() != null)
+ for (SNIServerName sname : p.getServerNames()) {
+ System.out.printf("server name: %s\n", sname.toString());
+ }
+ }
+
String getSessionInfo() {
StringBuilder sb = new StringBuilder();
String application = engine.getApplicationProtocol();
--- a/jdk/src/java.httpclient/share/classes/java/net/http/SSLTunnelConnection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/SSLTunnelConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -130,12 +130,8 @@
}
@Override
- void close() {
- try {
- //System.err.println ("Closing: " + this);
- delegate.channel().close(); // TODO: proper close
- } catch (IOException ex) {
- }
+ public void close() {
+ Utils.close(delegate.channel());
}
@Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class SettingsFrame extends Http2Frame {
+
+ int[] parameters;
+
+ public static final int TYPE = 0x4;
+
+ // Flags
+ public static final int ACK = 0x1;
+
+ @Override
+ String flagAsString(int flag) {
+ switch (flag) {
+ case ACK:
+ return "ACK";
+ }
+ return super.flagAsString(flag);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.toString())
+ .append(" Settings: ");
+
+ for (int i = 0; i < MAX_PARAM; i++) {
+ if (parameters[i] != -1) {
+ sb.append(name(i))
+ .append("=")
+ .append(Integer.toString(parameters[i]))
+ .append(' ');
+ }
+ }
+ return sb.toString();
+ }
+
+ // Parameters
+ public static final int HEADER_TABLE_SIZE = 0x1;
+ public static final int ENABLE_PUSH = 0x2;
+ public static final int MAX_CONCURRENT_STREAMS = 0x3;
+ public static final int INITIAL_WINDOW_SIZE = 0x4;
+ public static final int MAX_FRAME_SIZE = 0x5;
+ public static final int MAX_HEADER_LIST_SIZE = 0x6;
+
+ private String name(int i) {
+ switch (i+1) {
+ case HEADER_TABLE_SIZE:
+ return "HEADER_TABLE_SIZE";
+ case ENABLE_PUSH:
+ return "ENABLE_PUSH";
+ case MAX_CONCURRENT_STREAMS:
+ return "MAX_CONCURRENT_STREAMS";
+ case INITIAL_WINDOW_SIZE:
+ return "INITIAL_WINDOW_SIZE";
+ case MAX_FRAME_SIZE:
+ return "MAX_FRAME_SIZE";
+ case MAX_HEADER_LIST_SIZE:
+ return "MAX_HEADER_LIST_SIZE";
+ }
+ return "unknown parameter";
+ }
+ public static final int MAX_PARAM = 0x6;
+
+ public SettingsFrame() {
+ type = TYPE;
+ parameters = new int [MAX_PARAM];
+ for (int i=0; i < parameters.length; i++) {
+ parameters[i] = -1;
+ }
+ }
+
+ public int getParameter(int paramID) {
+ if (paramID > MAX_PARAM) {
+ throw new IllegalArgumentException("illegal parameter");
+ }
+ return parameters[paramID-1];
+ }
+
+ public SettingsFrame setParameter(int paramID, int value) {
+ if (paramID > MAX_PARAM) {
+ throw new IllegalArgumentException("illegal parameter");
+ }
+ parameters[paramID-1] = value;
+ return this;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if (length % 6 != 0) {
+ throw new IOException("Protocol error: invalid settings frame");
+ }
+ int n = length / 6;
+ for (int i=0; i<n; i++) {
+ int id = bc.getShort();
+ int val = bc.getInt();
+ if (id > 0 || id <= MAX_PARAM) {
+ // a known parameter. Ignore otherwise
+ parameters[id-1] = val;
+ }
+ }
+ }
+
+ @Override
+ void computeLength() {
+ length = 0;
+ for (int i : parameters) {
+ if (i != -1) {
+ length += 6;
+ }
+ }
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(length);
+ for (int i = 0; i < MAX_PARAM; i++) {
+ if (parameters[i] != -1) {
+ buf.putShort((short)(i+1));
+ buf.putInt(parameters[i]);
+ }
+ }
+ }
+
+ private static final int K = 1024;
+
+ public static SettingsFrame getDefaultSettings() {
+ SettingsFrame f = new SettingsFrame();
+ // TODO: check these values
+ f.setParameter(ENABLE_PUSH, 1);
+ f.setParameter(HEADER_TABLE_SIZE, 4 * K);
+ f.setParameter(MAX_CONCURRENT_STREAMS, 35);
+ f.setParameter(INITIAL_WINDOW_SIZE, 16 * K);
+ f.setParameter(MAX_FRAME_SIZE, 16 * K);
+ return f;
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Stream.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Stream.java Sat Apr 30 00:30:31 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,78 +24,819 @@
package java.net.http;
+import sun.net.httpclient.hpack.DecodingCallback;
+
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.function.BiFunction;
import java.util.function.LongConsumer;
/**
- * Http/2 Stream
+ * Http/2 Stream handling.
+ *
+ * REQUESTS
+ *
+ * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
+ *
+ * sendRequest() -- sendHeadersOnly() + sendBody()
+ *
+ * sendBody() -- in calling thread: obeys all flow control (so may block)
+ * obtains data from request body processor and places on connection
+ * outbound Q.
+ *
+ * sendBodyAsync() -- calls sendBody() in an executor thread.
+ *
+ * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
+ *
+ * sendRequestAsync() -- calls sendRequest() in an executor thread
+ *
+ * RESPONSES
+ *
+ * Multiple responses can be received per request. Responses are queued up on
+ * a LinkedList of CF<HttpResponse> and the the first one on the list is completed
+ * with the next response
+ *
+ * getResponseAsync() -- queries list of response CFs and returns first one
+ * if one exists. Otherwise, creates one and adds it to list
+ * and returns it. Completion is achieved through the
+ * incoming() upcall from connection reader thread.
+ *
+ * getResponse() -- calls getResponseAsync() and waits for CF to complete
+ *
+ * responseBody() -- in calling thread: blocks for incoming DATA frames on
+ * stream inputQ. Obeys remote and local flow control so may block.
+ * Calls user response body processor with data buffers.
+ *
+ * responseBodyAsync() -- calls responseBody() in an executor thread.
+ *
+ * incoming() -- entry point called from connection reader thread. Frames are
+ * either handled immediately without blocking or for data frames
+ * placed on the stream's inputQ which is consumed by the stream's
+ * reader thread.
+ *
+ * PushedStream sub class
+ * ======================
+ * Sending side methods are not used because the request comes from a PUSH_PROMISE
+ * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
+ * is created. PushedStream does not use responseCF list as there can be only
+ * one response. The CF is created when the object created and when the response
+ * HEADERS frame is received the object is completed.
*/
class Stream extends ExchangeImpl {
- void debugPrint() {
- }
+ final Queue<Http2Frame> inputQ;
+
+ volatile int streamid;
+
+ long responseContentLen = -1;
+ long responseBytesProcessed = 0;
+ long requestContentLen;
+
+ Http2Connection connection;
+ HttpClientImpl client;
+ final HttpRequestImpl request;
+ final DecodingCallback rspHeadersConsumer;
+ HttpHeadersImpl responseHeaders;
+ final HttpHeadersImpl requestHeaders;
+ final HttpHeadersImpl requestPseudoHeaders;
+ HttpResponse.BodyProcessor<?> responseProcessor;
+ final HttpRequest.BodyProcessor requestProcessor;
+ HttpResponse response;
+
+ // state flags
+ boolean requestSent, responseReceived;
+
+ final FlowController userRequestFlowController =
+ new FlowController();
+ final FlowController remoteRequestFlowController =
+ new FlowController();
+ final FlowController responseFlowController =
+ new FlowController();
+
+ final ExecutorWrapper executor;
@Override
@SuppressWarnings("unchecked")
<T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
- return null;
+ this.responseProcessor = processor;
+ CompletableFuture<T> cf;
+ try {
+ T body = processor.onResponseBodyStart(
+ responseContentLen, responseHeaders,
+ responseFlowController); // TODO: filter headers
+ if (body != null) {
+ cf = CompletableFuture.completedFuture(body);
+ receiveDataAsync(processor);
+ } else
+ cf = receiveDataAsync(processor);
+ } catch (IOException e) {
+ cf = CompletableFuture.failedFuture(e);
+ }
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ }
+ return cf;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("streamid: ")
+ .append(streamid);
+ return sb.toString();
+ }
+
+ // pushes entire response body into response processor
+ // blocking when required by local or remote flow control
+ void receiveData() throws IOException {
+ Http2Frame frame;
+ DataFrame df = null;
+ try {
+ do {
+ frame = inputQ.take();
+ if (!(frame instanceof DataFrame)) {
+ assert false;
+ continue;
+ }
+ df = (DataFrame) frame;
+ int len = df.getDataLength();
+ ByteBuffer[] buffers = df.getData();
+ for (ByteBuffer b : buffers) {
+ responseFlowController.take();
+ responseProcessor.onResponseBodyChunk(b);
+ }
+ sendWindowUpdate(len);
+ } while (!df.getFlag(DataFrame.END_STREAM));
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
+ CompletableFuture<T> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ receiveData();
+ T body = processor.onResponseComplete();
+ cf.complete(body);
+ responseReceived();
+ } catch (Throwable t) {
+ cf.completeExceptionally(t);
+ }
+ }, null);
+ return cf;
+ }
+
+ private void sendWindowUpdate(int increment)
+ throws IOException, InterruptedException {
+ if (increment == 0)
+ return;
+ LinkedList<Http2Frame> list = new LinkedList<>();
+ WindowUpdateFrame frame = new WindowUpdateFrame();
+ frame.streamid(streamid);
+ frame.setUpdate(increment);
+ list.add(frame);
+ frame = new WindowUpdateFrame();
+ frame.streamid(0);
+ frame.setUpdate(increment);
+ list.add(frame);
+ connection.sendFrames(list);
+ }
+
+ @Override
+ CompletableFuture<Void> sendBodyAsync() {
+ final CompletableFuture<Void> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ sendBodyImpl();
+ cf.complete(null);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ }, null);
+ return cf;
+ }
+
+ @SuppressWarnings("unchecked")
+ Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
+ super(e);
+ this.client = client;
+ this.connection = connection;
+ this.request = e.request();
+ this.requestProcessor = request.requestProcessor();
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
+ }
+
+ @SuppressWarnings("unchecked")
+ Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
+ super(null);
+ this.client = client;
+ this.connection = connection;
+ this.request = req;
+ this.requestProcessor = null;
+ responseHeaders = new HttpHeadersImpl();
+ requestHeaders = new HttpHeadersImpl();
+ rspHeadersConsumer = (name, value) -> {
+ responseHeaders.addHeader(name.toString(), value.toString());
+ };
+ this.executor = client.executorWrapper();
+ //this.response_cf = new CompletableFuture<HttpResponseImpl>();
+ this.requestPseudoHeaders = new HttpHeadersImpl();
+ // NEW
+ this.inputQ = new Queue<>();
}
- Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
- super(e);
+ /**
+ * Entry point from Http2Connection reader thread.
+ *
+ * Data frames will be removed by response body thread.
+ *
+ * @param frame
+ * @throws IOException
+ */
+ void incoming(Http2Frame frame) throws IOException, InterruptedException {
+ if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
+ // Complete headers accumulated. handle response.
+ // It's okay if there are multiple HeaderFrames.
+ handleResponse();
+ } else if (frame instanceof DataFrame) {
+ inputQ.put(frame);
+ } else {
+ otherFrame(frame);
+ }
+ }
+
+ void otherFrame(Http2Frame frame) throws IOException {
+ switch (frame.type()) {
+ case WindowUpdateFrame.TYPE:
+ incoming_windowUpdate((WindowUpdateFrame) frame);
+ break;
+ case ResetFrame.TYPE:
+ incoming_reset((ResetFrame) frame);
+ break;
+ case PriorityFrame.TYPE:
+ incoming_priority((PriorityFrame) frame);
+ break;
+ default:
+ String msg = "Unexpected frame: " + frame.toString();
+ throw new IOException(msg);
+ }
+ }
+
+ // The Hpack decoder decodes into one of these consumers of name,value pairs
+
+ DecodingCallback rspHeadersConsumer() {
+ return rspHeadersConsumer;
+ }
+
+ // create and return the HttpResponseImpl
+ protected void handleResponse() throws IOException {
+ HttpConnection c = connection.connection; // TODO: improve
+ long statusCode = responseHeaders
+ .firstValueAsLong(":status")
+ .orElseThrow(() -> new IOException("no statuscode in response"));
+
+ this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
+ c.sslParameters(), HttpClient.Version.HTTP_2, c);
+ this.responseContentLen = responseHeaders
+ .firstValueAsLong("content-length")
+ .orElse(-1L);
+ // different implementations for normal streams and pushed streams
+ completeResponse(response);
+ }
+
+ void incoming_reset(ResetFrame frame) {
+ // TODO: implement reset
+ int error = frame.getErrorCode();
+ IOException e = new IOException(ErrorFrame.stringForCode(error));
+ completeResponseExceptionally(e);
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ void incoming_priority(PriorityFrame frame) {
+ // TODO: implement priority
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ void incoming_windowUpdate(WindowUpdateFrame frame) {
+ int amount = frame.getUpdate();
+ if (amount > 0)
+ remoteRequestFlowController.accept(amount);
+ }
+
+ void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
+ if (Log.requests()) {
+ Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+ }
+ PushGroup<?> pushGroup = request.pushGroup();
+ if (pushGroup == null) {
+ cancelImpl(new IllegalStateException("unexpected push promise"));
+ }
+ // get the handler and call it.
+ BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
+ pushGroup.pushHandler();
+
+ CompletableFuture<HttpResponse> pushCF = pushStream
+ .getResponseAsync(null)
+ .thenApply(r -> (HttpResponse)r);
+ boolean accept = ph.apply(pushReq, pushCF);
+ if (!accept) {
+ IOException ex = new IOException("Stream cancelled by user");
+ cancelImpl(ex);
+ pushCF.completeExceptionally(ex);
+ } else {
+ pushStream.requestSent();
+ pushGroup.addPush();
+ }
+ }
+
+ private OutgoingHeaders headerFrame(long contentLength) {
+ HttpHeadersImpl h = request.getSystemHeaders();
+ if (contentLength > 0) {
+ h.setHeader("content-length", Long.toString(contentLength));
+ }
+ setPseudoHeaderFields();
+ OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
+ if (contentLength == 0) {
+ f.setFlag(HeadersFrame.END_STREAM);
+ }
+ return f;
+ }
+
+ private void setPseudoHeaderFields() {
+ HttpHeadersImpl hdrs = requestPseudoHeaders;
+ String method = request.method();
+ hdrs.setHeader(":method", method);
+ URI uri = request.uri();
+ hdrs.setHeader(":scheme", uri.getScheme());
+ // TODO: userinfo deprecated. Needs to be removed
+ hdrs.setHeader(":authority", uri.getAuthority());
+ // TODO: ensure header names beginning with : not in user headers
+ String query = uri.getQuery();
+ String path = uri.getPath();
+ if (path == null) {
+ if (method.equalsIgnoreCase("OPTIONS")) {
+ path = "*";
+ } else {
+ path = "/";
+ }
+ }
+ if (query != null) {
+ path += "?" + query;
+ }
+ hdrs.setHeader(":path", path);
+ }
+
+ HttpHeadersImpl getRequestPseudoHeaders() {
+ return requestPseudoHeaders;
}
@Override
HttpResponseImpl getResponse() throws IOException {
- return null;
+ try {
+ return getResponseAsync(null).join();
+ } catch (Throwable e) {
+ Throwable t = e.getCause();
+ if (t instanceof IOException) {
+ throw (IOException)t;
+ }
+ throw e;
+ }
}
@Override
void sendRequest() throws IOException, InterruptedException {
+ sendHeadersOnly();
+ sendBody();
+ }
+
+ /**
+ * A simple general purpose blocking flow controller
+ */
+ class FlowController implements LongConsumer {
+ int permits;
+
+ FlowController() {
+ this.permits = 0;
+ }
+
+ @Override
+ public synchronized void accept(long n) {
+ if (n < 1) {
+ throw new InternalError("FlowController.accept called with " + n);
+ }
+ if (permits == 0) {
+ permits += n;
+ notifyAll();
+ } else {
+ permits += n;
+ }
+ }
+
+ public synchronized void take() throws InterruptedException {
+ take(1);
+ }
+
+ public synchronized void take(int amount) throws InterruptedException {
+ assert permits >= 0;
+ while (permits < amount) {
+ int n = Math.min(amount, permits);
+ permits -= n;
+ amount -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
+ if (Log.requests() && request != null) {
+ Log.logRequest(request.toString());
+ }
+ requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
+ OutgoingHeaders f = headerFrame(requestContentLen);
+ connection.sendFrame(f);
}
@Override
void sendBody() throws IOException, InterruptedException {
+ sendBodyImpl();
}
+ void registerStream(int id) {
+ this.streamid = id;
+ connection.putStream(this, streamid);
+ }
+
+ DataFrame getDataFrame() throws IOException, InterruptedException {
+ userRequestFlowController.take();
+ int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
+ ByteBuffer buffer = connection.getBuffer();
+ buffer.limit(maxpayloadLen);
+ boolean complete = requestProcessor.onRequestBodyChunk(buffer);
+ buffer.flip();
+ int amount = buffer.remaining();
+ // wait for flow control if necessary. Following method will block
+ // until after headers frame is sent, so correct streamid is set.
+ remoteRequestFlowController.take(amount);
+ connection.obtainSendWindow(amount);
+
+ DataFrame df = new DataFrame();
+ df.streamid(streamid);
+ if (complete) {
+ df.setFlag(DataFrame.END_STREAM);
+ }
+ df.setData(buffer);
+ df.computeLength();
+ return df;
+ }
+
+
@Override
CompletableFuture<Void> sendHeadersAsync() {
- return null;
+ try {
+ sendHeadersOnly();
+ return CompletableFuture.completedFuture(null);
+ } catch (IOException | InterruptedException ex) {
+ return CompletableFuture.failedFuture(ex);
+ }
}
+ /**
+ * A List of responses relating to this stream. Normally there is only
+ * one response, but intermediate responses like 100 are allowed
+ * and must be passed up to higher level before continuing. Deals with races
+ * such as if responses are returned before the CFs get created by
+ * getResponseAsync()
+ */
+
+ final List<CompletableFuture<HttpResponseImpl>> response_cfs = new LinkedList<>();
+
@Override
CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
- return null;
+ CompletableFuture<HttpResponseImpl> cf;
+ synchronized (response_cfs) {
+ if (!response_cfs.isEmpty()) {
+ cf = response_cfs.remove(0);
+ } else {
+ cf = new CompletableFuture<>();
+ response_cfs.add(cf);
+ }
+ }
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null) {
+ // if an error occurs make sure it is recorded in the PushGroup
+ cf = cf.whenComplete((t,e) -> pg.pushError(e));
+ }
+ return cf;
+ }
+
+ /**
+ * Completes the first uncompleted CF on list, and removes it. If there is no
+ * uncompleted CF then creates one (completes it) and adds to list
+ */
+ void completeResponse(HttpResponse r) {
+ HttpResponseImpl resp = (HttpResponseImpl)r;
+ synchronized (response_cfs) {
+ for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
+ if (!cf.isDone()) {
+ cf.complete(resp);
+ response_cfs.remove(cf);
+ //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
+ return;
+ } else
+ System.err.println("Stream: " + this + " ALREADY DONE");
+ }
+ response_cfs.add(CompletableFuture.completedFuture(resp));
+ //responseHeaders = new HttpHeadersImpl(); // for any following header blocks
+ }
}
- @Override
- CompletableFuture<Void> sendBodyAsync() {
- return null;
+ // methods to update state and remove stream when finished
+
+ synchronized void requestSent() {
+ requestSent = true;
+ if (responseReceived)
+ connection.deleteStream(this);
+ }
+
+ synchronized void responseReceived() {
+ responseReceived = true;
+ if (requestSent)
+ connection.deleteStream(this);
+ PushGroup<?> pg = request.pushGroup();
+ if (pg != null)
+ pg.noMorePushes();
+ }
+
+ /**
+ * same as above but for errors
+ *
+ * @param t
+ */
+ void completeResponseExceptionally(Throwable t) {
+ synchronized (response_cfs) {
+ for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
+ if (!cf.isDone()) {
+ cf.completeExceptionally(t);
+ response_cfs.remove(cf);
+ return;
+ }
+ }
+ response_cfs.add(CompletableFuture.failedFuture(t));
+ }
+ }
+
+ void sendBodyImpl() throws IOException, InterruptedException {
+ if (requestContentLen == 0) {
+ // no body
+ return;
+ }
+ DataFrame df;
+ do {
+ df = getDataFrame();
+ // TODO: check accumulated content length (if not checked below)
+ connection.sendFrame(df);
+ } while (!df.getFlag(DataFrame.END_STREAM));
+ requestSent();
}
@Override
void cancel() {
+ cancelImpl(new Exception("Cancelled"));
}
+ void cancelImpl(Throwable e) {
+ Log.logTrace("cancelling stream: {0}\n", e.toString());
+ inputQ.close();
+ try {
+ connection.resetStream(streamid, ResetFrame.CANCEL);
+ } catch (IOException | InterruptedException ex) {
+ Log.logError(ex);
+ }
+ }
+
@Override
CompletableFuture<Void> sendRequestAsync() {
- return null;
+ CompletableFuture<Void> cf = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ sendRequest();
+ cf.complete(null);
+ } catch (IOException |InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ }, null);
+ return cf;
}
@Override
<T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
- return null;
+ this.responseProcessor = processor;
+ T body = processor.onResponseBodyStart(
+ responseContentLen, responseHeaders,
+ responseFlowController); // TODO: filter headers
+ if (body == null) {
+ receiveData();
+ return processor.onResponseComplete();
+ } else
+ receiveDataAsync(processor);
+ responseReceived();
+ return body;
+ }
+
+ // called from Http2Connection reader thread
+ synchronized void updateOutgoingWindow(int update) {
+ remoteRequestFlowController.accept(update);
+ }
+
+ void close(String msg) {
+ cancel();
+ }
+
+ static class PushedStream extends Stream {
+ final PushGroup<?> pushGroup;
+ final private Stream parent; // used by server push streams
+ // push streams need the response CF allocated up front as it is
+ // given directly to user via the multi handler callback function.
+ final CompletableFuture<HttpResponseImpl> pushCF;
+ final HttpRequestImpl pushReq;
+
+ PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
+ Http2Connection connection, Stream parent,
+ HttpRequestImpl pushReq) {
+ super(client, connection, pushReq);
+ this.pushGroup = pushGroup;
+ this.pushReq = pushReq;
+ this.pushCF = new CompletableFuture<>();
+ this.parent = parent;
+ }
+
+ // Following methods call the super class but in case of
+ // error record it in the PushGroup. The error method is called
+ // with a null value when no error occurred (is a no-op)
+ @Override
+ CompletableFuture<Void> sendBodyAsync() {
+ return super.sendBodyAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<Void> sendHeadersAsync() {
+ return super.sendHeadersAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<Void> sendRequestAsync() {
+ return super.sendRequestAsync()
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
+ return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
+ return super.responseBodyAsync(processor)
+ .whenComplete((v, t) -> pushGroup.pushError(t));
+ }
+
+ @Override
+ void completeResponse(HttpResponse r) {
+ HttpResponseImpl resp = (HttpResponseImpl)r;
+ Utils.logResponse(resp);
+ pushCF.complete(resp);
+ }
+
+ @Override
+ void completeResponseExceptionally(Throwable t) {
+ pushCF.completeExceptionally(t);
+ }
+
+ @Override
+ synchronized void responseReceived() {
+ super.responseReceived();
+ pushGroup.pushCompleted();
+ }
+
+ // create and return the PushResponseImpl
+ @Override
+ protected void handleResponse() {
+ HttpConnection c = connection.connection; // TODO: improve
+ long statusCode = responseHeaders
+ .firstValueAsLong(":status")
+ .orElse(-1L);
+
+ if (statusCode == -1L)
+ completeResponseExceptionally(new IOException("No status code"));
+ ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
+ this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
+ c.sslParameters());
+ this.responseContentLen = responseHeaders
+ .firstValueAsLong("content-length")
+ .orElse(-1L);
+ // different implementations for normal streams and pushed streams
+ completeResponse(response);
+ }
+ }
+
+ /**
+ * One PushGroup object is associated with the parent Stream of
+ * the pushed Streams. This keeps track of all common state associated
+ * with the pushes.
+ */
+ static class PushGroup<T> {
+ // the overall completion object, completed when all pushes are done.
+ final CompletableFuture<T> resultCF;
+ Throwable error; // any exception that occured during pushes
+
+ // CF for main response
+ final CompletableFuture<HttpResponse> mainResponse;
+
+ // user's processor object
+ final HttpResponse.MultiProcessor<T> multiProcessor;
+
+ // per push handler function provided by processor
+ final private BiFunction<HttpRequest,
+ CompletableFuture<HttpResponse>,
+ Boolean> pushHandler;
+ int numberOfPushes;
+ int remainingPushes;
+ boolean noMorePushes = false;
+
+ PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
+ this.resultCF = new CompletableFuture<>();
+ this.mainResponse = new CompletableFuture<>();
+ this.multiProcessor = multiProcessor;
+ this.pushHandler = multiProcessor.onStart(req, mainResponse);
+ }
+
+ CompletableFuture<T> groupResult() {
+ return resultCF;
+ }
+
+ CompletableFuture<HttpResponse> mainResponse() {
+ return mainResponse;
+ }
+
+ private BiFunction<HttpRequest,
+ CompletableFuture<HttpResponse>, Boolean> pushHandler()
+ {
+ return pushHandler;
+ }
+
+ synchronized void addPush() {
+ numberOfPushes++;
+ remainingPushes++;
+ }
+
+ synchronized int numberOfPushes() {
+ return numberOfPushes;
+ }
+ // This is called when the main body response completes because it means
+ // no more PUSH_PROMISEs are possible
+ synchronized void noMorePushes() {
+ noMorePushes = true;
+ checkIfCompleted();
+ }
+
+ synchronized void pushCompleted() {
+ remainingPushes--;
+ checkIfCompleted();
+ }
+
+ synchronized void checkIfCompleted() {
+ if (remainingPushes == 0 && error == null && noMorePushes) {
+ T overallResult = multiProcessor.onComplete();
+ resultCF.complete(overallResult);
+ }
+ }
+
+ synchronized void pushError(Throwable t) {
+ if (t == null)
+ return;
+ this.error = t;
+ resultCF.completeExceptionally(t);
+ }
}
}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/Utils.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Utils.java Sat Apr 30 00:30:31 2016 +0100
@@ -21,28 +21,37 @@
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
*/
-
package java.net.http;
+import sun.net.NetProperties;
+
+import javax.net.ssl.SSLParameters;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
import java.net.NetPermission;
import java.net.URI;
import java.net.URLPermission;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.net.ssl.SSLParameters;
-import sun.net.NetProperties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongBinaryOperator;
+import java.util.function.Predicate;
/**
* Miscellaneous utilities
*/
-class Utils {
+final class Utils {
/**
* Allocated buffer size. Must never be higher than 16K. But can be lower
@@ -51,7 +60,59 @@
*/
public static final int BUFSIZE = 16 * 1024;
- /** Validates a RFC7230 token */
+ private static final Set<String> DISALLOWED_HEADERS_SET = Set.of(
+ "authorization", "connection", "cookie", "content-length",
+ "date", "expect", "from", "host", "origin", "proxy-authorization",
+ "referer", "user-agent", "upgrade", "via", "warning");
+
+ static final Predicate<String>
+ ALLOWED_HEADERS = header -> !Utils.DISALLOWED_HEADERS_SET.contains(header);
+
+ static final Predicate<String>
+ ALL_HEADERS = header -> true;
+
+ static InetSocketAddress getAddress(HttpRequestImpl req) {
+ URI uri = req.uri();
+ if (uri == null) {
+ return req.authority();
+ }
+ int port = uri.getPort();
+ if (port == -1) {
+ if (uri.getScheme().equalsIgnoreCase("https")) {
+ port = 443;
+ } else {
+ port = 80;
+ }
+ }
+ String host = uri.getHost();
+ if (req.proxy() == null) {
+ return new InetSocketAddress(host, port);
+ } else {
+ return InetSocketAddress.createUnresolved(host, port);
+ }
+ }
+
+ /**
+ * Puts position to limit and limit to capacity so we can resume reading
+ * into this buffer, but if required > 0 then limit may be reduced so that
+ * no more than required bytes are read next time.
+ */
+ static void resumeChannelRead(ByteBuffer buf, int required) {
+ int limit = buf.limit();
+ buf.position(limit);
+ int capacity = buf.capacity() - limit;
+ if (required > 0 && required < capacity) {
+ buf.limit(limit + required);
+ } else {
+ buf.limit(buf.capacity());
+ }
+ }
+
+ private Utils() { }
+
+ /**
+ * Validates a RFC7230 token
+ */
static void validateToken(String token, String errormsg) {
int length = token.length();
for (int i = 0; i < length; i++) {
@@ -69,7 +130,7 @@
}
/**
- * Return sthe security permission required for the given details.
+ * Returns the security permission required for the given details.
* If method is CONNECT, then uri must be of form "scheme://host:port"
*/
static URLPermission getPermission(URI uri,
@@ -117,13 +178,13 @@
}
static int getIntegerNetProperty(String name, int defaultValue) {
- return AccessController.doPrivileged((PrivilegedAction<Integer>)() ->
- NetProperties.getInteger(name, defaultValue) );
+ return AccessController.doPrivileged((PrivilegedAction<Integer>) () ->
+ NetProperties.getInteger(name, defaultValue));
}
static String getNetProperty(String name) {
- return AccessController.doPrivileged((PrivilegedAction<String>)() ->
- NetProperties.get(name) );
+ return AccessController.doPrivileged((PrivilegedAction<String>) () ->
+ NetProperties.get(name));
}
static SSLParameters copySSLParameters(SSLParameters p) {
@@ -134,7 +195,9 @@
p1.setEndpointIdentificationAlgorithm(p.getEndpointIdentificationAlgorithm());
p1.setMaximumPacketSize(p.getMaximumPacketSize());
p1.setNeedClientAuth(p.getNeedClientAuth());
- p1.setProtocols(p.getProtocols().clone());
+ String[] protocols = p.getProtocols();
+ if (protocols != null)
+ p1.setProtocols(protocols.clone());
p1.setSNIMatchers(p.getSNIMatchers());
p1.setServerNames(p.getServerNames());
p1.setUseCipherSuitesOrder(p.getUseCipherSuitesOrder());
@@ -142,33 +205,14 @@
return p1;
}
-
- /** Resumes reading into the given buffer. */
- static void unflip(ByteBuffer buf) {
- buf.position(buf.limit());
- buf.limit(buf.capacity());
- }
-
/**
* Set limit to position, and position to mark.
- *
- *
- * @param buffer
- * @param mark
*/
static void flipToMark(ByteBuffer buffer, int mark) {
buffer.limit(buffer.position());
buffer.position(mark);
}
- /** Compact and leave ready for reading. */
- static void compact(List<ByteBuffer> buffers) {
- for (ByteBuffer b : buffers) {
- b.compact();
- b.flip();
- }
- }
-
static String stackTrace(Throwable t) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
String s = null;
@@ -182,8 +226,10 @@
return s;
}
- /** Copies as much of src to dst as possible. */
- static void copy (ByteBuffer src, ByteBuffer dst) {
+ /**
+ * Copies as much of src to dst as possible.
+ */
+ static void copy(ByteBuffer src, ByteBuffer dst) {
int srcLen = src.remaining();
int dstLen = dst.remaining();
if (srcLen > dstLen) {
@@ -204,18 +250,101 @@
return dst;
}
- static String combine(String[] s) {
+ //
+ // Helps to trim long names (packages, nested/inner types) in logs/toString
+ //
+ static String toStringSimple(Object o) {
+ return o.getClass().getSimpleName() + "@" +
+ Integer.toHexString(System.identityHashCode(o));
+ }
+
+ //
+ // 1. It adds a number of remaining bytes;
+ // 2. Standard Buffer-type toString for CharBuffer (since it adheres to the
+ // contract of java.lang.CharSequence.toString() which is both not too
+ // useful and not too private)
+ //
+ static String toString(Buffer b) {
+ return toStringSimple(b)
+ + "[pos=" + b.position()
+ + " lim=" + b.limit()
+ + " cap=" + b.capacity()
+ + " rem=" + b.remaining() + "]";
+ }
+
+ static String toString(CharSequence s) {
+ return s == null
+ ? "null"
+ : toStringSimple(s) + "[len=" + s.length() + "]";
+ }
+
+ static String dump(Object... objects) {
+ return Arrays.toString(objects);
+ }
+
+ static final System.Logger logger = System.getLogger("java.net.http.WebSocket");
+
+ static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ static String webSocketSpecViolation(String section, String detail) {
+ return "RFC 6455 " + section + " " + detail;
+ }
+
+ static void logResponse(HttpResponseImpl r) {
+ if (!Log.requests()) {
+ return;
+ }
StringBuilder sb = new StringBuilder();
- sb.append('[');
- boolean first = true;
- for (String s1 : s) {
- if (!first) {
- sb.append(", ");
- first = false;
+ String method = r.request().method();
+ URI uri = r.uri();
+ String uristring = uri == null ? "" : uri.toString();
+ sb.append('(').append(method).append(" ").append(uristring).append(") ").append(Integer.toString(r.statusCode()));
+ Log.logResponse(sb.toString());
+ }
+
+ static int remaining(ByteBuffer[] bufs) {
+ int remain = 0;
+ for (ByteBuffer buf : bufs)
+ remain += buf.remaining();
+ return remain;
+ }
+
+ // assumes buffer was written into starting at position zero
+ static void unflip(ByteBuffer buf) {
+ buf.position(buf.limit());
+ buf.limit(buf.capacity());
+ }
+
+ static void close(Closeable... chans) {
+ for (Closeable chan : chans) {
+ System.err.println("Closing " + chan);
+ try {
+ chan.close();
+ } catch (IOException e) {
}
- sb.append(s1);
}
- sb.append(']');
- return sb.toString();
+ }
+
+ static ByteBuffer[] reduce(ByteBuffer[] bufs, int start, int number) {
+ if (start == 0 && number == bufs.length)
+ return bufs;
+ ByteBuffer[] nbufs = new ByteBuffer[number];
+ int j = 0;
+ for (int i=start; i<start+number; i++)
+ nbufs[j++] = bufs[i];
+ return nbufs;
}
+
+ static String asString(ByteBuffer buf) {
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ return new String(b, StandardCharsets.US_ASCII);
+ }
+
+ // Put all these static 'empty' singletons here
+ @SuppressWarnings("rawtypes")
+ static CompletableFuture[] EMPTY_CFARRAY = new CompletableFuture[0];
+
+ static ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0);
+ static ByteBuffer[] EMPTY_BB_ARRAY = new ByteBuffer[0];
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/WindowUpdateFrame.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+class WindowUpdateFrame extends Http2Frame {
+
+ int windowUpdate;
+
+ WindowUpdateFrame() {
+ type = TYPE;
+ }
+
+ public final static int TYPE = 0x8;
+
+ public void setUpdate(int windowUpdate) {
+ this.windowUpdate = windowUpdate;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(super.toString())
+ .append(" WindowUpdate: ")
+ .append(windowUpdate);
+ return sb.toString();
+ }
+
+ public int getUpdate() {
+ return this.windowUpdate;
+ }
+
+ /**
+ */
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ if (length != 4) {
+ throw new IOException("Invalid WindowUpdate frame");
+ }
+ windowUpdate = bc.getInt() & 0x7fffffff;
+ }
+
+ @Override
+ void computeLength() {
+ length = 4;
+ }
+
+ @Override
+ void writeOutgoing(ByteBufferGenerator bg) {
+ super.writeOutgoing(bg);
+ ByteBuffer buf = bg.getBuffer(4);
+ buf.putInt(windowUpdate);
+ }
+}
--- a/jdk/src/java.httpclient/share/classes/java/net/http/package-info.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/package-info.java Sat Apr 30 00:30:31 2016 +0100
@@ -24,11 +24,11 @@
*/
/**
- * <h2>High level HTTP API</h2>
- * This provides a high-level client interface to HTTP (versions 1.1 and 2).
- * Synchronous and asynchronous (via
- * {@link java.util.concurrent.CompletableFuture}) modes are provided. The main
- * classes defined are:
+ * <h2>High level HTTP and WebSocket API</h2>
+ * This provides a high-level client interfaces to HTTP (versions 1.1 and 2)
+ * and WebSocket. Synchronous and asynchronous (via {@link
+ * java.util.concurrent.CompletableFuture}) modes are provided for HTTP.
+ * WebSocket works in asynchronous mode only. The main types defined are:
* <ul>
* <li>{@link java.net.http.HttpClient}</li>
* <li>{@link java.net.http.HttpRequest}</li>
--- a/jdk/test/java/net/httpclient/APIErrors.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/APIErrors.java Sat Apr 30 00:30:31 2016 +0100
@@ -26,6 +26,7 @@
* @bug 8087112
* @library /lib/testlibrary/
* @build jdk.testlibrary.SimpleSSLContext ProxyServer
+ * @build TestKit
* @compile ../../../com/sun/net/httpserver/LogFilter.java
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
* @run main/othervm APIErrors
@@ -73,26 +74,6 @@
}
}
- static void reject(Runnable r, Class<? extends Exception> extype) {
- try {
- r.run();
- throw new RuntimeException("Expected: " + extype);
- } catch (Throwable t) {
- if (!extype.isAssignableFrom(t.getClass())) {
- throw new RuntimeException("Wrong exception type: " + extype + " / "
- +t.getClass());
- }
- }
- }
-
- static void accept(Runnable r) {
- try {
- r.run();
- } catch (Throwable t) {
- throw new RuntimeException("Unexpected exception: " + t);
- }
- }
-
static void checkNonNull(Supplier<?> r) {
if (r.get() == null)
throw new RuntimeException("Unexpected null return:");
@@ -108,12 +89,14 @@
System.out.println("Test 1");
HttpClient.Builder cb = HttpClient.create();
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 5000);
- reject(() -> { cb.priority(-1);}, IllegalArgumentException.class);
- reject(() -> { cb.priority(500);}, IllegalArgumentException.class);
- accept(() -> { cb.priority(1);});
- accept(() -> { cb.priority(255);});
-
- accept(() -> {clients.add(cb.build()); clients.add(cb.build());});
+ TestKit.assertThrows(IllegalArgumentException.class, () -> cb.priority(-1));
+ TestKit.assertThrows(IllegalArgumentException.class, () -> cb.priority(500));
+ TestKit.assertNotThrows(() -> cb.priority(1));
+ TestKit.assertNotThrows(() -> cb.priority(255));
+ TestKit.assertNotThrows(() -> {
+ clients.add(cb.build());
+ clients.add(cb.build());
+ });
}
static void test2() throws Exception {
@@ -139,7 +122,7 @@
static void test3() throws Exception {
System.out.println("Test 3");
- reject(()-> {
+ TestKit.assertThrows(IllegalStateException.class, ()-> {
try {
HttpRequest r1 = request();
HttpResponse resp = r1.response();
@@ -147,9 +130,9 @@
} catch (IOException |InterruptedException e) {
throw new RuntimeException(e);
}
- }, IllegalStateException.class);
+ });
- reject(()-> {
+ TestKit.assertThrows(IllegalStateException.class, ()-> {
try {
HttpRequest r1 = request();
HttpResponse resp = r1.response();
@@ -157,8 +140,8 @@
} catch (IOException |InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
- }, IllegalStateException.class);
- reject(()-> {
+ });
+ TestKit.assertThrows(IllegalStateException.class, ()-> {
try {
HttpRequest r1 = request();
HttpResponse resp1 = r1.responseAsync().get();
@@ -166,7 +149,7 @@
} catch (IOException |InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
- }, IllegalStateException.class);
+ });
}
static class Auth extends java.net.Authenticator {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/EchoHandler.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import com.sun.net.httpserver.*;
+import java.net.*;
+import java.net.http.*;
+import java.io.*;
+import java.util.concurrent.*;
+import javax.net.ssl.*;
+import java.nio.file.*;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import jdk.testlibrary.SimpleSSLContext;
+import static java.net.http.HttpRequest.*;
+import static java.net.http.HttpResponse.*;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class EchoHandler implements HttpHandler {
+ public EchoHandler() {}
+
+ @Override
+ public void handle(HttpExchange t)
+ throws IOException {
+ try {
+ System.err.println("EchoHandler received request to " + t.getRequestURI());
+ InputStream is = t.getRequestBody();
+ Headers map = t.getRequestHeaders();
+ Headers map1 = t.getResponseHeaders();
+ map1.add("X-Hello", "world");
+ map1.add("X-Bye", "universe");
+ String fixedrequest = map.getFirst("XFixed");
+ File outfile = File.createTempFile("foo", "bar");
+ FileOutputStream fos = new FileOutputStream(outfile);
+ int count = (int) is.transferTo(fos);
+ is.close();
+ fos.close();
+ InputStream is1 = new FileInputStream(outfile);
+ OutputStream os = null;
+ // return the number of bytes received (no echo)
+ String summary = map.getFirst("XSummary");
+ if (fixedrequest != null && summary == null) {
+ t.sendResponseHeaders(200, count);
+ os = t.getResponseBody();
+ is1.transferTo(os);
+ } else {
+ t.sendResponseHeaders(200, 0);
+ os = t.getResponseBody();
+ is1.transferTo(os);
+
+ if (summary != null) {
+ String s = Integer.toString(count);
+ os.write(s.getBytes());
+ }
+ }
+ outfile.delete();
+ os.close();
+ is1.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+}
--- a/jdk/test/java/net/httpclient/LightWeightHttpServer.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/LightWeightHttpServer.java Sat Apr 30 00:30:31 2016 +0100
@@ -22,10 +22,10 @@
*/
/**
- * @library /lib/testlibrary/
- * @build jdk.testlibrary.SimpleSSLContext ProxyServer
- * @compile ../../../com/sun/net/httpserver/LogFilter.java
- * @compile ../../../com/sun/net/httpserver/FileServerHandler.java
+ * library /lib/testlibrary/ /
+ * build jdk.testlibrary.SimpleSSLContext ProxyServer EchoHandler
+ * compile ../../../com/sun/net/httpserver/LogFilter.java
+ * compile ../../../com/sun/net/httpserver/FileServerHandler.java
*/
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpContext;
--- a/jdk/test/java/net/httpclient/ManyRequests.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/ManyRequests.java Sat Apr 30 00:30:31 2016 +0100
@@ -24,18 +24,17 @@
/**
* @test
* @bug 8087112
- * @library /lib/testlibrary/
- * @build jdk.testlibrary.SimpleSSLContext
+ * @library /lib/testlibrary/ /
+ * @build jdk.testlibrary.SimpleSSLContext EchoHandler
* @compile ../../../com/sun/net/httpserver/LogFilter.java
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
- * @run main/othervm ManyRequests
+ * @run main/othervm/timeout=40 -Djava.net.http.HttpClient.log=ssl ManyRequests
* @summary Send a large number of requests asynchronously
*/
//package javaapplication16;
-import com.sun.net.httpserver.HttpsConfigurator;
-import com.sun.net.httpserver.HttpsServer;
+import com.sun.net.httpserver.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.http.HttpClient;
@@ -47,18 +46,25 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Random;
+import java.util.logging.*;
import java.util.concurrent.CompletableFuture;
-import javax.net.ssl.SSLContext;
+import javax.net.ssl.*;
import jdk.testlibrary.SimpleSSLContext;
public class ManyRequests {
+ volatile static int counter = 0;
+
public static void main(String[] args) throws Exception {
+ Logger logger = Logger.getLogger("com.sun.net.httpserver");
+ logger.setLevel(Level.ALL);
+ logger.info("TEST");
+
SSLContext ctx = new SimpleSSLContext().get();
InetSocketAddress addr = new InetSocketAddress(0);
HttpsServer server = HttpsServer.create(addr, 0);
- server.setHttpsConfigurator(new HttpsConfigurator(ctx));
+ server.setHttpsConfigurator(new Configurator(ctx));
HttpClient client = HttpClient.create()
.sslContext(ctx)
@@ -72,7 +78,8 @@
}
}
- static final int REQUESTS = 1000;
+ //static final int REQUESTS = 1000;
+ static final int REQUESTS = 20;
static void test(HttpsServer server, HttpClient client) throws Exception {
int port = server.getAddress().getPort();
@@ -102,6 +109,9 @@
resp.bodyAsync(HttpResponse.ignoreBody());
String s = "Expected 200, got: " + resp.statusCode();
return completedWithIOException(s);
+ } else {
+ counter++;
+ System.out.println("Result from " + counter);
}
return resp.bodyAsync(HttpResponse.asByteArray())
.thenApply((b) -> new Pair<>(resp, b));
@@ -114,14 +124,18 @@
});
}
+
// wait for them all to complete and throw exception in case of error
- CompletableFuture.allOf(results).join();
+ //try {
+ CompletableFuture.allOf(results).join();
+ //} catch (Exception e) {
+ //e.printStackTrace();
+ //throw e;
+ //}
}
static <T> CompletableFuture<T> completedWithIOException(String message) {
- CompletableFuture<T> cf = new CompletableFuture<>();
- cf.completeExceptionally(new IOException(message));
- return cf;
+ return CompletableFuture.failedFuture(new IOException(message));
}
static final class Pair<T,U> {
@@ -192,3 +206,14 @@
throw new RuntimeException(sb.toString());
}
}
+
+class Configurator extends HttpsConfigurator {
+ public Configurator(SSLContext ctx) {
+ super(ctx);
+ }
+
+ public void configure (HttpsParameters params) {
+ params.setSSLParameters (getSSLContext().getSupportedSSLParameters());
+ }
+}
+
--- a/jdk/test/java/net/httpclient/RequestBodyTest.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/RequestBodyTest.java Sat Apr 30 00:30:31 2016 +0100
@@ -23,10 +23,11 @@
/**
* @test @bug 8087112
- * @library /lib/testlibrary/
- * @build jdk.testlibrary.SimpleSSLContext ProxyServer
+ * @library /lib/testlibrary/ /
* @compile ../../../com/sun/net/httpserver/LogFilter.java
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
+ * @build LightWeightHttpServer
+ * @build jdk.testlibrary.SimpleSSLContext ProxyServer
* @run main/othervm RequestBodyTest
*/
--- a/jdk/test/java/net/httpclient/SmokeTest.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/SmokeTest.java Sat Apr 30 00:30:31 2016 +0100
@@ -24,15 +24,13 @@
/**
* @test
* @bug 8087112
- * @library /lib/testlibrary/
- * @build jdk.testlibrary.SimpleSSLContext ProxyServer
+ * @library /lib/testlibrary/ /
+ * @build jdk.testlibrary.SimpleSSLContext ProxyServer EchoHandler
* @compile ../../../com/sun/net/httpserver/LogFilter.java
* @compile ../../../com/sun/net/httpserver/FileServerHandler.java
* @run main/othervm SmokeTest
*/
-//package javaapplication16;
-
import com.sun.net.httpserver.*;
import java.net.*;
import java.net.http.*;
@@ -69,6 +67,7 @@
*/
public class SmokeTest {
static SSLContext ctx;
+ static SSLParameters sslparams;
static HttpServer s1 ;
static HttpsServer s2;
static ExecutorService executor;
@@ -107,6 +106,7 @@
client = HttpClient.create()
.sslContext(ctx)
+ .sslParameters(sslparams)
.followRedirects(HttpClient.Redirect.ALWAYS)
.executorService(Executors.newCachedThreadPool())
.build();
@@ -285,6 +285,7 @@
HttpClient cl = HttpClient.create()
.proxy(ProxySelector.of(proxyAddr))
.sslContext(ctx)
+ .sslParameters(sslparams)
.build();
CompletableFuture<String> fut = cl.request(uri)
@@ -672,7 +673,8 @@
s1.setExecutor(executor);
s2.setExecutor(executor);
ctx = new SimpleSSLContext().get();
- s2.setHttpsConfigurator(new HttpsConfigurator(ctx));
+ sslparams = ctx.getSupportedSSLParameters();
+ s2.setHttpsConfigurator(new Configurator(ctx));
s1.start();
s2.start();
@@ -689,6 +691,16 @@
}
}
+class Configurator extends HttpsConfigurator {
+ public Configurator(SSLContext ctx) {
+ super(ctx);
+ }
+
+ public void configure (HttpsParameters params) {
+ params.setSSLParameters (getSSLContext().getSupportedSSLParameters());
+ }
+}
+
class UploadServer extends Thread {
int statusCode;
ServerSocket ss;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/TestKit.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+
+//
+// A set of testing utility functions
+//
+public final class TestKit {
+
+ private TestKit() { }
+
+ public static void assertNotThrows(ThrowingProcedure code) {
+ requireNonNull(code, "code");
+ assertNotThrows(() -> {
+ code.run();
+ return null;
+ });
+ }
+
+ public static <V> V assertNotThrows(ThrowingFunction<V> code) {
+ requireNonNull(code, "code");
+ try {
+ return code.run();
+ } catch (Throwable t) {
+ throw new RuntimeException("Expected to run normally, but threw "
+ + t.getClass().getCanonicalName(), t);
+ }
+ }
+
+ public static <T extends Throwable> T assertThrows(Class<? extends T> clazz,
+ ThrowingProcedure code) {
+ requireNonNull(clazz, "clazz");
+ requireNonNull(code, "code");
+ try {
+ code.run();
+ } catch (Throwable t) {
+ if (clazz.isInstance(t)) {
+ return clazz.cast(t);
+ }
+ throw new RuntimeException("Expected to catch an exception of type "
+ + clazz.getCanonicalName() + ", but caught "
+ + t.getClass().getCanonicalName(), t);
+
+ }
+ throw new RuntimeException("Expected to catch an exception of type "
+ + clazz.getCanonicalName() + ", but caught nothing");
+ }
+
+ public interface ThrowingProcedure {
+ void run() throws Throwable;
+ }
+
+ public interface ThrowingFunction<V> {
+ V run() throws Throwable;
+ }
+
+ // The rationale behind asking for a regex is to not pollute variable names
+ // space in the scope of assertion: if it's something as simple as checking
+ // a message, we can do it inside
+ public static <T extends Throwable> T assertThrows(Class<? extends T> clazz,
+ String messageRegex,
+ ThrowingProcedure code) {
+ requireNonNull(messageRegex, "messagePattern");
+ T t = assertThrows(clazz, code);
+ String m = t.getMessage();
+ if (m == null) {
+ throw new RuntimeException(String.format(
+ "Expected exception message to match the regex '%s', " +
+ "but the message was null", messageRegex), t);
+ }
+ if (!Pattern.matches(messageRegex, m)) {
+ throw new RuntimeException(String.format(
+ "Expected exception message to match the regex '%s', " +
+ "actual message: %s", messageRegex, m), t);
+ }
+ return t;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/TestKitTest.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import java.io.IOException;
+import java.util.IllegalFormatException;
+import java.util.Set;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+/*
+ * @test
+ * @compile TestKit.java
+ * @run testng TestKitTest
+ */
+public final class TestKitTest {
+
+ public static void main(String[] args) {
+ testAssertNotThrows();
+ testAssertThrows();
+ }
+
+ private static void testAssertNotThrows() {
+ Integer integer = TestKit.assertNotThrows(
+ () -> TestKit.assertNotThrows(() -> 1)
+ );
+ assertEquals(integer, Integer.valueOf(1));
+
+ RuntimeException re = TestKit.assertThrows(
+ RuntimeException.class,
+ () -> TestKit.assertNotThrows(() -> { throw new IOException(); })
+ );
+ assertEquals(re.getMessage(),
+ "Expected to run normally, but threw "
+ + "java.io.IOException");
+
+ TestKit.assertNotThrows(
+ () -> TestKit.assertNotThrows(() -> { })
+ );
+
+ re = TestKit.assertThrows(
+ RuntimeException.class,
+ () -> TestKit.assertNotThrows((TestKit.ThrowingProcedure) () -> { throw new IOException(); })
+ );
+ assertEquals(re.getMessage(),
+ "Expected to run normally, but threw "
+ + "java.io.IOException");
+ }
+
+ private static void testAssertThrows() {
+ NullPointerException npe = TestKit.assertThrows(
+ NullPointerException.class,
+ () -> TestKit.assertThrows(null, null)
+ );
+ assertNotNull(npe);
+ assertTrue(Set.of("clazz", "code").contains(npe.getMessage()), npe.getMessage());
+
+ npe = TestKit.assertThrows(
+ NullPointerException.class,
+ () -> TestKit.assertThrows(IOException.class, null)
+ );
+ assertNotNull(npe);
+ assertEquals(npe.getMessage(), "code");
+
+ npe = TestKit.assertThrows(
+ NullPointerException.class,
+ () -> TestKit.assertThrows(null, () -> { })
+ );
+ assertEquals(npe.getMessage(), "clazz");
+
+ npe = TestKit.assertThrows(
+ NullPointerException.class,
+ () -> { throw new NullPointerException(); }
+ );
+ assertNotNull(npe);
+ assertNull(npe.getMessage());
+ assertEquals(npe.getClass(), NullPointerException.class);
+
+ RuntimeException re = TestKit.assertThrows(
+ RuntimeException.class,
+ () -> TestKit.assertThrows(NullPointerException.class, () -> { })
+ );
+ assertEquals(re.getClass(), RuntimeException.class);
+ assertEquals(re.getMessage(),
+ "Expected to catch an exception of type "
+ + "java.lang.NullPointerException, but caught nothing");
+
+ re = TestKit.assertThrows(
+ RuntimeException.class,
+ () -> { throw new NullPointerException(); }
+ );
+ assertNotNull(re);
+ assertNull(re.getMessage());
+ assertEquals(re.getClass(), NullPointerException.class);
+
+ re = TestKit.assertThrows(
+ RuntimeException.class,
+ () -> TestKit.assertThrows(
+ IllegalFormatException.class,
+ () -> { throw new IndexOutOfBoundsException(); }
+ ));
+ assertNotNull(re);
+ assertEquals(re.getClass(), RuntimeException.class);
+ assertEquals(re.getMessage(),
+ "Expected to catch an exception of type java.util.IllegalFormatException"
+ + ", but caught java.lang.IndexOutOfBoundsException");
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/BasicTest.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8087112
+ * @library /lib/testlibrary
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.httpclient
+ * @compile/module=java.httpclient java/net/http/BodyOutputStream.java
+ * @compile/module=java.httpclient java/net/http/BodyInputStream.java
+ * @compile/module=java.httpclient java/net/http/EchoHandler.java
+ * @compile/module=java.httpclient java/net/http/Http2Handler.java
+ * @compile/module=java.httpclient java/net/http/Http2TestExchange.java
+ * @compile/module=java.httpclient java/net/http/Http2TestServerConnection.java
+ * @compile/module=java.httpclient java/net/http/Http2TestServer.java
+ * @compile/module=java.httpclient java/net/http/OutgoingPushPromise.java
+ * @compile/module=java.httpclient java/net/http/TestUtil.java
+ * @run testng/othervm -Djava.net.http.HttpClient.log=ssl,requests,responses,errors BasicTest
+ */
+
+import java.io.*;
+import java.net.*;
+import java.net.http.*;
+import static java.net.http.HttpClient.Version.HTTP_2;
+import javax.net.ssl.*;
+import java.nio.file.*;
+import java.util.concurrent.*;
+import jdk.testlibrary.SimpleSSLContext;
+
+
+import org.testng.annotations.Test;
+import org.testng.annotations.Parameters;
+
+@Test
+public class BasicTest {
+ static int httpPort, httpsPort;
+ static Http2TestServer httpServer, httpsServer;
+ static HttpClient client = null;
+ static ExecutorService exec;
+ static SSLContext sslContext;
+
+ static String httpURIString, httpsURIString;
+
+ static void initialize() throws Exception {
+ try {
+ SimpleSSLContext sslct = new SimpleSSLContext();
+ sslContext = sslct.get();
+ client = getClient();
+ exec = client.executorService();
+ httpServer = new Http2TestServer(false, 0, new EchoHandler(),
+ exec, sslContext);
+ httpPort = httpServer.getAddress().getPort();
+
+ httpsServer = new Http2TestServer(true, 0, new EchoHandler(),
+ exec, sslContext);
+
+ httpsPort = httpsServer.getAddress().getPort();
+ httpURIString = "http://127.0.0.1:" + Integer.toString(httpPort) +
+ "/foo/";
+ httpsURIString = "https://127.0.0.1:" + Integer.toString(httpsPort) +
+ "/bar/";
+
+ httpServer.start();
+ httpsServer.start();
+ } catch (Throwable e) {
+ System.err.println("Throwing now");
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test(timeOut=30000)
+ public static void test() throws Exception {
+ try {
+ initialize();
+ simpleTest(false);
+ simpleTest(true);
+ streamTest(false);
+ streamTest(true);
+ Thread.sleep(1000 * 4);
+ } finally {
+ httpServer.stop();
+ httpsServer.stop();
+ exec.shutdownNow();
+ }
+ }
+
+ static HttpClient getClient() {
+ if (client == null) {
+ client = HttpClient.create()
+ .sslContext(sslContext)
+ .version(HTTP_2)
+ .build();
+ }
+ return client;
+ }
+
+ static URI getURI(boolean secure) {
+ if (secure)
+ return URI.create(httpsURIString);
+ else
+ return URI.create(httpURIString);
+ }
+
+ static void checkStatus(int expected, int found) throws Exception {
+ if (expected != found) {
+ System.err.printf ("Test failed: wrong status code %d/%d\n",
+ expected, found);
+ throw new RuntimeException("Test failed");
+ }
+ }
+
+ static void checkStrings(String expected, String found) throws Exception {
+ if (!expected.equals(found)) {
+ System.err.printf ("Test failed: wrong string %s/%s\n",
+ expected, found);
+ throw new RuntimeException("Test failed");
+ }
+ }
+
+ static Void compareFiles(Path path1, Path path2) {
+ return java.net.http.TestUtil.compareFiles(path1, path2);
+ }
+
+ static Path tempFile() {
+ return java.net.http.TestUtil.tempFile();
+ }
+
+ static final String SIMPLE_STRING = "Hello world Goodbye world";
+
+ static final int LOOPS = 13;
+ static final int FILESIZE = 64 * 1024;
+
+ static void streamTest(boolean secure) throws Exception {
+ URI uri = getURI(secure);
+ System.err.printf("streamTest %b to %s\n" , secure, uri);
+
+ HttpClient client = getClient();
+ Path src = java.net.http.TestUtil.getAFile(FILESIZE * 4);
+ HttpRequest req = client.request(uri)
+ .body(HttpRequest.fromFile(src))
+ .POST();
+
+ CompletableFuture<InputStream> response = req.responseAsync()
+ .thenCompose(resp -> {
+ if (resp.statusCode() != 200)
+ throw new RuntimeException();
+ return resp.bodyAsync(HttpResponse.asInputStream());
+ });
+ InputStream is = response.join();
+ File dest = File.createTempFile("foo","bar");
+ dest.deleteOnExit();
+ FileOutputStream os = new FileOutputStream(dest);
+ is.transferTo(os);
+ is.close();
+ os.close();
+ int count = 0;
+ compareFiles(src, dest.toPath());
+ System.err.println("DONE");
+ }
+
+
+ static void simpleTest(boolean secure) throws Exception {
+ URI uri = getURI(secure);
+ System.err.println("Request to " + uri);
+
+ // Do a simple warmup request
+
+ HttpClient client = getClient();
+ HttpRequest req = client.request(uri)
+ .body(HttpRequest.fromString(SIMPLE_STRING))
+ .POST();
+ HttpResponse response = req.response();
+ HttpHeaders h = response.headers();
+
+ checkStatus(200, response.statusCode());
+
+ String responseBody = response.body(HttpResponse.asString());
+ checkStrings(SIMPLE_STRING, responseBody);
+
+ checkStrings(h.firstValue("x-hello").get(), "world");
+ checkStrings(h.firstValue("x-bye").get(), "universe");
+
+ // Do loops asynchronously
+
+ CompletableFuture[] responses = new CompletableFuture[LOOPS];
+ final Path source = java.net.http.TestUtil.getAFile(FILESIZE);
+ for (int i = 0; i < LOOPS; i++) {
+ responses[i] = client.request(uri)
+ .body(HttpRequest.fromFile(source))
+ .version(HTTP_2)
+ .POST()
+ .responseAsync()
+ .thenCompose(r -> r.bodyAsync(HttpResponse.asFile(tempFile())))
+ .thenApply(path -> compareFiles(path, source));
+ Thread.sleep(100);
+ }
+ CompletableFuture.allOf(responses).join();
+ System.err.println("DONE");
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/ServerPush.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8087112
+ * @library /lib/testlibrary
+ * @build jdk.testlibrary.SimpleSSLContext
+ * @modules java.httpclient
+ * @compile/module=java.httpclient java/net/http/BodyOutputStream.java
+ * @compile/module=java.httpclient java/net/http/BodyInputStream.java
+ * @compile/module=java.httpclient java/net/http/PushHandler.java
+ * @compile/module=java.httpclient java/net/http/Http2Handler.java
+ * @compile/module=java.httpclient java/net/http/Http2TestExchange.java
+ * @compile/module=java.httpclient java/net/http/Http2TestServerConnection.java
+ * @compile/module=java.httpclient java/net/http/Http2TestServer.java
+ * @compile/module=java.httpclient java/net/http/OutgoingPushPromise.java
+ * @compile/module=java.httpclient java/net/http/TestUtil.java
+ * @run testng/othervm -Djava.net.http.HttpClient.log=requests,responses ServerPush
+ */
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.nio.file.attribute.*;
+import java.net.http.*;
+import java.util.*;
+import java.util.concurrent.*;
+import org.testng.annotations.Test;
+
+public class ServerPush {
+
+ static ExecutorService e = Executors.newCachedThreadPool();
+
+ static final int LOOPS = 13;
+ static final int FILE_SIZE = 32 * 1024;
+
+ static Path tempFile;
+
+ @Test(timeOut=30000)
+ public static void test() throws Exception {
+ Http2TestServer server = null;
+ Path dir = null;
+ try {
+ server = new Http2TestServer(false, 0,
+ new PushHandler(FILE_SIZE, LOOPS));
+ tempFile = TestUtil.getAFile(FILE_SIZE);
+
+ System.err.println("Server listening on port " + server.getAddress().getPort());
+ server.start();
+ int port = server.getAddress().getPort();
+ dir = Files.createTempDirectory("serverPush");
+
+ URI uri = new URI("http://127.0.0.1:" + Integer.toString(port) + "/foo");
+ HttpRequest request = HttpRequest.create(uri)
+ .version(HttpClient.Version.HTTP_2)
+ .GET();
+
+ CompletableFuture<Map<URI,Path>> cf =
+ request.multiResponseAsync(HttpResponse.multiFile(dir));
+ Map<URI,Path> results = cf.get();
+
+ //HttpResponse resp = request.response();
+ System.err.println(results.size());
+ Set<URI> uris = results.keySet();
+ for (URI u : uris) {
+ Path result = results.get(u);
+ System.err.printf("%s -> %s\n", u.toString(), result.toString());
+ TestUtil.compareFiles(result, tempFile);
+ }
+ System.out.println("TEST OK");
+ } finally {
+ e.shutdownNow();
+ server.stop();
+ Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
+ dir.toFile().delete();
+ return FileVisitResult.CONTINUE;
+ }
+ public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
+ path.toFile().delete();
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/TEST.properties Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,1 @@
+bootclasspath.dirs = /java/net/httpclient
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/BodyInputStream.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,107 @@
+package java.net.http;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * InputStream reads frames off stream q and supplies read demand from any
+ * DataFrames it finds. Window updates are sent back on the connections send
+ * q.
+ */
+class BodyInputStream extends InputStream {
+
+ final Queue<Http2Frame> q;
+ final int streamid;
+ boolean closed;
+ boolean eof;
+ final Http2TestServerConnection conn;
+
+ @SuppressWarnings({"rawtypes","unchecked"})
+ BodyInputStream(Queue q, int streamid, Http2TestServerConnection conn) {
+ this.q = q;
+ this.streamid = streamid;
+ this.conn = conn;
+ }
+
+ DataFrame df;
+ ByteBuffer[] buffers;
+ ByteBuffer buffer;
+ int nextIndex = -1;
+
+ private DataFrame getData() throws IOException {
+ if (eof) {
+ return null;
+ }
+ Http2Frame frame;
+ do {
+ frame = q.take();
+ if (frame.type() == ResetFrame.TYPE) {
+ conn.handleStreamReset((ResetFrame) frame); // throws IOException
+ }
+ // ignoring others for now Wupdates handled elsewhere
+ if (frame.type() != DataFrame.TYPE) {
+ System.out.println("Ignoring " + frame.toString() + " CHECK THIS");
+ }
+ } while (frame.type() != DataFrame.TYPE);
+ df = (DataFrame) frame;
+ int len = df.getDataLength();
+ eof = frame.getFlag(DataFrame.END_STREAM);
+ // acknowledge
+ conn.sendWindowUpdates(len, streamid);
+ return (DataFrame) frame;
+ }
+
+ // null return means EOF
+ private ByteBuffer getBuffer() throws IOException {
+ if (buffer == null || !buffer.hasRemaining()) {
+ if (nextIndex == -1 || nextIndex == buffers.length) {
+ DataFrame df = getData();
+ if (df == null) {
+ return null;
+ }
+ int len = df.getDataLength();
+ if ((len == 0) && eof) {
+ return null;
+ }
+ buffers = df.getData();
+ nextIndex = 0;
+ }
+ buffer = buffers[nextIndex++];
+ }
+ return buffer;
+ }
+
+ @Override
+ public int read(byte[] buf, int offset, int length) throws IOException {
+ if (closed) {
+ throw new IOException("closed");
+ }
+ ByteBuffer b = getBuffer();
+ if (b == null) {
+ return -1;
+ }
+ int remaining = b.remaining();
+ if (remaining < length) {
+ length = remaining;
+ }
+ b.get(buf, offset, length);
+ return length;
+ }
+
+ byte[] one = new byte[1];
+
+ @Override
+ public int read() throws IOException {
+ int c = read(one, 0, 1);
+ if (c == -1) {
+ return -1;
+ }
+ return one[0];
+ }
+
+ @Override
+ public void close() {
+ // TODO reset this stream
+ closed = true;
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/BodyOutputStream.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,106 @@
+package java.net.http;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+
+/**
+ * OutputStream. Incoming window updates handled by the main connection
+ * reader thread.
+ */
+@SuppressWarnings({"rawtypes","unchecked"})
+class BodyOutputStream extends OutputStream {
+ final static byte[] EMPTY_BARRAY = new byte[0];
+
+ final int streamid;
+ int window;
+ boolean closed;
+ boolean goodToGo = false; // not allowed to send until headers sent
+ final Http2TestServerConnection conn;
+ final Queue outputQ;
+
+ BodyOutputStream(int streamid, int initialWindow, Http2TestServerConnection conn) {
+ this.window = initialWindow;
+ this.streamid = streamid;
+ this.conn = conn;
+ this.outputQ = conn.outputQ;
+ conn.registerStreamWindowUpdater(streamid, this::updateWindow);
+ }
+
+ // called from connection reader thread as all incoming window
+ // updates are handled there.
+ synchronized void updateWindow(int update) {
+ window += update;
+ notifyAll();
+ }
+
+ void waitForWindow(int demand) throws InterruptedException {
+ // first wait for the connection window
+ conn.obtainConnectionWindow(demand);
+ // now wait for the stream window
+ synchronized (this) {
+ while (demand > 0) {
+ int n = Math.min(demand, window);
+ demand -= n;
+ window -= n;
+ if (demand > 0) {
+ wait();
+ }
+ }
+ }
+ }
+
+ void goodToGo() {
+ goodToGo = true;
+ }
+
+ @Override
+ public void write(byte[] buf, int offset, int len) throws IOException {
+ if (closed) {
+ throw new IOException("closed");
+ }
+
+ if (!goodToGo) {
+ throw new IllegalStateException("sendResponseHeaders must be called first");
+ }
+ try {
+ waitForWindow(len);
+ send(buf, offset, len, 0);
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ private void send(byte[] buf, int offset, int len, int flags) throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocate(len);
+ buffer.put(buf, offset, len);
+ buffer.flip();
+ DataFrame df = new DataFrame();
+ assert streamid != 0;
+ df.streamid(streamid);
+ df.setFlags(flags);
+ df.setData(buffer);
+ outputQ.put(df);
+ }
+
+ byte[] one = new byte[1];
+
+ @Override
+ public void write(int b) throws IOException {
+ one[0] = (byte) b;
+ write(one, 0, 1);
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
+ } catch (IOException ex) {
+ System.err.println("TestServer: OutputStream.close exception: " + ex);
+ ex.printStackTrace();
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/EchoHandler.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2005, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.*;
+import java.net.*;
+
+public class EchoHandler implements Http2Handler {
+ public EchoHandler() {}
+
+ @Override
+ public void handle(Http2TestExchange t)
+ throws IOException {
+ try {
+ System.err.println("EchoHandler received request to " + t.getRequestURI());
+ InputStream is = t.getRequestBody();
+ HttpHeadersImpl map = t.getRequestHeaders();
+ HttpHeadersImpl map1 = t.getResponseHeaders();
+ map1.addHeader("X-Hello", "world");
+ map1.addHeader("X-Bye", "universe");
+ String fixedrequest = map.firstValue("XFixed").orElse(null);
+ File outfile = File.createTempFile("foo", "bar");
+ FileOutputStream fos = new FileOutputStream(outfile);
+ int count = (int) is.transferTo(fos);
+ System.err.printf("EchoHandler read %d bytes\n", count);
+ is.close();
+ fos.close();
+ InputStream is1 = new FileInputStream(outfile);
+ OutputStream os = null;
+ // return the number of bytes received (no echo)
+ String summary = map.firstValue("XSummary").orElse(null);
+ if (fixedrequest != null && summary == null) {
+ t.sendResponseHeaders(200, count);
+ os = t.getResponseBody();
+ is1.transferTo(os);
+ } else {
+ t.sendResponseHeaders(200, 0);
+ os = t.getResponseBody();
+ int count1 = (int)is1.transferTo(os);
+ System.err.printf("EchoHandler wrote %d bytes\n", count1);
+
+ if (summary != null) {
+ String s = Integer.toString(count);
+ os.write(s.getBytes());
+ }
+ }
+ outfile.delete();
+ os.close();
+ is1.close();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/Http2Handler.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,18 @@
+package java.net.http;
+
+import java.io.IOException;
+
+/**
+ * A handler which is invoked to process HTTP exchanges. Each
+ * HTTP exchange is handled by one of these handlers.
+ */
+public interface Http2Handler {
+ /**
+ * Handle the given request and generate an appropriate response.
+ * @param exchange the exchange containing the request from the
+ * client and used to send the response
+ * @throws NullPointerException if exchange is <code>null</code>
+ */
+ public abstract void handle (Http2TestExchange exchange) throws IOException;
+}
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/Http2TestExchange.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,126 @@
+package java.net.http;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.InetSocketAddress;
+
+public class Http2TestExchange {
+
+ final HttpHeadersImpl reqheaders;
+ final HttpHeadersImpl rspheaders;
+ final URI uri;
+ final String method;
+ final InputStream is;
+ final BodyOutputStream os;
+ final int streamid;
+ final boolean pushAllowed;
+ final Http2TestServerConnection conn;
+ final Http2TestServer server;
+
+ int responseCode = -1;
+ long responseLength;
+
+ Http2TestExchange(int streamid, String method, HttpHeadersImpl reqheaders,
+ HttpHeadersImpl rspheaders, URI uri, InputStream is,
+ BodyOutputStream os, Http2TestServerConnection conn, boolean pushAllowed) {
+ this.reqheaders = reqheaders;
+ this.rspheaders = rspheaders;
+ this.uri = uri;
+ this.method = method;
+ this.is = is;
+ this.streamid = streamid;
+ this.os = os;
+ this.pushAllowed = pushAllowed;
+ this.conn = conn;
+ this.server = conn.server;
+ }
+
+ public HttpHeadersImpl getRequestHeaders() {
+ return reqheaders;
+ }
+
+ public HttpHeadersImpl getResponseHeaders() {
+ return rspheaders;
+ }
+
+ public URI getRequestURI() {
+ return uri;
+ }
+
+ public String getRequestMethod() {
+ return method;
+ }
+
+ public void close() {
+ try {
+ is.close();
+ os.close();
+ } catch (IOException e) {
+ System.err.println("TestServer: HttpExchange.close exception: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ public InputStream getRequestBody() {
+ return is;
+ }
+
+ public OutputStream getResponseBody() {
+ return os;
+ }
+
+ public void sendResponseHeaders(int rCode, long responseLength) throws IOException {
+ this.responseLength = responseLength;
+ if (responseLength > 0 || responseLength < 0) {
+ long clen = responseLength > 0 ? responseLength : 0;
+ rspheaders.setHeader("Content-length", Long.toString(clen));
+ }
+
+ rspheaders.setHeader(":status", Integer.toString(rCode));
+
+ Http2TestServerConnection.ResponseHeaders response
+ = new Http2TestServerConnection.ResponseHeaders(rspheaders);
+ response.streamid(streamid);
+ response.setFlag(HeaderFrame.END_HEADERS);
+ conn.outputQ.put(response);
+ os.goodToGo();
+ System.err.println("Sent response headers " + rCode);
+ }
+
+ public InetSocketAddress getRemoteAddress() {
+ return (InetSocketAddress) conn.socket.getRemoteSocketAddress();
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ public InetSocketAddress getLocalAddress() {
+ return server.getAddress();
+ }
+
+ public String getProtocol() {
+ return "HTTP/2";
+ }
+
+ public boolean serverPushAllowed() {
+ return pushAllowed;
+ }
+
+ public void serverPush(URI uri, HttpHeadersImpl headers, InputStream content) {
+ OutgoingPushPromise pp = new OutgoingPushPromise(
+ streamid, uri, headers, content);
+ headers.setHeader(":method", "GET");
+ headers.setHeader(":scheme", uri.getScheme());
+ headers.setHeader(":authority", uri.getAuthority());
+ headers.setHeader(":path", uri.getPath());
+ try {
+ conn.outputQ.put(pp);
+ // writeLoop will spin up thread to read the InputStream
+ } catch (IOException ex) {
+ System.err.println("TestServer: pushPromise exception: " + ex);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/Http2TestServer.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+/**
+ * Waits for incoming TCP connections from a client and establishes
+ * a HTTP2 connection. Two threads are created per connection. One for reading
+ * and one for writing. Incoming requests are dispatched to the supplied
+ * Http2Handler on additional threads. All threads
+ * obtained from the supplied ExecutorService.
+ */
+public class Http2TestServer {
+ final ServerSocket server;
+ boolean secure;
+ SettingsFrame serverSettings, clientSettings;
+ final ExecutorService exec;
+ volatile boolean stopping = false;
+ final Http2Handler handler;
+ final SSLContext sslContext;
+ final HashMap<InetSocketAddress,Http2TestServerConnection> connections;
+
+ private static ThreadFactory defaultThreadFac =
+ (Runnable r) -> {
+ Thread t = new Thread(r);
+ t.setName("Test-server-pool");
+ return t;
+ };
+
+
+ private static ExecutorService getDefaultExecutor() {
+ return Executors.newCachedThreadPool(defaultThreadFac);
+ }
+
+ public Http2TestServer(boolean secure, int port, Http2Handler handler) throws Exception {
+ this(secure, port, handler, getDefaultExecutor(), null);
+ }
+
+ public InetSocketAddress getAddress() {
+ return (InetSocketAddress)server.getLocalSocketAddress();
+ }
+
+ /**
+ * Create a Http2Server listening on the given port. Currently needs
+ * to know in advance whether incoming connections are plain TCP "h2c"
+ * or TLS "h2"/
+ *
+ * @param secure https or http
+ * @param port listen port
+ * @param handler the handler which receives incoming requests
+ * @param exec executor service (cached thread pool is used if null)
+ * @param context the SSLContext used when secure is true
+ * @throws Exception
+ */
+ public Http2TestServer(boolean secure, int port, Http2Handler handler,
+ ExecutorService exec, SSLContext context) throws Exception {
+ if (secure) {
+ server = initSecure(port);
+ } else {
+ server = initPlaintext(port);
+ }
+ this.secure = secure;
+ this.exec = exec == null ? getDefaultExecutor() : exec;
+ this.handler = handler;
+ this.sslContext = context;
+ this.connections = new HashMap<>();
+ }
+
+ final ServerSocket initPlaintext(int port) throws Exception {
+ return new ServerSocket(port);
+ }
+
+ public void stop() {
+ // TODO: clean shutdown GoAway
+ stopping = true;
+ for (Http2TestServerConnection connection : connections.values()) {
+ connection.close();
+ }
+ try {
+ server.close();
+ } catch (IOException e) {}
+ exec.shutdownNow();
+ }
+
+
+ final ServerSocket initSecure(int port) throws Exception {
+ ServerSocketFactory fac;
+ if (sslContext != null) {
+ fac = sslContext.getServerSocketFactory();
+ } else {
+ fac = SSLServerSocketFactory.getDefault();
+ }
+ SSLServerSocket se = (SSLServerSocket) fac.createServerSocket(port);
+ SSLParameters sslp = se.getSSLParameters();
+ sslp.setApplicationProtocols(new String[]{"h2"});
+ se.setSSLParameters(sslp);
+ se.setEnabledCipherSuites(se.getSupportedCipherSuites());
+ se.setEnabledProtocols(se.getSupportedProtocols());
+ // other initialisation here
+ return se;
+ }
+
+ /**
+ * Start thread which waits for incoming connections.
+ *
+ * @throws Exception
+ */
+ public void start() {
+ exec.submit(() -> {
+ try {
+ while (!stopping) {
+ Socket socket = server.accept();
+ InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
+ Http2TestServerConnection c = new Http2TestServerConnection(this, socket);
+ connections.put(addr, c);
+ c.run();
+ }
+ } catch (Throwable e) {
+ if (!stopping) {
+ System.err.println("TestServer: start exception: " + e);
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/Http2TestServerConnection.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,730 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import static java.net.http.SettingsFrame.HEADER_TABLE_SIZE;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import sun.net.httpclient.hpack.Decoder;
+import sun.net.httpclient.hpack.DecodingCallback;
+import sun.net.httpclient.hpack.Encoder;
+
+/**
+ * Represents one HTTP2 connection, either plaintext upgraded from HTTP/1.1
+ * or HTTPS opened using "h2" ALPN.
+ */
+public class Http2TestServerConnection {
+ final Http2TestServer server;
+ @SuppressWarnings({"rawtypes","unchecked"})
+ final Map<Integer, Queue> streams; // input q per stream
+ final Queue<Http2Frame> outputQ;
+ int nextstream;
+ final Socket socket;
+ final InputStream is;
+ final OutputStream os;
+ Encoder hpackOut;
+ Decoder hpackIn;
+ SettingsFrame clientSettings, serverSettings;
+ final ExecutorService exec;
+ final boolean secure;
+ final Http2Handler handler;
+ volatile boolean stopping;
+ int nextPushStreamId = 2;
+
+ final static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+ final static byte[] EMPTY_BARRAY = new byte[0];
+
+ final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
+
+ Http2TestServerConnection(Http2TestServer server, Socket socket) throws IOException {
+ System.err.println("New connection from " + socket);
+ this.server = server;
+ this.streams = Collections.synchronizedMap(new HashMap<>());
+ this.outputQ = new Queue<>();
+ this.socket = socket;
+ this.clientSettings = server.clientSettings;
+ this.serverSettings = server.serverSettings;
+ this.exec = server.exec;
+ this.secure = server.secure;
+ this.handler = server.handler;
+ is = new BufferedInputStream(socket.getInputStream());
+ os = new BufferedOutputStream(socket.getOutputStream());
+ }
+
+ void close() {
+ streams.forEach((i, q) -> {
+ q.close();
+ });
+ stopping = true;
+ try {
+ socket.close();
+ // TODO: put a reset on each stream
+ } catch (IOException e) {
+ }
+ }
+
+ private void readPreface() throws IOException {
+ int len = clientPreface.length;
+ byte[] bytes = new byte[len];
+ is.readNBytes(bytes, 0, len);
+ if (Arrays.compare(clientPreface, bytes) != 0) {
+ throw new IOException("Invalid preface: " + new String(bytes, 0, len));
+ }
+ }
+
+ String doUpgrade() throws IOException {
+ String upgrade = readHttp1Request();
+ String h2c = getHeader(upgrade, "Upgrade");
+ if (h2c == null || !h2c.equals("h2c")) {
+ throw new IOException("Bad upgrade 1 " + h2c);
+ }
+
+ sendHttp1Response(101, "Switching Protocols", "Connection", "Upgrade",
+ "Upgrade", "h2c");
+
+ sendSettingsFrame();
+ readPreface();
+
+ String clientSettingsString = getHeader(upgrade, "HTTP2-Settings");
+ clientSettings = getSettingsFromString(clientSettingsString);
+
+ return upgrade;
+ }
+
+ /**
+ * Client settings payload provided in base64 HTTP1 header. Decode it
+ * and add a header so we can interpret it.
+ *
+ * @param s
+ * @return
+ * @throws IOException
+ */
+ private SettingsFrame getSettingsFromString(String s) throws IOException {
+ Base64.Decoder decoder = Base64.getUrlDecoder();
+ byte[] payload = decoder.decode(s);
+ ByteBuffer bb1 = ByteBuffer.wrap(payload);
+ // simulate header of Settings Frame
+ ByteBuffer bb0 = ByteBuffer.wrap(
+ new byte[] {0, 0, (byte)payload.length, 4, 0, 0, 0, 0, 0});
+ ByteBufferConsumer bbc = new ByteBufferConsumer(
+ new LinkedList<ByteBuffer>(List.of(bb0, bb1)),
+ this::getBuffer);
+ Http2Frame frame = Http2Frame.readIncoming(bbc);
+ if (!(frame instanceof SettingsFrame))
+ throw new IOException("Expected SettingsFrame");
+ return (SettingsFrame)frame;
+ }
+
+ void run() throws Exception {
+ String upgrade = null;
+ if (!secure) {
+ upgrade = doUpgrade();
+ } else {
+ readPreface();
+ sendSettingsFrame(true);
+ clientSettings = (SettingsFrame) readFrame();
+ nextstream = 1;
+ }
+
+ hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
+ hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
+
+ exec.submit(() -> {
+ readLoop();
+ });
+ exec.submit(() -> {
+ writeLoop();
+ });
+ if (!secure) {
+ createPrimordialStream(upgrade);
+ nextstream = 3;
+ }
+ }
+
+ static class BufferPool implements BufferHandler {
+
+ public void setMinBufferSize(int size) {
+ }
+
+ public ByteBuffer getBuffer(int size) {
+ if (size == -1)
+ size = 32 * 1024;
+ return ByteBuffer.allocate(size);
+ }
+
+ public void returnBuffer(ByteBuffer buffer) {
+ }
+ }
+
+ static BufferPool bufferpool = new BufferPool();
+
+ private void writeFrame(Http2Frame frame) throws IOException {
+ ByteBufferGenerator bg = new ByteBufferGenerator(bufferpool);
+ frame.computeLength();
+ System.err.println("Writing frame " + frame.toString());
+ frame.writeOutgoing(bg);
+ ByteBuffer[] bufs = bg.getBufferArray();
+ int c = 0;
+ for (ByteBuffer buf : bufs) {
+ byte[] ba = buf.array();
+ int start = buf.arrayOffset() + buf.position();
+ c += buf.remaining();
+ os.write(ba, start, buf.remaining());
+ }
+ os.flush();
+ System.err.printf("wrote %d bytes\n", c);
+ }
+
+ void handleStreamReset(ResetFrame resetFrame) throws IOException {
+ // TODO: cleanup
+ throw new IOException("Stream reset");
+ }
+
+ private void handleCommonFrame(Http2Frame f) throws IOException {
+ if (f instanceof SettingsFrame) {
+ serverSettings = (SettingsFrame) f;
+ if (serverSettings.getFlag(SettingsFrame.ACK)) // ignore
+ {
+ return;
+ }
+ // otherwise acknowledge it
+ SettingsFrame frame = new SettingsFrame();
+ frame.setFlag(SettingsFrame.ACK);
+ frame.streamid(0);
+ outputQ.put(frame);
+ return;
+ }
+ System.err.println("Received ---> " + f.toString());
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ void sendWindowUpdates(int len, int streamid) throws IOException {
+ if (len == 0)
+ return;
+ WindowUpdateFrame wup = new WindowUpdateFrame();
+ wup.streamid(streamid);
+ wup.setUpdate(len);
+ outputQ.put(wup);
+ wup = new WindowUpdateFrame();
+ wup.streamid(0);
+ wup.setUpdate(len);
+ outputQ.put(wup);
+ }
+
+ HttpHeadersImpl decodeHeaders(List<HeaderFrame> frames) {
+ HttpHeadersImpl headers = new HttpHeadersImpl();
+
+ DecodingCallback cb = (name, value) -> {
+ headers.addHeader(name.toString(), value.toString());
+ };
+
+ for (HeaderFrame frame : frames) {
+ ByteBuffer[] buffers = frame.getHeaderBlock();
+ for (ByteBuffer buffer : buffers) {
+ hpackIn.decode(buffer, false, cb);
+ }
+ }
+ hpackIn.decode(EMPTY_BUFFER, true, cb);
+ return headers;
+ }
+
+ String getRequestLine(String request) {
+ int eol = request.indexOf(CRLF);
+ return request.substring(0, eol);
+ }
+
+ // First stream (1) comes from a plaintext HTTP/1.1 request
+ @SuppressWarnings({"rawtypes","unchecked"})
+ void createPrimordialStream(String request) throws IOException {
+ HttpHeadersImpl headers = new HttpHeadersImpl();
+ String requestLine = getRequestLine(request);
+ String[] tokens = requestLine.split(" ");
+ if (!tokens[2].equals("HTTP/1.1")) {
+ throw new IOException("bad request line");
+ }
+ URI uri = null;
+ try {
+ uri = new URI(tokens[1]);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ String host = getHeader(request, "Host");
+ if (host == null) {
+ throw new IOException("missing Host");
+ }
+
+ headers.setHeader(":method", tokens[0]);
+ headers.setHeader(":scheme", "http"); // always in this case
+ headers.setHeader(":authority", host);
+ headers.setHeader(":path", uri.getPath());
+ Queue q = new Queue();
+ String body = getRequestBody(request);
+ headers.setHeader("Content-length", Integer.toString(body.length()));
+
+ addRequestBodyToQueue(body, q);
+ streams.put(1, q);
+ exec.submit(() -> {
+ handleRequest(headers, q, 1);
+ });
+ }
+
+ // all other streams created here
+ @SuppressWarnings({"rawtypes","unchecked"})
+ void createStream(HeaderFrame frame) throws IOException {
+ List<HeaderFrame> frames = new LinkedList<>();
+ frames.add(frame);
+ int streamid = frame.streamid();
+ if (streamid != nextstream) {
+ throw new IOException("unexpected stream id");
+ }
+ nextstream += 2;
+
+ while (!frame.getFlag(HeaderFrame.END_HEADERS)) {
+ Http2Frame f = readFrame();
+ if (!(f instanceof HeaderFrame)) {
+ handleCommonFrame(f); // should only be error frames
+ } else {
+ frame = (HeaderFrame) f;
+ frames.add(frame);
+ }
+ }
+ HttpHeadersImpl headers = decodeHeaders(frames);
+ Queue q = new Queue();
+ streams.put(streamid, q);
+ exec.submit(() -> {
+ handleRequest(headers, q, streamid);
+ });
+ }
+
+ // runs in own thread. Handles request from start to finish. Incoming frames
+ // for this stream/request delivered on Q
+
+ @SuppressWarnings({"rawtypes","unchecked"})
+ void handleRequest(HttpHeadersImpl headers, Queue queue, int streamid) {
+ String method = headers.firstValue(":method").orElse("");
+ System.out.println("method = " + method);
+ String path = headers.firstValue(":path").orElse("");
+ System.out.println("path = " + path);
+ String scheme = headers.firstValue(":scheme").orElse("");
+ System.out.println("scheme = " + scheme);
+ String authority = headers.firstValue(":authority").orElse("");
+ System.out.println("authority = " + authority);
+ HttpHeadersImpl rspheaders = new HttpHeadersImpl();
+ int winsize = clientSettings.getParameter(
+ SettingsFrame.INITIAL_WINDOW_SIZE);
+ System.err.println ("Stream window size = " + winsize);
+ try (
+ BodyInputStream bis = new BodyInputStream(queue, streamid, this);
+ BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this);
+ )
+ {
+ String us = scheme + "://" + authority + path;
+ URI uri = new URI(us);
+ boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
+ Http2TestExchange exchange = new Http2TestExchange(streamid, method,
+ headers, rspheaders, uri, bis, bos, this, pushAllowed);
+
+ // give to user
+ handler.handle(exchange);
+
+ // everything happens in the exchange from here. Hopefully will
+ // return though.
+ } catch (Throwable e) {
+ System.err.println("TestServer: handleRequest exception: " + e);
+ e.printStackTrace();
+ }
+ }
+
+ // Runs in own thread
+
+ @SuppressWarnings({"rawtypes","unchecked"})
+ void readLoop() {
+ try {
+ while (!stopping) {
+ Http2Frame frame = readFrame();
+ int stream = frame.streamid();
+ if (stream == 0) {
+ if (frame.type() == WindowUpdateFrame.TYPE) {
+ WindowUpdateFrame wup = (WindowUpdateFrame) frame;
+ updateConnectionWindow(wup.getUpdate());
+ } else {
+ // other common frame types
+ handleCommonFrame(frame);
+ }
+ } else {
+ Queue q = streams.get(stream);
+ if (frame.type() == HeadersFrame.TYPE) {
+ if (q != null) {
+ System.err.println("HEADERS frame for existing stream! Error.");
+ // TODO: close connection
+ continue;
+ } else {
+ createStream((HeadersFrame) frame);
+ }
+ } else {
+ if (q == null) {
+ System.err.printf("Non Headers frame received with"+
+ " non existing stream (%d) ", frame.streamid());
+ System.err.println(frame);
+ continue;
+ }
+ if (frame.type() == WindowUpdateFrame.TYPE) {
+ WindowUpdateFrame wup = (WindowUpdateFrame) frame;
+ synchronized (updaters) {
+ Consumer<Integer> r = updaters.get(stream);
+ r.accept(wup.getUpdate());
+ }
+ } else {
+ q.put(frame);
+ }
+ }
+ }
+ }
+ } catch (Throwable e) {
+ close();
+ if (!stopping) {
+ System.err.println("Http server reader thread shutdown");
+ e.printStackTrace();
+ }
+ }
+ }
+
+ // set streamid outside plus other specific fields
+ void encodeHeaders(HttpHeadersImpl headers, HeaderFrame out) {
+ List<ByteBuffer> buffers = new LinkedList<>();
+
+ ByteBuffer buf = getBuffer();
+ boolean encoded;
+ for (Map.Entry<String, List<String>> entry : headers.map().entrySet()) {
+ List<String> values = entry.getValue();
+ String key = entry.getKey().toLowerCase();
+ for (String value : values) {
+ do {
+ hpackOut.header(key, value);
+ encoded = hpackOut.encode(buf);
+ if (!encoded) {
+ buf.flip();
+ buffers.add(buf);
+ buf = getBuffer();
+ }
+ } while (!encoded);
+ }
+ }
+ buf.flip();
+ buffers.add(buf);
+ out.setFlags(HeaderFrame.END_HEADERS);
+ out.setHeaderBlock(buffers.toArray(bbarray));
+ }
+
+ static void closeIgnore(Closeable c) {
+ try {
+ c.close();
+ } catch (IOException e) {}
+ }
+
+ // Runs in own thread
+ void writeLoop() {
+ try {
+ while (!stopping) {
+ Http2Frame frame = outputQ.take();
+ if (frame instanceof ResponseHeaders) {
+ ResponseHeaders rh = (ResponseHeaders)frame;
+ HeadersFrame hf = new HeadersFrame();
+ encodeHeaders(rh.headers, hf);
+ hf.streamid(rh.streamid());
+ writeFrame(hf);
+ } else if (frame instanceof OutgoingPushPromise) {
+ handlePush((OutgoingPushPromise)frame);
+ } else
+ writeFrame(frame);
+ }
+ System.err.println("Connection writer stopping");
+ } catch (Throwable e) {
+ e.printStackTrace();
+ /*close();
+ if (!stopping) {
+ e.printStackTrace();
+ System.err.println("TestServer: writeLoop exception: " + e);
+ }*/
+ }
+ }
+
+ private void handlePush(OutgoingPushPromise op) throws IOException {
+ PushPromiseFrame pp = new PushPromiseFrame();
+ encodeHeaders(op.headers, pp);
+ int promisedStreamid = nextPushStreamId;
+ nextPushStreamId += 2;
+ pp.streamid(op.parentStream);
+ pp.setPromisedStream(promisedStreamid);
+ writeFrame(pp);
+ final InputStream ii = op.is;
+ final BodyOutputStream oo = new BodyOutputStream(
+ promisedStreamid,
+ clientSettings.getParameter(
+ SettingsFrame.INITIAL_WINDOW_SIZE), this);
+ oo.goodToGo();
+ exec.submit(() -> {
+ try {
+ ResponseHeaders oh = getPushResponse(promisedStreamid);
+ outputQ.put(oh);
+ ii.transferTo(oo);
+ } catch (Throwable ex) {
+ System.err.printf("TestServer: pushing response error: %s\n",
+ ex.toString());
+ } finally {
+ closeIgnore(ii);
+ closeIgnore(oo);
+ }
+ });
+
+ }
+
+ // returns a minimal response with status 200
+ // that is the response to the push promise just sent
+ private ResponseHeaders getPushResponse(int streamid) {
+ HttpHeadersImpl h = new HttpHeadersImpl();
+ h.addHeader(":status", "200");
+ ResponseHeaders oh = new ResponseHeaders(h);
+ oh.streamid(streamid);
+ return oh;
+ }
+
+ private ByteBuffer getBuffer() {
+ return ByteBuffer.allocate(8 * 1024);
+ }
+
+ private Http2Frame readFrame() throws IOException {
+ byte[] buf = new byte[9];
+ if (is.readNBytes(buf, 0, 9) != 9)
+ throw new IOException("readFrame: connection closed");
+ int len = 0;
+ for (int i = 0; i < 3; i++) {
+ int n = buf[i] & 0xff;
+ //System.err.println("n = " + n);
+ len = (len << 8) + n;
+ }
+ byte[] rest = new byte[len];
+ int n = is.readNBytes(rest, 0, len);
+ if (n != len)
+ throw new IOException("Error reading frame");
+ ByteBufferConsumer bc = new ByteBufferConsumer(
+ new LinkedList<ByteBuffer>(List.of(ByteBuffer.wrap(buf), ByteBuffer.wrap(rest))),
+ this::getBuffer);
+ return Http2Frame.readIncoming(bc);
+ }
+
+ void sendSettingsFrame() throws IOException {
+ sendSettingsFrame(false);
+ }
+
+ void sendSettingsFrame(boolean now) throws IOException {
+ if (serverSettings == null) {
+ serverSettings = SettingsFrame.getDefaultSettings();
+ }
+ if (now) {
+ writeFrame(serverSettings);
+ } else {
+ outputQ.put(serverSettings);
+ }
+ }
+
+ String readUntil(String end) throws IOException {
+ int number = end.length();
+ int found = 0;
+ StringBuilder sb = new StringBuilder();
+ while (found < number) {
+ char expected = end.charAt(found);
+ int c = is.read();
+ if (c == -1) {
+ throw new IOException("Connection closed");
+ }
+ char c0 = (char) c;
+ sb.append(c0);
+ if (c0 != expected) {
+ found = 0;
+ continue;
+ }
+ found++;
+ }
+ return sb.toString();
+ }
+
+ private int getContentLength(String headers) {
+ return getIntHeader(headers, "Content-length");
+ }
+
+ private int getIntHeader(String headers, String name) {
+ String val = getHeader(headers, name);
+ if (val == null) {
+ return -1;
+ }
+ return Integer.parseInt(val);
+ }
+
+ private String getHeader(String headers, String name) {
+ String headers1 = headers.toLowerCase(); // not efficient
+ name = CRLF + name.toLowerCase();
+ int start = headers1.indexOf(name);
+ if (start == -1) {
+ return null;
+ }
+ start += 2;
+ int end = headers1.indexOf(CRLF, start);
+ String line = headers.substring(start, end);
+ start = line.indexOf(':');
+ if (start == -1) {
+ return null;
+ }
+ return line.substring(start + 1).trim();
+ }
+
+ final static String CRLF = "\r\n";
+
+ String readHttp1Request() throws IOException {
+ String headers = readUntil(CRLF + CRLF);
+ int clen = getContentLength(headers);
+ // read the content. There shouldn't be content but ..
+ byte[] buf = new byte[clen];
+ is.readNBytes(buf, 0, clen);
+ String body = new String(buf, "US-ASCII");
+ return headers + body;
+ }
+
+ void sendHttp1Response(int code, String msg, String... headers) throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append("HTTP/1.1 ")
+ .append(code)
+ .append(' ')
+ .append(msg)
+ .append(CRLF);
+ int numheaders = headers.length;
+ for (int i = 0; i < numheaders; i += 2) {
+ sb.append(headers[i])
+ .append(": ")
+ .append(headers[i + 1])
+ .append(CRLF);
+ }
+ sb.append(CRLF);
+ String s = sb.toString();
+ os.write(s.getBytes("US-ASCII"));
+ os.flush();
+ }
+
+ private void unexpectedFrame(Http2Frame frame) {
+ System.err.println("OOPS. Unexpected");
+ assert false;
+ }
+
+ final static ByteBuffer[] bbarray = new ByteBuffer[0];
+
+ // wrapper around a BlockingQueue that throws an exception when it's closed
+ // Each stream has one of these
+
+ String getRequestBody(String request) {
+ int bodystart = request.indexOf(CRLF+CRLF);
+ String body;
+ if (bodystart == -1)
+ body = "";
+ else
+ body = request.substring(bodystart+4);
+ return body;
+ }
+
+ @SuppressWarnings({"rawtypes","unchecked"})
+ void addRequestBodyToQueue(String body, Queue q) throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(body.getBytes(StandardCharsets.US_ASCII));
+ DataFrame df = new DataFrame();
+ df.streamid(1); // only used for primordial stream
+ df.setData(buf);
+ df.computeLength();
+ df.setFlag(DataFrame.END_STREAM);
+ q.put(df);
+ }
+
+ // window updates done in main reader thread because they may
+ // be used to unblock BodyOutputStreams waiting for WUPs
+
+ HashMap<Integer,Consumer<Integer>> updaters = new HashMap<>();
+
+ void registerStreamWindowUpdater(int streamid, Consumer<Integer> r) {
+ synchronized(updaters) {
+ updaters.put(streamid, r);
+ }
+ }
+
+ int sendWindow = 64 * 1024 - 1; // connection level send window
+
+ /**
+ * BodyOutputStreams call this to get the connection window first.
+ *
+ * @param amount
+ */
+ synchronized void obtainConnectionWindow(int amount) throws InterruptedException {
+ while (amount > 0) {
+ int n = Math.min(amount, sendWindow);
+ amount -= n;
+ sendWindow -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
+
+ synchronized void updateConnectionWindow(int amount) {
+ sendWindow += amount;
+ notifyAll();
+ }
+
+ // simplified output headers class. really just a type safe container
+ // for the hashmap.
+
+ static class ResponseHeaders extends Http2Frame {
+ HttpHeadersImpl headers;
+
+ ResponseHeaders(HttpHeadersImpl headers) {
+ this.headers = headers;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ throw new UnsupportedOperationException("Not supported ever!");
+ }
+
+ @Override
+ void computeLength() {
+ throw new UnsupportedOperationException("Not supported ever!");
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/OutgoingPushPromise.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,31 @@
+package java.net.http;
+
+import java.io.*;
+import java.net.*;
+
+// will be converted to a PushPromiseFrame in the writeLoop
+// a thread is then created to produce the DataFrames from the InputStream
+class OutgoingPushPromise extends Http2Frame {
+ final HttpHeadersImpl headers;
+ final URI uri;
+ final InputStream is;
+ final int parentStream; // not the pushed streamid
+
+ OutgoingPushPromise(int parentStream, URI uri, HttpHeadersImpl headers, InputStream is) {
+ this.uri = uri;
+ this.headers = headers;
+ this.is = is;
+ this.parentStream = parentStream;
+ }
+
+ @Override
+ void readIncomingImpl(ByteBufferConsumer bc) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ void computeLength() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/PushHandler.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.nio.file.attribute.*;
+import java.net.http.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class PushHandler implements Http2Handler {
+
+ final Path tempFile;
+ final int loops;
+
+ public PushHandler(int file_size, int loops) throws Exception {
+ tempFile = TestUtil.getAFile(file_size);
+ this.loops = loops;
+ }
+
+ int invocation = 0;
+
+ public void handle(Http2TestExchange ee) {
+ try {
+ System.err.println ("Server: handle " + ee);
+ invocation++;
+
+ if (ee.serverPushAllowed()) {
+ for (int i=0; i<loops; i++) {
+ InputStream is = new FileInputStream(tempFile.toFile());
+ URI u = new URI ("http://www.foo.com/" + Integer.toString(i));
+ HttpHeadersImpl h = new HttpHeadersImpl();
+ h.addHeader("X-foo", "bar");
+ ee.serverPush(u, h, is);
+ }
+ System.err.println ("Server: sent all pushes");
+ }
+ ee.sendResponseHeaders(200, 0);
+ OutputStream os = ee.getResponseBody();
+ InputStream iis = new FileInputStream(tempFile.toFile());
+ iis.transferTo(os);
+ os.close();
+ iis.close();
+ } catch (Exception ex) {
+ System.err.println ("Server: exception " + ex);
+ }
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/net/httpclient/http2/java.httpclient/java/net/http/TestUtil.java Sat Apr 30 00:30:31 2016 +0100
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package java.net.http;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.util.concurrent.*;
+import java.util.Arrays;
+
+public class TestUtil {
+
+ final static String fileContent = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; // repeated
+
+ public static Path getAFile(int size) throws IOException {
+ Path p = tempFile();
+ BufferedWriter writer = Files.newBufferedWriter(p);
+ int len = fileContent.length();
+ int iterations = size / len;
+ int remainder = size - (iterations * len);
+ for (int i=0; i<iterations; i++)
+ writer.write(fileContent, 0, len);
+ writer.write(fileContent, 0, remainder);
+ writer.close();
+ return p;
+ }
+
+ public static Path tempFile() {
+ try {
+ Path p = Files.createTempFile("foo", "test");
+ p.toFile().deleteOnExit();
+ return p;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static Void compareFiles(Path path1, Path path2) {
+ try {
+ if (Files.size(path1) != Files.size(path2))
+ throw new RuntimeException("File sizes do not match");
+ compareContents(path1, path2);
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ static void compareContents(Path path1, Path path2) {
+ try {
+ byte[] b1 = Files.readAllBytes(path1);
+ byte[] b2 = Files.readAllBytes(path2);
+ if (!Arrays.equals(b1, b2))
+ throw new RuntimeException ("Files do not match");
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
--- a/jdk/test/java/net/httpclient/security/15.policy Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/test/java/net/httpclient/security/15.policy Sat Apr 30 00:30:31 2016 +0100
@@ -16,7 +16,7 @@
permission java.net.URLPermission "socket://127.0.0.1:27301", "CONNECT";
// Test checks for this explicitly
- permission java.net.RuntimePermission "foobar";
+ permission java.net.RuntimePermission "foobar";
};