# HG changeset patch # User dfuchs # Date 1521645131 0 # Node ID 7e56c39fa1faab50f216ee44ff153b4def498168 # Parent ca27c57cb857c8978a621080546eab6b58b9bee7 http-client-branch: bye, bye, SSLDelegate diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/AbstractAsyncSSLConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/AbstractAsyncSSLConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/AbstractAsyncSSLConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,17 +25,13 @@ package jdk.internal.net.http; -import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import javax.net.ssl.SNIHostName; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLParameters; import jdk.internal.net.http.common.SSLTube; @@ -126,69 +122,4 @@ return true; } - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - static final class SSLConnectionChannel extends DetachedConnectionChannel { - final DetachedConnectionChannel delegate; - final SSLDelegate sslDelegate; - SSLConnectionChannel(DetachedConnectionChannel delegate, SSLDelegate sslDelegate) { - this.delegate = delegate; - this.sslDelegate = sslDelegate; - } - - SocketChannel channel() { - return delegate.channel(); - } - - @Override - ByteBuffer read() throws IOException { - SSLDelegate.WrapperResult r = sslDelegate.recvData(ByteBuffer.allocate(8192)); - // TODO: check for closure - int n = r.result.bytesProduced(); - if (n > 0) { - return r.buf; - } else if (n == 0) { - return Utils.EMPTY_BYTEBUFFER; - } else { - return null; - } - } - @Override - long write(ByteBuffer[] buffers, int start, int number) throws IOException { - long l = SSLDelegate.countBytes(buffers, start, number); - SSLDelegate.WrapperResult r = sslDelegate.sendData(buffers, start, number); - if (r.result.getStatus() == SSLEngineResult.Status.CLOSED) { - if (l > 0) { - throw new IOException("SSLHttpConnection closed"); - } - } - return l; - } - @Override - public void shutdownInput() throws IOException { - delegate.shutdownInput(); - } - @Override - public void shutdownOutput() throws IOException { - delegate.shutdownOutput(); - } - @Override - public void close() { - delegate.close(); - } - } - - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - @Override - DetachedConnectionChannel detachChannel() { - assert client() != null; - DetachedConnectionChannel detachedChannel = plainConnection().detachChannel(); - SSLDelegate sslDelegate = new SSLDelegate(engine, - detachedChannel.channel()); - return new SSLConnectionChannel(detachedChannel, sslDelegate); - } - } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,8 +25,6 @@ package jdk.internal.net.http; -import java.io.IOException; -import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.util.concurrent.CompletableFuture; @@ -97,19 +95,7 @@ } @Override - void shutdownInput() throws IOException { - debug.log(Level.DEBUG, "plainConnection.channel().shutdownInput()"); - plainConnection.channel().shutdownInput(); - } - - @Override - void shutdownOutput() throws IOException { - debug.log(Level.DEBUG, "plainConnection.channel().shutdownOutput()"); - plainConnection.channel().shutdownOutput(); - } - - @Override - SSLTube getConnectionFlow() { + SSLTube getConnectionFlow() { return flow; } } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,7 +25,6 @@ package jdk.internal.net.http; -import java.io.IOException; import java.lang.System.Logger.Level; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; @@ -103,16 +102,6 @@ } @Override - void shutdownInput() throws IOException { - plainConnection.channel().shutdownInput(); - } - - @Override - void shutdownOutput() throws IOException { - plainConnection.channel().shutdownOutput(); - } - - @Override SocketChannel channel() { return plainConnection.channel(); } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Mar 21 15:12:11 2018 +0000 @@ -367,18 +367,6 @@ } /** - * Only used from RawChannel to disconnect the channel from - * the selector - */ - void cancelRegistration(SocketChannel s) { - selmgr.cancel(s); - } - - void detachChannel(SocketChannel s, AsyncEvent... events) { - selmgr.detach(s, events); - } - - /** * Allows an AsyncEvent to modify its interestOps. * @param event The modified event. */ @@ -533,40 +521,6 @@ selector = Selector.open(); } - void detach(SelectableChannel channel, AsyncEvent... events) { - if (Thread.currentThread() == this) { - debug.log(Level.DEBUG, "detaching channel"); - SelectionKey key = channel.keyFor(selector); - if (key != null) { - boolean removed = false; - SelectorAttachment sa = (SelectorAttachment) key.attachment(); - if (sa != null) { - for (AsyncEvent e : events) { - if (sa.pending.remove(e)) removed = true; - } - // The key could already have been cancelled, in which - // case the events would already have been removed. - if (removed) { - // We found at least one of the events, so we - // should now cancel the key. - sa.resetInterestOps(0); - key.cancel(); - } - } - } - registrations.removeAll(Arrays.asList(events)); - debug.log(Level.DEBUG, "channel detached"); - } else { - synchronized (this) { - debug.log(Level.DEBUG, "scheduling event to detach channel"); - deregistrations.add(new AsyncTriggerEvent( - (x) -> debug.log(Level.DEBUG, - "Unexpected exception raised while detaching channel", x), - () -> detach(channel, events))); - } - } - } - void eventUpdated(AsyncEvent e) throws ClosedChannelException { if (Thread.currentThread() == this) { SelectionKey key = e.channel().keyFor(selector); @@ -879,10 +833,6 @@ } } - boolean deregister(AsyncEvent e) { - return pending.remove(e); - } - /** * Returns a Stream containing only events that are * registered with the given {@code interestOps}. @@ -1008,8 +958,8 @@ // Make sure to pass the HttpClientFacade to the WebSocket builder. // This will ensure that the facade is not released before the // WebSocket has been created, at which point the pendingOperationCount - // will have been incremented by the DetachedConnectionChannel - // (see PlainHttpConnection.detachChannel()) + // will have been incremented by the RawChannelTube. + // See RawChannelTube. return new BuilderImpl(this.facade(), proxySelector); } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -346,34 +346,6 @@ @Override public abstract void close(); - abstract void shutdownInput() throws IOException; - - abstract void shutdownOutput() throws IOException; - - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - abstract static class DetachedConnectionChannel implements Closeable { - DetachedConnectionChannel() {} - abstract SocketChannel channel(); - abstract long write(ByteBuffer[] buffers, int start, int number) - throws IOException; - abstract void shutdownInput() throws IOException; - abstract void shutdownOutput() throws IOException; - abstract ByteBuffer read() throws IOException; - @Override - public abstract void close(); - @Override - public String toString() { - return this.getClass().getSimpleName() + ": " + channel().toString(); - } - } - - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - abstract DetachedConnectionChannel detachChannel(); - abstract FlowTube getConnectionFlow(); /** diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -199,17 +199,6 @@ } } - @Override - void shutdownInput() throws IOException { - debug.log(Level.DEBUG, "Shutting down input"); - chan.shutdownInput(); - } - - @Override - void shutdownOutput() throws IOException { - debug.log(Level.DEBUG, "Shutting down output"); - chan.shutdownOutput(); - } @Override ConnectionPool.CacheKey cacheKey() { @@ -232,91 +221,4 @@ return false; } - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - private static final class PlainDetachedChannel - extends DetachedConnectionChannel { - final PlainHttpConnection plainConnection; - boolean closed; - PlainDetachedChannel(PlainHttpConnection conn) { - // We're handing the connection channel over to a web socket. - // We need the selector manager's thread to stay alive until - // the WebSocket is closed. - conn.client().webSocketOpen(); - this.plainConnection = conn; - } - - @Override - SocketChannel channel() { - return plainConnection.channel(); - } - - @Override - ByteBuffer read() throws IOException { - ByteBuffer dst = ByteBuffer.allocate(8192); - int n = readImpl(dst); - if (n > 0) { - return dst; - } else if (n == 0) { - return Utils.EMPTY_BYTEBUFFER; - } else { - return null; - } - } - - @Override - public void close() { - HttpClientImpl client = plainConnection.client(); - try { - plainConnection.close(); - } finally { - // notify the HttpClientImpl that the websocket is no - // no longer operating. - synchronized(this) { - if (closed == true) return; - closed = true; - } - client.webSocketClose(); - } - } - - @Override - public long write(ByteBuffer[] buffers, int start, int number) - throws IOException - { - return channel().write(buffers, start, number); - } - - @Override - public void shutdownInput() throws IOException { - plainConnection.shutdownInput(); - } - - @Override - public void shutdownOutput() throws IOException { - plainConnection.shutdownOutput(); - } - - private int readImpl(ByteBuffer buf) throws IOException { - int mark = buf.position(); - int n; - n = channel().read(buf); - if (n == -1) { - return -1; - } - Utils.flipToMark(buf, mark); - return n; - } - } - - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - @Override - DetachedConnectionChannel detachChannel() { - tube.detach(); - return new PlainDetachedChannel(this); - } - } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/PlainTunnelingConnection.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainTunnelingConnection.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainTunnelingConnection.java Wed Mar 21 15:12:11 2018 +0000 @@ -137,16 +137,6 @@ } @Override - void shutdownInput() throws IOException { - delegate.shutdownInput(); - } - - @Override - void shutdownOutput() throws IOException { - delegate.shutdownOutput(); - } - - @Override boolean isSecure() { return false; } @@ -156,11 +146,4 @@ return true; } - // Support for WebSocket/RawChannelImpl which unfortunately - // still depends on synchronous read/writes. - // It should be removed when RawChannelImpl moves to using asynchronous APIs. - @Override - DetachedConnectionChannel detachChannel() { - return delegate.detachChannel(); - } } diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Wed Mar 21 14:11:38 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,156 +0,0 @@ -/* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.internal.net.http; - -import jdk.internal.net.http.common.Utils; -import jdk.internal.net.http.websocket.RawChannel; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SocketChannel; -import java.util.function.Supplier; - -/* - * Each RawChannel corresponds to a TCP connection (SocketChannel) but is - * connected to a Selector and an ExecutorService for invoking the send and - * receive callbacks. Also includes SSL processing. - */ -final class RawChannelImpl implements RawChannel { - - private final HttpClientImpl client; - private final HttpConnection.DetachedConnectionChannel detachedChannel; - private final Object initialLock = new Object(); - private Supplier initial; - - RawChannelImpl(HttpClientImpl client, - HttpConnection connection, - Supplier initial) - throws IOException - { - this.client = client; - this.detachedChannel = connection.detachChannel(); - this.initial = initial; - - SocketChannel chan = connection.channel(); - client.cancelRegistration(chan); - // Constructing a RawChannel is supposed to have a "hand over" - // semantics, in other words if construction fails, the channel won't be - // needed by anyone, in which case someone still needs to close it - try { - chan.configureBlocking(false); - } catch (IOException e) { - try { - chan.close(); - } catch (IOException e1) { - e.addSuppressed(e1); - } finally { - detachedChannel.close(); - } - throw e; - } - } - - private class NonBlockingRawAsyncEvent extends AsyncEvent { - - private final RawEvent re; - - NonBlockingRawAsyncEvent(RawEvent re) { - // !BLOCKING & !REPEATING - this.re = re; - } - - @Override - public SelectableChannel channel() { - return detachedChannel.channel(); - } - - @Override - public int interestOps() { - return re.interestOps(); - } - - @Override - public void handle() { - re.handle(); - } - - @Override - public void abort(IOException ioe) { } - } - - @Override - public void registerEvent(RawEvent event) throws IOException { - client.registerEvent(new NonBlockingRawAsyncEvent(event)); - } - - @Override - public ByteBuffer read() throws IOException { - assert !detachedChannel.channel().isBlocking(); - // connection.read() will no longer be available. - return detachedChannel.read(); - } - - @Override - public ByteBuffer initialByteBuffer() { - synchronized (initialLock) { - if (initial == null) { - throw new IllegalStateException(); - } - ByteBuffer ref = initial.get(); - ref = ref.hasRemaining() ? Utils.copy(ref) - : Utils.EMPTY_BYTEBUFFER; - initial = null; - return ref; - } - } - - @Override - public long write(ByteBuffer[] src, int offset, int len) throws IOException { - // this makes the whitebox driver test fail. - return detachedChannel.write(src, offset, len); - } - - @Override - public void shutdownInput() throws IOException { - detachedChannel.shutdownInput(); - } - - @Override - public void shutdownOutput() throws IOException { - detachedChannel.shutdownOutput(); - } - - @Override - public void close() { - detachedChannel.close(); - } - - @Override - public String toString() { - return super.toString() + "(" + detachedChannel.toString() + ")"; - } -} diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/SSLDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SSLDelegate.java Wed Mar 21 14:11:38 2018 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,491 +0,0 @@ -/* - * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.internal.net.http; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.*; -import jdk.internal.net.http.common.Log; -import jdk.internal.net.http.common.Utils; -import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*; - -/** - * Implements the mechanics of SSL by managing an SSLEngine object. - *

