--- a/src/java.net.http/share/classes/jdk/internal/net/http/AbstractAsyncSSLConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AbstractAsyncSSLConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,17 +25,13 @@
package jdk.internal.net.http;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLParameters;
import jdk.internal.net.http.common.SSLTube;
@@ -126,69 +122,4 @@
return true;
}
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- static final class SSLConnectionChannel extends DetachedConnectionChannel {
- final DetachedConnectionChannel delegate;
- final SSLDelegate sslDelegate;
- SSLConnectionChannel(DetachedConnectionChannel delegate, SSLDelegate sslDelegate) {
- this.delegate = delegate;
- this.sslDelegate = sslDelegate;
- }
-
- SocketChannel channel() {
- return delegate.channel();
- }
-
- @Override
- ByteBuffer read() throws IOException {
- SSLDelegate.WrapperResult r = sslDelegate.recvData(ByteBuffer.allocate(8192));
- // TODO: check for closure
- int n = r.result.bytesProduced();
- if (n > 0) {
- return r.buf;
- } else if (n == 0) {
- return Utils.EMPTY_BYTEBUFFER;
- } else {
- return null;
- }
- }
- @Override
- long write(ByteBuffer[] buffers, int start, int number) throws IOException {
- long l = SSLDelegate.countBytes(buffers, start, number);
- SSLDelegate.WrapperResult r = sslDelegate.sendData(buffers, start, number);
- if (r.result.getStatus() == SSLEngineResult.Status.CLOSED) {
- if (l > 0) {
- throw new IOException("SSLHttpConnection closed");
- }
- }
- return l;
- }
- @Override
- public void shutdownInput() throws IOException {
- delegate.shutdownInput();
- }
- @Override
- public void shutdownOutput() throws IOException {
- delegate.shutdownOutput();
- }
- @Override
- public void close() {
- delegate.close();
- }
- }
-
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- @Override
- DetachedConnectionChannel detachChannel() {
- assert client() != null;
- DetachedConnectionChannel detachedChannel = plainConnection().detachChannel();
- SSLDelegate sslDelegate = new SSLDelegate(engine,
- detachedChannel.channel());
- return new SSLConnectionChannel(detachedChannel, sslDelegate);
- }
-
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,8 +25,6 @@
package jdk.internal.net.http;
-import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
@@ -97,19 +95,7 @@
}
@Override
- void shutdownInput() throws IOException {
- debug.log(Level.DEBUG, "plainConnection.channel().shutdownInput()");
- plainConnection.channel().shutdownInput();
- }
-
- @Override
- void shutdownOutput() throws IOException {
- debug.log(Level.DEBUG, "plainConnection.channel().shutdownOutput()");
- plainConnection.channel().shutdownOutput();
- }
-
- @Override
- SSLTube getConnectionFlow() {
+ SSLTube getConnectionFlow() {
return flow;
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,7 +25,6 @@
package jdk.internal.net.http;
-import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
@@ -103,16 +102,6 @@
}
@Override
- void shutdownInput() throws IOException {
- plainConnection.channel().shutdownInput();
- }
-
- @Override
- void shutdownOutput() throws IOException {
- plainConnection.channel().shutdownOutput();
- }
-
- @Override
SocketChannel channel() {
return plainConnection.channel();
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Mar 21 15:12:11 2018 +0000
@@ -367,18 +367,6 @@
}
/**
- * Only used from RawChannel to disconnect the channel from
- * the selector
- */
- void cancelRegistration(SocketChannel s) {
- selmgr.cancel(s);
- }
-
- void detachChannel(SocketChannel s, AsyncEvent... events) {
- selmgr.detach(s, events);
- }
-
- /**
* Allows an AsyncEvent to modify its interestOps.
* @param event The modified event.
*/
@@ -533,40 +521,6 @@
selector = Selector.open();
}
- void detach(SelectableChannel channel, AsyncEvent... events) {
- if (Thread.currentThread() == this) {
- debug.log(Level.DEBUG, "detaching channel");
- SelectionKey key = channel.keyFor(selector);
- if (key != null) {
- boolean removed = false;
- SelectorAttachment sa = (SelectorAttachment) key.attachment();
- if (sa != null) {
- for (AsyncEvent e : events) {
- if (sa.pending.remove(e)) removed = true;
- }
- // The key could already have been cancelled, in which
- // case the events would already have been removed.
- if (removed) {
- // We found at least one of the events, so we
- // should now cancel the key.
- sa.resetInterestOps(0);
- key.cancel();
- }
- }
- }
- registrations.removeAll(Arrays.asList(events));
- debug.log(Level.DEBUG, "channel detached");
- } else {
- synchronized (this) {
- debug.log(Level.DEBUG, "scheduling event to detach channel");
- deregistrations.add(new AsyncTriggerEvent(
- (x) -> debug.log(Level.DEBUG,
- "Unexpected exception raised while detaching channel", x),
- () -> detach(channel, events)));
- }
- }
- }
-
void eventUpdated(AsyncEvent e) throws ClosedChannelException {
if (Thread.currentThread() == this) {
SelectionKey key = e.channel().keyFor(selector);
@@ -879,10 +833,6 @@
}
}
- boolean deregister(AsyncEvent e) {
- return pending.remove(e);
- }
-
/**
* Returns a Stream<AsyncEvents> containing only events that are
* registered with the given {@code interestOps}.
@@ -1008,8 +958,8 @@
// Make sure to pass the HttpClientFacade to the WebSocket builder.
// This will ensure that the facade is not released before the
// WebSocket has been created, at which point the pendingOperationCount
- // will have been incremented by the DetachedConnectionChannel
- // (see PlainHttpConnection.detachChannel())
+ // will have been incremented by the RawChannelTube.
+ // See RawChannelTube.
return new BuilderImpl(this.facade(), proxySelector);
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -346,34 +346,6 @@
@Override
public abstract void close();
- abstract void shutdownInput() throws IOException;
-
- abstract void shutdownOutput() throws IOException;
-
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- abstract static class DetachedConnectionChannel implements Closeable {
- DetachedConnectionChannel() {}
- abstract SocketChannel channel();
- abstract long write(ByteBuffer[] buffers, int start, int number)
- throws IOException;
- abstract void shutdownInput() throws IOException;
- abstract void shutdownOutput() throws IOException;
- abstract ByteBuffer read() throws IOException;
- @Override
- public abstract void close();
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + ": " + channel().toString();
- }
- }
-
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- abstract DetachedConnectionChannel detachChannel();
-
abstract FlowTube getConnectionFlow();
/**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -199,17 +199,6 @@
}
}
- @Override
- void shutdownInput() throws IOException {
- debug.log(Level.DEBUG, "Shutting down input");
- chan.shutdownInput();
- }
-
- @Override
- void shutdownOutput() throws IOException {
- debug.log(Level.DEBUG, "Shutting down output");
- chan.shutdownOutput();
- }
@Override
ConnectionPool.CacheKey cacheKey() {
@@ -232,91 +221,4 @@
return false;
}
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- private static final class PlainDetachedChannel
- extends DetachedConnectionChannel {
- final PlainHttpConnection plainConnection;
- boolean closed;
- PlainDetachedChannel(PlainHttpConnection conn) {
- // We're handing the connection channel over to a web socket.
- // We need the selector manager's thread to stay alive until
- // the WebSocket is closed.
- conn.client().webSocketOpen();
- this.plainConnection = conn;
- }
-
- @Override
- SocketChannel channel() {
- return plainConnection.channel();
- }
-
- @Override
- ByteBuffer read() throws IOException {
- ByteBuffer dst = ByteBuffer.allocate(8192);
- int n = readImpl(dst);
- if (n > 0) {
- return dst;
- } else if (n == 0) {
- return Utils.EMPTY_BYTEBUFFER;
- } else {
- return null;
- }
- }
-
- @Override
- public void close() {
- HttpClientImpl client = plainConnection.client();
- try {
- plainConnection.close();
- } finally {
- // notify the HttpClientImpl that the websocket is no
- // no longer operating.
- synchronized(this) {
- if (closed == true) return;
- closed = true;
- }
- client.webSocketClose();
- }
- }
-
- @Override
- public long write(ByteBuffer[] buffers, int start, int number)
- throws IOException
- {
- return channel().write(buffers, start, number);
- }
-
- @Override
- public void shutdownInput() throws IOException {
- plainConnection.shutdownInput();
- }
-
- @Override
- public void shutdownOutput() throws IOException {
- plainConnection.shutdownOutput();
- }
-
- private int readImpl(ByteBuffer buf) throws IOException {
- int mark = buf.position();
- int n;
- n = channel().read(buf);
- if (n == -1) {
- return -1;
- }
- Utils.flipToMark(buf, mark);
- return n;
- }
- }
-
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- @Override
- DetachedConnectionChannel detachChannel() {
- tube.detach();
- return new PlainDetachedChannel(this);
- }
-
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainTunnelingConnection.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainTunnelingConnection.java Wed Mar 21 15:12:11 2018 +0000
@@ -137,16 +137,6 @@
}
@Override
- void shutdownInput() throws IOException {
- delegate.shutdownInput();
- }
-
- @Override
- void shutdownOutput() throws IOException {
- delegate.shutdownOutput();
- }
-
- @Override
boolean isSecure() {
return false;
}
@@ -156,11 +146,4 @@
return true;
}
- // Support for WebSocket/RawChannelImpl which unfortunately
- // still depends on synchronous read/writes.
- // It should be removed when RawChannelImpl moves to using asynchronous APIs.
- @Override
- DetachedConnectionChannel detachChannel() {
- return delegate.detachChannel();
- }
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelImpl.java Wed Mar 21 14:11:38 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,156 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.internal.net.http;
-
-import jdk.internal.net.http.common.Utils;
-import jdk.internal.net.http.websocket.RawChannel;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.util.function.Supplier;
-
-/*
- * Each RawChannel corresponds to a TCP connection (SocketChannel) but is
- * connected to a Selector and an ExecutorService for invoking the send and
- * receive callbacks. Also includes SSL processing.
- */
-final class RawChannelImpl implements RawChannel {
-
- private final HttpClientImpl client;
- private final HttpConnection.DetachedConnectionChannel detachedChannel;
- private final Object initialLock = new Object();
- private Supplier<ByteBuffer> initial;
-
- RawChannelImpl(HttpClientImpl client,
- HttpConnection connection,
- Supplier<ByteBuffer> initial)
- throws IOException
- {
- this.client = client;
- this.detachedChannel = connection.detachChannel();
- this.initial = initial;
-
- SocketChannel chan = connection.channel();
- client.cancelRegistration(chan);
- // Constructing a RawChannel is supposed to have a "hand over"
- // semantics, in other words if construction fails, the channel won't be
- // needed by anyone, in which case someone still needs to close it
- try {
- chan.configureBlocking(false);
- } catch (IOException e) {
- try {
- chan.close();
- } catch (IOException e1) {
- e.addSuppressed(e1);
- } finally {
- detachedChannel.close();
- }
- throw e;
- }
- }
-
- private class NonBlockingRawAsyncEvent extends AsyncEvent {
-
- private final RawEvent re;
-
- NonBlockingRawAsyncEvent(RawEvent re) {
- // !BLOCKING & !REPEATING
- this.re = re;
- }
-
- @Override
- public SelectableChannel channel() {
- return detachedChannel.channel();
- }
-
- @Override
- public int interestOps() {
- return re.interestOps();
- }
-
- @Override
- public void handle() {
- re.handle();
- }
-
- @Override
- public void abort(IOException ioe) { }
- }
-
- @Override
- public void registerEvent(RawEvent event) throws IOException {
- client.registerEvent(new NonBlockingRawAsyncEvent(event));
- }
-
- @Override
- public ByteBuffer read() throws IOException {
- assert !detachedChannel.channel().isBlocking();
- // connection.read() will no longer be available.
- return detachedChannel.read();
- }
-
- @Override
- public ByteBuffer initialByteBuffer() {
- synchronized (initialLock) {
- if (initial == null) {
- throw new IllegalStateException();
- }
- ByteBuffer ref = initial.get();
- ref = ref.hasRemaining() ? Utils.copy(ref)
- : Utils.EMPTY_BYTEBUFFER;
- initial = null;
- return ref;
- }
- }
-
- @Override
- public long write(ByteBuffer[] src, int offset, int len) throws IOException {
- // this makes the whitebox driver test fail.
- return detachedChannel.write(src, offset, len);
- }
-
- @Override
- public void shutdownInput() throws IOException {
- detachedChannel.shutdownInput();
- }
-
- @Override
- public void shutdownOutput() throws IOException {
- detachedChannel.shutdownOutput();
- }
-
- @Override
- public void close() {
- detachedChannel.close();
- }
-
- @Override
- public String toString() {
- return super.toString() + "(" + detachedChannel.toString() + ")";
- }
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SSLDelegate.java Wed Mar 21 14:11:38 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,491 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.internal.net.http;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.*;
-import jdk.internal.net.http.common.Log;
-import jdk.internal.net.http.common.Utils;
-import static javax.net.ssl.SSLEngineResult.HandshakeStatus.*;
-
-/**
- * Implements the mechanics of SSL by managing an SSLEngine object.
- * <p>
- * This class is only used to implement the {@link
- * AbstractAsyncSSLConnection.SSLConnectionChannel} which is handed of
- * to RawChannelImpl when creating a WebSocket.
- */
-class SSLDelegate {
-
- final SSLEngine engine;
- final EngineWrapper wrapper;
- final Lock handshaking = new ReentrantLock();
- final SocketChannel chan;
-
- SSLDelegate(SSLEngine eng, SocketChannel chan)
- {
- this.engine = eng;
- this.chan = chan;
- this.wrapper = new EngineWrapper(chan, engine);
- }
-
- // alpn[] may be null
-// SSLDelegate(SocketChannel chan, HttpClientImpl client, String[] alpn, String sn)
-// throws IOException
-// {
-// serverName = sn;
-// SSLContext context = client.sslContext();
-// engine = context.createSSLEngine();
-// engine.setUseClientMode(true);
-// SSLParameters sslp = client.sslParameters();
-// sslParameters = Utils.copySSLParameters(sslp);
-// if (sn != null) {
-// SNIHostName sni = new SNIHostName(sn);
-// sslParameters.setServerNames(List.of(sni));
-// }
-// if (alpn != null) {
-// sslParameters.setApplicationProtocols(alpn);
-// Log.logSSL("SSLDelegate: Setting application protocols: {0}" + Arrays.toString(alpn));
-// } else {
-// Log.logSSL("SSLDelegate: No application protocols proposed");
-// }
-// engine.setSSLParameters(sslParameters);
-// wrapper = new EngineWrapper(chan, engine);
-// this.chan = chan;
-// this.client = client;
-// }
-
-// SSLParameters getSSLParameters() {
-// return sslParameters;
-// }
-
- static long countBytes(ByteBuffer[] buffers, int start, int number) {
- long c = 0;
- for (int i=0; i<number; i++) {
- c+= buffers[start+i].remaining();
- }
- return c;
- }
-
-
- static class WrapperResult {
- static WrapperResult createOK() {
- WrapperResult r = new WrapperResult();
- r.buf = null;
- r.result = new SSLEngineResult(Status.OK, NOT_HANDSHAKING, 0, 0);
- return r;
- }
- SSLEngineResult result;
-
- ByteBuffer buf; // buffer containing result data
- }
-
- int app_buf_size;
- int packet_buf_size;
-
- enum BufType {
- PACKET,
- APPLICATION
- }
-
- ByteBuffer allocate (BufType type) {
- return allocate (type, -1);
- }
-
- // TODO: Use buffer pool for this
- ByteBuffer allocate (BufType type, int len) {
- assert engine != null;
- synchronized (this) {
- int size;
- if (type == BufType.PACKET) {
- if (packet_buf_size == 0) {
- SSLSession sess = engine.getSession();
- packet_buf_size = sess.getPacketBufferSize();
- }
- if (len > packet_buf_size) {
- packet_buf_size = len;
- }
- size = packet_buf_size;
- } else {
- if (app_buf_size == 0) {
- SSLSession sess = engine.getSession();
- app_buf_size = sess.getApplicationBufferSize();
- }
- if (len > app_buf_size) {
- app_buf_size = len;
- }
- size = app_buf_size;
- }
- return ByteBuffer.allocate (size);
- }
- }
-
- /* reallocates the buffer by :-
- * 1. creating a new buffer double the size of the old one
- * 2. putting the contents of the old buffer into the new one
- * 3. set xx_buf_size to the new size if it was smaller than new size
- *
- * flip is set to true if the old buffer needs to be flipped
- * before it is copied.
- */
- private ByteBuffer realloc (ByteBuffer b, boolean flip, BufType type) {
- // TODO: there should be the linear growth, rather than exponential as
- // we definitely know the maximum amount of space required to unwrap
- synchronized (this) {
- int nsize = 2 * b.capacity();
- ByteBuffer n = allocate (type, nsize);
- if (flip) {
- b.flip();
- }
- n.put(b);
- b = n;
- }
- return b;
- }
-
- /**
- * This is a thin wrapper over SSLEngine and the SocketChannel, which
- * guarantees the ordering of wraps/unwraps with respect to the underlying
- * channel read/writes. It handles the UNDER/OVERFLOW status codes
- * It does not handle the handshaking status codes, or the CLOSED status code
- * though once the engine is closed, any attempt to read/write to it
- * will get an exception. The overall result is returned.
- * It functions synchronously/blocking
- */
- class EngineWrapper {
-
- SocketChannel chan;
- SSLEngine engine;
- final Object wrapLock;
- final Object unwrapLock;
- ByteBuffer unwrap_src, wrap_dst;
- boolean closed = false;
- int u_remaining; // the number of bytes left in unwrap_src after an unwrap()
-
- EngineWrapper (SocketChannel chan, SSLEngine engine) {
- this.chan = chan;
- this.engine = engine;
- wrapLock = new Object();
- unwrapLock = new Object();
- unwrap_src = allocate(BufType.PACKET);
- wrap_dst = allocate(BufType.PACKET);
- }
-
-// void close () throws IOException {
-// }
-
- WrapperResult wrapAndSend(ByteBuffer src, boolean ignoreClose)
- throws IOException
- {
- ByteBuffer[] buffers = new ByteBuffer[1];
- buffers[0] = src;
- return wrapAndSend(buffers, 0, 1, ignoreClose);
- }
-
- /* try to wrap and send the data in src. Handles OVERFLOW.
- * Might block if there is an outbound blockage or if another
- * thread is calling wrap(). Also, might not send any data
- * if an unwrap is needed.
- */
- WrapperResult wrapAndSend(ByteBuffer[] src,
- int offset,
- int len,
- boolean ignoreClose)
- throws IOException
- {
- if (closed && !ignoreClose) {
- throw new IOException ("Engine is closed");
- }
- Status status;
- WrapperResult r = new WrapperResult();
- synchronized (wrapLock) {
- wrap_dst.clear();
- do {
- r.result = engine.wrap (src, offset, len, wrap_dst);
- status = r.result.getStatus();
- if (status == Status.BUFFER_OVERFLOW) {
- wrap_dst = realloc (wrap_dst, true, BufType.PACKET);
- }
- } while (status == Status.BUFFER_OVERFLOW);
- if (status == Status.CLOSED && !ignoreClose) {
- closed = true;
- return r;
- }
- if (r.result.bytesProduced() > 0) {
- wrap_dst.flip();
- int l = wrap_dst.remaining();
- assert l == r.result.bytesProduced();
- while (l>0) {
- l -= chan.write (wrap_dst);
- }
- }
- }
- return r;
- }
-
- /* block until a complete message is available and return it
- * in dst, together with the Result. dst may have been re-allocated
- * so caller should check the returned value in Result
- * If handshaking is in progress then, possibly no data is returned
- */
- WrapperResult recvAndUnwrap(ByteBuffer dst) throws IOException {
- Status status;
- WrapperResult r = new WrapperResult();
- r.buf = dst;
- if (closed) {
- throw new IOException ("Engine is closed");
- }
- boolean needData;
- if (u_remaining > 0) {
- unwrap_src.compact();
- unwrap_src.flip();
- needData = false;
- } else {
- unwrap_src.clear();
- needData = true;
- }
- synchronized (unwrapLock) {
- int x;
- do {
- if (needData) {
- x = chan.read (unwrap_src);
- if (x == -1) {
- throw new IOException ("connection closed for reading");
- }
- unwrap_src.flip();
- }
- r.result = engine.unwrap (unwrap_src, r.buf);
- status = r.result.getStatus();
- if (status == Status.BUFFER_UNDERFLOW) {
- if (unwrap_src.limit() == unwrap_src.capacity()) {
- /* buffer not big enough */
- unwrap_src = realloc (
- unwrap_src, false, BufType.PACKET
- );
- } else {
- /* Buffer not full, just need to read more
- * data off the channel. Reset pointers
- * for reading off SocketChannel
- */
- unwrap_src.position (unwrap_src.limit());
- unwrap_src.limit (unwrap_src.capacity());
- }
- needData = true;
- } else if (status == Status.BUFFER_OVERFLOW) {
- r.buf = realloc (r.buf, true, BufType.APPLICATION);
- needData = false;
- } else if (status == Status.CLOSED) {
- closed = true;
- r.buf.flip();
- return r;
- }
- } while (status != Status.OK);
- }
- u_remaining = unwrap_src.remaining();
- return r;
- }
- }
-
-// WrapperResult sendData (ByteBuffer src) throws IOException {
-// ByteBuffer[] buffers = new ByteBuffer[1];
-// buffers[0] = src;
-// return sendData(buffers, 0, 1);
-// }
-
- /**
- * send the data in the given ByteBuffer. If a handshake is needed
- * then this is handled within this method. When this call returns,
- * all of the given user data has been sent and any handshake has been
- * completed. Caller should check if engine has been closed.
- */
- WrapperResult sendData (ByteBuffer[] src, int offset, int len) throws IOException {
- WrapperResult r = WrapperResult.createOK();
- while (countBytes(src, offset, len) > 0) {
- r = wrapper.wrapAndSend(src, offset, len, false);
- Status status = r.result.getStatus();
- if (status == Status.CLOSED) {
- doClosure ();
- return r;
- }
- HandshakeStatus hs_status = r.result.getHandshakeStatus();
- if (hs_status != HandshakeStatus.FINISHED &&
- hs_status != HandshakeStatus.NOT_HANDSHAKING)
- {
- doHandshake(hs_status);
- }
- }
- return r;
- }
-
- /**
- * read data thru the engine into the given ByteBuffer. If the
- * given buffer was not large enough, a new one is allocated
- * and returned. This call handles handshaking automatically.
- * Caller should check if engine has been closed.
- */
- WrapperResult recvData (ByteBuffer dst) throws IOException {
- /* we wait until some user data arrives */
- int mark = dst.position();
- WrapperResult r = null;
- int pos = dst.position();
- while (dst.position() == pos) {
- r = wrapper.recvAndUnwrap (dst);
- dst = (r.buf != dst) ? r.buf: dst;
- Status status = r.result.getStatus();
- if (status == Status.CLOSED) {
- doClosure ();
- return r;
- }
-
- HandshakeStatus hs_status = r.result.getHandshakeStatus();
- if (hs_status != HandshakeStatus.FINISHED &&
- hs_status != HandshakeStatus.NOT_HANDSHAKING)
- {
- doHandshake (hs_status);
- }
- }
- Utils.flipToMark(dst, mark);
- return r;
- }
-
- /* we've received a close notify. Need to call wrap to send
- * the response
- */
- void doClosure () throws IOException {
- try {
- handshaking.lock();
- ByteBuffer tmp = allocate(BufType.APPLICATION);
- WrapperResult r;
- do {
- tmp.clear();
- tmp.flip ();
- r = wrapper.wrapAndSend(tmp, true);
- } while (r.result.getStatus() != Status.CLOSED);
- } finally {
- handshaking.unlock();
- }
- }
-
- /* do the (complete) handshake after acquiring the handshake lock.
- * If two threads call this at the same time, then we depend
- * on the wrapper methods being idempotent. eg. if wrapAndSend()
- * is called with no data to send then there must be no problem
- */
- @SuppressWarnings("fallthrough")
- void doHandshake (HandshakeStatus hs_status) throws IOException {
- boolean wasBlocking;
- try {
- wasBlocking = chan.isBlocking();
- handshaking.lock();
- chan.configureBlocking(true);
- ByteBuffer tmp = allocate(BufType.APPLICATION);
- while (hs_status != HandshakeStatus.FINISHED &&
- hs_status != HandshakeStatus.NOT_HANDSHAKING)
- {
- WrapperResult r = null;
- switch (hs_status) {
- case NEED_TASK:
- Runnable task;
- while ((task = engine.getDelegatedTask()) != null) {
- /* run in current thread, because we are already
- * running an external Executor
- */
- task.run();
- }
- /* fall thru - call wrap again */
- case NEED_WRAP:
- tmp.clear();
- tmp.flip();
- r = wrapper.wrapAndSend(tmp, false);
- break;
-
- case NEED_UNWRAP:
- tmp.clear();
- r = wrapper.recvAndUnwrap (tmp);
- if (r.buf != tmp) {
- tmp = r.buf;
- }
- assert tmp.position() == 0;
- break;
- }
- if (r != null) {
- hs_status = r.result.getHandshakeStatus();
- }
- }
- Log.logSSL(getSessionInfo());
- if (!wasBlocking) {
- chan.configureBlocking(false);
- }
- } finally {
- handshaking.unlock();
- }
- }
-
-// static void printParams(SSLParameters p) {
-// System.out.println("SSLParameters:");
-// if (p == null) {
-// System.out.println("Null params");
-// return;
-// }
-// for (String cipher : p.getCipherSuites()) {
-// System.out.printf("cipher: %s\n", cipher);
-// }
-// // JDK 8 EXCL START
-// for (String approto : p.getApplicationProtocols()) {
-// System.out.printf("application protocol: %s\n", approto);
-// }
-// // JDK 8 EXCL END
-// for (String protocol : p.getProtocols()) {
-// System.out.printf("protocol: %s\n", protocol);
-// }
-// if (p.getServerNames() != null) {
-// for (SNIServerName sname : p.getServerNames()) {
-// System.out.printf("server name: %s\n", sname.toString());
-// }
-// }
-// }
-
- String getSessionInfo() {
- StringBuilder sb = new StringBuilder();
- String application = engine.getApplicationProtocol();
- SSLSession sess = engine.getSession();
- String cipher = sess.getCipherSuite();
- String protocol = sess.getProtocol();
- sb.append("Handshake complete alpn: ")
- .append(application)
- .append(", Cipher: ")
- .append(cipher)
- .append(", Protocol: ")
- .append(protocol);
- return sb.toString();
- }
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Mar 21 14:11:38 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Mar 21 15:12:11 2018 +0000
@@ -65,7 +65,6 @@
private final Supplier<ByteBuffer> buffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
- private final AtomicBoolean detached = new AtomicBoolean();
private final InternalReadPublisher readPublisher;
private final InternalWriteSubscriber writeSubscriber;
private final long id = IDS.incrementAndGet();
@@ -164,23 +163,6 @@
new IOException("connection closed locally"));
}
- void detach() {
- if (detached.compareAndSet(false, true)) {
- debug.log(Level.DEBUG, "detaching tube");
- readPublisher.subscriptionImpl.readScheduler.stop();
- debug.log(Level.DEBUG, "scheduler stopped");
- SocketFlowEvent[] events = {
- readPublisher.subscriptionImpl.readEvent,
- writeSubscriber.writeEvent
- };
- for (SocketFlowEvent event : events) {
- event.pause();
- }
- debug.log(Level.DEBUG, "asking HttpClientImpl to detach channel");
- client.detachChannel(channel, events);
- }
- }
-
/**
* A restartable task used to process tasks in sequence.
*/
@@ -444,18 +426,12 @@
resumeEvent(writeEvent, this::signalError);
}
-// void pauseWriteEvent() {
-// debug.log(Level.DEBUG, "pausing write event");
-// pauseEvent(writeEvent, this::signalError);
-// }
-
void signalWritable() {
debug.log(Level.DEBUG, "channel is writable");
tryFlushCurrent(true);
}
void signalError(Throwable error) {
- if (detached.get()) return;
debug.log(Level.DEBUG, () -> "write error: " + error);
completed = true;
readPublisher.signalError(error);
@@ -548,7 +524,6 @@
}
void signalError(Throwable error) {
- if (detached.get()) return;
debug.log(Level.DEBUG, () -> "error signalled " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
@@ -716,7 +691,6 @@
}
final void signalError(Throwable error) {
- if (detached.get()) return;
if (!errorRef.compareAndSet(null, error)) {
return;
}
@@ -725,7 +699,6 @@
}
final void signalReadable() {
- if (detached.get()) return;
readScheduler.runOrSchedule();
}
@@ -740,7 +713,6 @@
try {
while(!readScheduler.isStopped()) {
if (completed) return;
- if (detached.get()) return;
// make sure we have a subscriber
if (handlePending()) {
@@ -879,7 +851,6 @@
}
@Override
protected final void signalEvent() {
- if (detached.get()) return;
try {
client.eventUpdated(this);
sub.signalReadable();
@@ -890,7 +861,6 @@
@Override
protected final void signalError(Throwable error) {
- if (detached.get()) return;
sub.signalError(error);
}
--- a/test/jdk/java/net/httpclient/websocket/PendingBinaryPingClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingBinaryPingClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -55,7 +55,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfBinary = webSocket.sendBinary(data, last);
try {
- cfBinary.get(5, TimeUnit.SECONDS);
+ cfBinary.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingBinaryPongClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingBinaryPongClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -55,7 +55,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfBinary = webSocket.sendBinary(data, last);
try {
- cfBinary.get(5, TimeUnit.SECONDS);
+ cfBinary.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingOperations.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingOperations.java Wed Mar 21 15:12:11 2018 +0000
@@ -33,6 +33,11 @@
static final Class<IllegalStateException> ISE = IllegalStateException.class;
static final Class<IOException> IOE = IOException.class;
+ // Time after which we deem that the local send buffer and remote
+ // receive buffer must be full. This has been heuristically determined.
+ // At the time of writing, using anything <= 5s on Mac will make the
+ // tests fail intermittently.
+ static final long MAX_WAIT_SEC = 10; // seconds.
DummyWebSocketServer server;
WebSocket webSocket;
--- a/test/jdk/java/net/httpclient/websocket/PendingPingBinaryClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingPingBinaryClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,10 +25,13 @@
* @test
* @build DummyWebSocketServer
* @run testng/othervm
- * -Djdk.internal.httpclient.websocket.debug=true
* PendingPingBinaryClose
*/
+// This test produce huge logs (14Mb+) so disable logging by default
+// * -Djdk.internal.httpclient.debug=true
+// * -Djdk.internal.httpclient.websocket.debug=true
+
import org.testng.annotations.Test;
import java.net.http.WebSocket;
@@ -55,7 +58,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfPing = webSocket.sendPing(data);
try {
- cfPing.get(5, TimeUnit.SECONDS);
+ cfPing.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingPingTextClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingPingTextClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,10 +25,13 @@
* @test
* @build DummyWebSocketServer
* @run testng/othervm
- * -Djdk.internal.httpclient.websocket.debug=true
* PendingPingTextClose
*/
+// This test produce huge logs (14Mb+) so disable logging by default
+// * -Djdk.internal.httpclient.debug=true
+// * -Djdk.internal.httpclient.websocket.debug=true
+
import org.testng.annotations.Test;
import java.net.http.WebSocket;
@@ -55,7 +58,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfPing = webSocket.sendPing(data);
try {
- cfPing.get(5, TimeUnit.SECONDS);
+ cfPing.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingPongBinaryClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingPongBinaryClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,10 +25,13 @@
* @test
* @build DummyWebSocketServer
* @run testng/othervm
- * -Djdk.internal.httpclient.websocket.debug=true
* PendingPongBinaryClose
*/
+// This test produce huge logs (14Mb+) so disable logging by default
+// * -Djdk.internal.httpclient.debug=true
+// * -Djdk.internal.httpclient.websocket.debug=true
+
import org.testng.annotations.Test;
import java.net.http.WebSocket;
@@ -56,7 +59,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfPong = webSocket.sendPong(data);
try {
- cfPong.get(5, TimeUnit.SECONDS);
+ cfPong.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingPongTextClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingPongTextClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -25,10 +25,13 @@
* @test
* @build DummyWebSocketServer
* @run testng/othervm
- * -Djdk.internal.httpclient.websocket.debug=true
* PendingPongTextClose
*/
+// This test produce huge logs (14Mb+) so disable logging by default
+// * -Djdk.internal.httpclient.debug=true
+// * -Djdk.internal.httpclient.websocket.debug=true
+
import org.testng.annotations.Test;
import java.net.http.WebSocket;
@@ -55,7 +58,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfPong = webSocket.sendPong(data);
try {
- cfPong.get(5, TimeUnit.SECONDS);
+ cfPong.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingTextPingClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingTextPingClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -56,7 +56,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfText = webSocket.sendText(data, last);
try {
- cfText.get(5, TimeUnit.SECONDS);
+ cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/websocket/PendingTextPongClose.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/PendingTextPongClose.java Wed Mar 21 15:12:11 2018 +0000
@@ -56,7 +56,7 @@
System.out.printf("begin cycle #%s at %s%n", i, start);
cfText = webSocket.sendText(data, last);
try {
- cfText.get(5, TimeUnit.SECONDS);
+ cfText.get(MAX_WAIT_SEC, TimeUnit.SECONDS);
data.clear();
} catch (TimeoutException e) {
break;
--- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java Wed Mar 21 14:11:38 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java Wed Mar 21 15:12:11 2018 +0000
@@ -193,8 +193,6 @@
@Override boolean isSecure() {return secured;}
@Override boolean isProxied() {return proxy!=null;}
@Override ConnectionPool.CacheKey cacheKey() {return key;}
- @Override void shutdownInput() throws IOException {}
- @Override void shutdownOutput() throws IOException {}
@Override
public void close() {
closed=true;
@@ -210,10 +208,6 @@
@Override public CompletableFuture<Void> connectAsync() {return error();}
@Override SocketChannel channel() {return error();}
@Override
- HttpConnection.DetachedConnectionChannel detachChannel() {
- return error();
- }
- @Override
FlowTube getConnectionFlow() {return flow;}
}
// Emulates an HttpClient that has a strong reference to its connection pool.