- * This class is only used to implement the {@link - * AbstractAsyncSSLConnection.SSLConnectionChannel} which is handed of - * to RawChannelImpl when creating a WebSocket. - */ -class SSLDelegate { - - final SSLEngine engine; - final EngineWrapper wrapper; - final Lock handshaking = new ReentrantLock(); - final SocketChannel chan; - - SSLDelegate(SSLEngine eng, SocketChannel chan) - { - this.engine = eng; - this.chan = chan; - this.wrapper = new EngineWrapper(chan, engine); - } - - // alpn[] may be null -// SSLDelegate(SocketChannel chan, HttpClientImpl client, String[] alpn, String sn) -// throws IOException -// { -// serverName = sn; -// SSLContext context = client.sslContext(); -// engine = context.createSSLEngine(); -// engine.setUseClientMode(true); -// SSLParameters sslp = client.sslParameters(); -// sslParameters = Utils.copySSLParameters(sslp); -// if (sn != null) { -// SNIHostName sni = new SNIHostName(sn); -// sslParameters.setServerNames(List.of(sni)); -// } -// if (alpn != null) { -// sslParameters.setApplicationProtocols(alpn); -// Log.logSSL("SSLDelegate: Setting application protocols: {0}" + Arrays.toString(alpn)); -// } else { -// Log.logSSL("SSLDelegate: No application protocols proposed"); -// } -// engine.setSSLParameters(sslParameters); -// wrapper = new EngineWrapper(chan, engine); -// this.chan = chan; -// this.client = client; -// } - -// SSLParameters getSSLParameters() { -// return sslParameters; -// } - - static long countBytes(ByteBuffer[] buffers, int start, int number) { - long c = 0; - for (int i=0; i packet_buf_size) { - packet_buf_size = len; - } - size = packet_buf_size; - } else { - if (app_buf_size == 0) { - SSLSession sess = engine.getSession(); - app_buf_size = sess.getApplicationBufferSize(); - } - if (len > app_buf_size) { - app_buf_size = len; - } - size = app_buf_size; - } - return ByteBuffer.allocate (size); - } - } - - /* reallocates the buffer by :- - * 1. creating a new buffer double the size of the old one - * 2. putting the contents of the old buffer into the new one - * 3. set xx_buf_size to the new size if it was smaller than new size - * - * flip is set to true if the old buffer needs to be flipped - * before it is copied. - */ - private ByteBuffer realloc (ByteBuffer b, boolean flip, BufType type) { - // TODO: there should be the linear growth, rather than exponential as - // we definitely know the maximum amount of space required to unwrap - synchronized (this) { - int nsize = 2 * b.capacity(); - ByteBuffer n = allocate (type, nsize); - if (flip) { - b.flip(); - } - n.put(b); - b = n; - } - return b; - } - - /** - * This is a thin wrapper over SSLEngine and the SocketChannel, which - * guarantees the ordering of wraps/unwraps with respect to the underlying - * channel read/writes. It handles the UNDER/OVERFLOW status codes - * It does not handle the handshaking status codes, or the CLOSED status code - * though once the engine is closed, any attempt to read/write to it - * will get an exception. The overall result is returned. - * It functions synchronously/blocking - */ - class EngineWrapper { - - SocketChannel chan; - SSLEngine engine; - final Object wrapLock; - final Object unwrapLock; - ByteBuffer unwrap_src, wrap_dst; - boolean closed = false; - int u_remaining; // the number of bytes left in unwrap_src after an unwrap() - - EngineWrapper (SocketChannel chan, SSLEngine engine) { - this.chan = chan; - this.engine = engine; - wrapLock = new Object(); - unwrapLock = new Object(); - unwrap_src = allocate(BufType.PACKET); - wrap_dst = allocate(BufType.PACKET); - } - -// void close () throws IOException { -// } - - WrapperResult wrapAndSend(ByteBuffer src, boolean ignoreClose) - throws IOException - { - ByteBuffer[] buffers = new ByteBuffer[1]; - buffers[0] = src; - return wrapAndSend(buffers, 0, 1, ignoreClose); - } - - /* try to wrap and send the data in src. Handles OVERFLOW. - * Might block if there is an outbound blockage or if another - * thread is calling wrap(). Also, might not send any data - * if an unwrap is needed. - */ - WrapperResult wrapAndSend(ByteBuffer[] src, - int offset, - int len, - boolean ignoreClose) - throws IOException - { - if (closed && !ignoreClose) { - throw new IOException ("Engine is closed"); - } - Status status; - WrapperResult r = new WrapperResult(); - synchronized (wrapLock) { - wrap_dst.clear(); - do { - r.result = engine.wrap (src, offset, len, wrap_dst); - status = r.result.getStatus(); - if (status == Status.BUFFER_OVERFLOW) { - wrap_dst = realloc (wrap_dst, true, BufType.PACKET); - } - } while (status == Status.BUFFER_OVERFLOW); - if (status == Status.CLOSED && !ignoreClose) { - closed = true; - return r; - } - if (r.result.bytesProduced() > 0) { - wrap_dst.flip(); - int l = wrap_dst.remaining(); - assert l == r.result.bytesProduced(); - while (l>0) { - l -= chan.write (wrap_dst); - } - } - } - return r; - } - - /* block until a complete message is available and return it - * in dst, together with the Result. dst may have been re-allocated - * so caller should check the returned value in Result - * If handshaking is in progress then, possibly no data is returned - */ - WrapperResult recvAndUnwrap(ByteBuffer dst) throws IOException { - Status status; - WrapperResult r = new WrapperResult(); - r.buf = dst; - if (closed) { - throw new IOException ("Engine is closed"); - } - boolean needData; - if (u_remaining > 0) { - unwrap_src.compact(); - unwrap_src.flip(); - needData = false; - } else { - unwrap_src.clear(); - needData = true; - } - synchronized (unwrapLock) { - int x; - do { - if (needData) { - x = chan.read (unwrap_src); - if (x == -1) { - throw new IOException ("connection closed for reading"); - } - unwrap_src.flip(); - } - r.result = engine.unwrap (unwrap_src, r.buf); - status = r.result.getStatus(); - if (status == Status.BUFFER_UNDERFLOW) { - if (unwrap_src.limit() == unwrap_src.capacity()) { - /* buffer not big enough */ - unwrap_src = realloc ( - unwrap_src, false, BufType.PACKET - ); - } else { - /* Buffer not full, just need to read more - * data off the channel. Reset pointers - * for reading off SocketChannel - */ - unwrap_src.position (unwrap_src.limit()); - unwrap_src.limit (unwrap_src.capacity()); - } - needData = true; - } else if (status == Status.BUFFER_OVERFLOW) { - r.buf = realloc (r.buf, true, BufType.APPLICATION); - needData = false; - } else if (status == Status.CLOSED) { - closed = true; - r.buf.flip(); - return r; - } - } while (status != Status.OK); - } - u_remaining = unwrap_src.remaining(); - return r; - } - } - -// WrapperResult sendData (ByteBuffer src) throws IOException { -// ByteBuffer[] buffers = new ByteBuffer[1]; -// buffers[0] = src; -// return sendData(buffers, 0, 1); -// } - - /** - * send the data in the given ByteBuffer. If a handshake is needed - * then this is handled within this method. When this call returns, - * all of the given user data has been sent and any handshake has been - * completed. Caller should check if engine has been closed. - */ - WrapperResult sendData (ByteBuffer[] src, int offset, int len) throws IOException { - WrapperResult r = WrapperResult.createOK(); - while (countBytes(src, offset, len) > 0) { - r = wrapper.wrapAndSend(src, offset, len, false); - Status status = r.result.getStatus(); - if (status == Status.CLOSED) { - doClosure (); - return r; - } - HandshakeStatus hs_status = r.result.getHandshakeStatus(); - if (hs_status != HandshakeStatus.FINISHED && - hs_status != HandshakeStatus.NOT_HANDSHAKING) - { - doHandshake(hs_status); - } - } - return r; - } - - /** - * read data thru the engine into the given ByteBuffer. If the - * given buffer was not large enough, a new one is allocated - * and returned. This call handles handshaking automatically. - * Caller should check if engine has been closed. - */ - WrapperResult recvData (ByteBuffer dst) throws IOException { - /* we wait until some user data arrives */ - int mark = dst.position(); - WrapperResult r = null; - int pos = dst.position(); - while (dst.position() == pos) { - r = wrapper.recvAndUnwrap (dst); - dst = (r.buf != dst) ? r.buf: dst; - Status status = r.result.getStatus(); - if (status == Status.CLOSED) { - doClosure (); - return r; - } - - HandshakeStatus hs_status = r.result.getHandshakeStatus(); - if (hs_status != HandshakeStatus.FINISHED && - hs_status != HandshakeStatus.NOT_HANDSHAKING) - { - doHandshake (hs_status); - } - } - Utils.flipToMark(dst, mark); - return r; - } - - /* we've received a close notify. Need to call wrap to send - * the response - */ - void doClosure () throws IOException { - try { - handshaking.lock(); - ByteBuffer tmp = allocate(BufType.APPLICATION); - WrapperResult r; - do { - tmp.clear(); - tmp.flip (); - r = wrapper.wrapAndSend(tmp, true); - } while (r.result.getStatus() != Status.CLOSED); - } finally { - handshaking.unlock(); - } - } - - /* do the (complete) handshake after acquiring the handshake lock. - * If two threads call this at the same time, then we depend - * on the wrapper methods being idempotent. eg. if wrapAndSend() - * is called with no data to send then there must be no problem - */ - @SuppressWarnings("fallthrough") - void doHandshake (HandshakeStatus hs_status) throws IOException { - boolean wasBlocking; - try { - wasBlocking = chan.isBlocking(); - handshaking.lock(); - chan.configureBlocking(true); - ByteBuffer tmp = allocate(BufType.APPLICATION); - while (hs_status != HandshakeStatus.FINISHED && - hs_status != HandshakeStatus.NOT_HANDSHAKING) - { - WrapperResult r = null; - switch (hs_status) { - case NEED_TASK: - Runnable task; - while ((task = engine.getDelegatedTask()) != null) { - /* run in current thread, because we are already - * running an external Executor - */ - task.run(); - } - /* fall thru - call wrap again */ - case NEED_WRAP: - tmp.clear(); - tmp.flip(); - r = wrapper.wrapAndSend(tmp, false); - break; - - case NEED_UNWRAP: - tmp.clear(); - r = wrapper.recvAndUnwrap (tmp); - if (r.buf != tmp) { - tmp = r.buf; - } - assert tmp.position() == 0; - break; - } - if (r != null) { - hs_status = r.result.getHandshakeStatus(); - } - } - Log.logSSL(getSessionInfo()); - if (!wasBlocking) { - chan.configureBlocking(false); - } - } finally { - handshaking.unlock(); - } - } - -// static void printParams(SSLParameters p) { -// System.out.println("SSLParameters:"); -// if (p == null) { -// System.out.println("Null params"); -// return; -// } -// for (String cipher : p.getCipherSuites()) { -// System.out.printf("cipher: %s\n", cipher); -// } -// // JDK 8 EXCL START -// for (String approto : p.getApplicationProtocols()) { -// System.out.printf("application protocol: %s\n", approto); -// } -// // JDK 8 EXCL END -// for (String protocol : p.getProtocols()) { -// System.out.printf("protocol: %s\n", protocol); -// } -// if (p.getServerNames() != null) { -// for (SNIServerName sname : p.getServerNames()) { -// System.out.printf("server name: %s\n", sname.toString()); -// } -// } -// } - - String getSessionInfo() { - StringBuilder sb = new StringBuilder(); - String application = engine.getApplicationProtocol(); - SSLSession sess = engine.getSession(); - String cipher = sess.getCipherSuite(); - String protocol = sess.getProtocol(); - sb.append("Handshake complete alpn: ") - .append(application) - .append(", Cipher: ") - .append(cipher) - .append(", Protocol: ") - .append(protocol); - return sb.toString(); - } -} diff -r ca27c57cb857 -r 7e56c39fa1fa src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Mar 21 14:11:38 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Mar 21 15:12:11 2018 +0000 @@ -65,7 +65,6 @@ private final Supplier buffersSource; private final Object lock = new Object(); private final AtomicReference errorRef = new AtomicReference<>(); - private final AtomicBoolean detached = new AtomicBoolean(); private final InternalReadPublisher readPublisher; private final InternalWriteSubscriber writeSubscriber; private final long id = IDS.incrementAndGet(); @@ -164,23 +163,6 @@ new IOException("connection closed locally")); } - void detach() { - if (detached.compareAndSet(false, true)) { - debug.log(Level.DEBUG, "detaching tube"); - readPublisher.subscriptionImpl.readScheduler.stop(); - debug.log(Level.DEBUG, "scheduler stopped"); - SocketFlowEvent[] events = { - readPublisher.subscriptionImpl.readEvent, - writeSubscriber.writeEvent - }; - for (SocketFlowEvent event : events) { - event.pause(); - } - debug.log(Level.DEBUG, "asking HttpClientImpl to detach channel"); - client.detachChannel(channel, events); - } - } - /** * A restartable task used to process tasks in sequence. */ @@ -444,18 +426,12 @@ resumeEvent(writeEvent, this::signalError); } -// void pauseWriteEvent() { -// debug.log(Level.DEBUG, "pausing write event"); -// pauseEvent(writeEvent, this::signalError); -// } - void signalWritable() { debug.log(Level.DEBUG, "channel is writable"); tryFlushCurrent(true); } void signalError(Throwable error) { - if (detached.get()) return; debug.log(Level.DEBUG, () -> "write error: " + error); completed = true; readPublisher.signalError(error); @@ -548,7 +524,6 @@ } void signalError(Throwable error) { - if (detached.get()) return; debug.log(Level.DEBUG, () -> "error signalled " + error); if (!errorRef.compareAndSet(null, error)) { return; @@ -716,7 +691,6 @@ } final void signalError(Throwable error) { - if (detached.get()) return; if (!errorRef.compareAndSet(null, error)) { return; } @@ -725,7 +699,6 @@ } final void signalReadable() { - if (detached.get()) return; readScheduler.runOrSchedule(); } @@ -740,7 +713,6 @@ try { while(!readScheduler.isStopped()) { if (completed) return; - if (detached.get()) return; // make sure we have a subscriber if (handlePending()) { @@ -879,7 +851,6 @@ } @Override protected final void signalEvent() { - if (detached.get()) return; try { client.eventUpdated(this); sub.signalReadable(); @@ -890,7 +861,6 @@ @Override protected final void signalError(Throwable error) { - if (detached.get()) return; sub.signalError(error); } diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingBinaryPingClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingBinaryPingClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingBinaryPingClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -55,7 +55,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfBinary = webSocket.sendBinary(data, last); try { - cfBinary.get(5, TimeUnit.SECONDS); + cfBinary.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingBinaryPongClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingBinaryPongClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingBinaryPongClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -55,7 +55,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfBinary = webSocket.sendBinary(data, last); try { - cfBinary.get(5, TimeUnit.SECONDS); + cfBinary.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingOperations.java --- a/test/jdk/java/net/httpclient/websocket/PendingOperations.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingOperations.java Wed Mar 21 15:12:11 2018 +0000 @@ -33,6 +33,11 @@ static final Class ISE = IllegalStateException.class; static final Class IOE = IOException.class; + // Time after which we deem that the local send buffer and remote + // receive buffer must be full. This has been heuristically determined. + // At the time of writing, using anything <= 5s on Mac will make the + // tests fail intermittently. + static final long MAX_WAIT_SEC = 10; // seconds. DummyWebSocketServer server; WebSocket webSocket; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingPingBinaryClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingPingBinaryClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingPingBinaryClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,10 +25,13 @@ * @test * @build DummyWebSocketServer * @run testng/othervm - * -Djdk.internal.httpclient.websocket.debug=true * PendingPingBinaryClose */ +// This test produce huge logs (14Mb+) so disable logging by default +// * -Djdk.internal.httpclient.debug=true +// * -Djdk.internal.httpclient.websocket.debug=true + import org.testng.annotations.Test; import java.net.http.WebSocket; @@ -55,7 +58,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfPing = webSocket.sendPing(data); try { - cfPing.get(5, TimeUnit.SECONDS); + cfPing.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingPingTextClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingPingTextClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingPingTextClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,10 +25,13 @@ * @test * @build DummyWebSocketServer * @run testng/othervm - * -Djdk.internal.httpclient.websocket.debug=true * PendingPingTextClose */ +// This test produce huge logs (14Mb+) so disable logging by default +// * -Djdk.internal.httpclient.debug=true +// * -Djdk.internal.httpclient.websocket.debug=true + import org.testng.annotations.Test; import java.net.http.WebSocket; @@ -55,7 +58,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfPing = webSocket.sendPing(data); try { - cfPing.get(5, TimeUnit.SECONDS); + cfPing.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingPongBinaryClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingPongBinaryClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingPongBinaryClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,10 +25,13 @@ * @test * @build DummyWebSocketServer * @run testng/othervm - * -Djdk.internal.httpclient.websocket.debug=true * PendingPongBinaryClose */ +// This test produce huge logs (14Mb+) so disable logging by default +// * -Djdk.internal.httpclient.debug=true +// * -Djdk.internal.httpclient.websocket.debug=true + import org.testng.annotations.Test; import java.net.http.WebSocket; @@ -56,7 +59,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfPong = webSocket.sendPong(data); try { - cfPong.get(5, TimeUnit.SECONDS); + cfPong.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingPongTextClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingPongTextClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingPongTextClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -25,10 +25,13 @@ * @test * @build DummyWebSocketServer * @run testng/othervm - * -Djdk.internal.httpclient.websocket.debug=true * PendingPongTextClose */ +// This test produce huge logs (14Mb+) so disable logging by default +// * -Djdk.internal.httpclient.debug=true +// * -Djdk.internal.httpclient.websocket.debug=true + import org.testng.annotations.Test; import java.net.http.WebSocket; @@ -55,7 +58,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfPong = webSocket.sendPong(data); try { - cfPong.get(5, TimeUnit.SECONDS); + cfPong.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingTextPingClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingTextPingClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingTextPingClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -56,7 +56,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfText = webSocket.sendText(data, last); try { - cfText.get(5, TimeUnit.SECONDS); + cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/websocket/PendingTextPongClose.java --- a/test/jdk/java/net/httpclient/websocket/PendingTextPongClose.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/websocket/PendingTextPongClose.java Wed Mar 21 15:12:11 2018 +0000 @@ -56,7 +56,7 @@ System.out.printf("begin cycle #%s at %s%n", i, start); cfText = webSocket.sendText(data, last); try { - cfText.get(5, TimeUnit.SECONDS); + cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS); data.clear(); } catch (TimeoutException e) { break; diff -r ca27c57cb857 -r 7e56c39fa1fa test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java --- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java Wed Mar 21 14:11:38 2018 +0000 +++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java Wed Mar 21 15:12:11 2018 +0000 @@ -193,8 +193,6 @@ @Override boolean isSecure() {return secured;} @Override boolean isProxied() {return proxy!=null;} @Override ConnectionPool.CacheKey cacheKey() {return key;} - @Override void shutdownInput() throws IOException {} - @Override void shutdownOutput() throws IOException {} @Override public void close() { closed=true; @@ -210,10 +208,6 @@ @Override public CompletableFuture connectAsync() {return error();} @Override SocketChannel channel() {return error();} @Override - HttpConnection.DetachedConnectionChannel detachChannel() { - return error(); - } - @Override FlowTube getConnectionFlow() {return flow;} } // Emulates an HttpClient that has a strong reference to its connection pool.