--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Sun Nov 05 17:32:13 2017 +0000
@@ -26,15 +26,12 @@
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.Utils;
/**
@@ -43,48 +40,43 @@
class AsyncSSLTunnelConnection extends AbstractAsyncSSLConnection {
final PlainTunnelingConnection plainConnection;
- final AsyncSSLDelegate sslDelegate;
- final String serverName;
+ final PlainHttpPublisher writePublisher;
+ volatile SSLTube flow;
- @Override
- public void connect() throws IOException, InterruptedException {
- plainConnection.connect();
- configureMode(Mode.ASYNC);
- startReading();
- sslDelegate.connect();
- }
-
- @Override
- boolean connected() {
- return plainConnection.connected() && sslDelegate.connected();
+ AsyncSSLTunnelConnection(InetSocketAddress addr,
+ HttpClientImpl client,
+ String[] alpn,
+ InetSocketAddress proxy)
+ {
+ super(addr, client, Utils.getServerName(addr), alpn);
+ this.plainConnection = new PlainTunnelingConnection(addr, proxy, client);
+ this.writePublisher = new PlainHttpPublisher();
}
@Override
public CompletableFuture<Void> connectAsync() {
- throw new InternalError();
- }
-
- AsyncSSLTunnelConnection(InetSocketAddress addr,
- HttpClientImpl client,
- String[] alpn,
- InetSocketAddress proxy)
- {
- super(addr, client);
- this.serverName = Utils.getServerName(addr);
- this.plainConnection = new PlainTunnelingConnection(addr, proxy, client);
- this.sslDelegate = new AsyncSSLDelegate(plainConnection, client, alpn, serverName);
+ debug.log(Level.DEBUG, "Connecting plain tunnel connection");
+ // This will connect the PlainHttpConnection flow, so that
+ // its HttpSubscriber and HttpPublisher are subscribed to the
+ // SocketTube
+ return plainConnection
+ .connectAsync()
+ .thenApply( unused -> {
+ debug.log(Level.DEBUG, "creating SSLTube");
+ // create the SSLTube wrapping the SocketTube, with the given engine
+ flow = new SSLTube(engine,
+ client().theExecutor(),
+ plainConnection.getConnectionFlow());
+ return null;} );
}
@Override
- synchronized void configureMode(Mode mode) throws IOException {
- super.configureMode(mode);
- plainConnection.configureMode(mode);
+ boolean connected() {
+ return plainConnection.connected(); // && sslDelegate.connected();
}
@Override
- SSLParameters sslParameters() {
- return sslDelegate.getSSLParameters();
- }
+ HttpPublisher publisher() { return writePublisher; }
@Override
public String toString() {
@@ -97,52 +89,28 @@
}
@Override
- AsyncSSLDelegate sslDelegate() {
- return sslDelegate;
- }
-
- @Override
ConnectionPool.CacheKey cacheKey() {
return ConnectionPool.cacheKey(address, plainConnection.proxyAddr);
}
@Override
- long write(ByteBuffer[] buffers, int start, int number) throws IOException {
- //debugPrint("Send", buffers, start, number);
- ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
- long n = Utils.remaining(bufs);
- sslDelegate.writeAsync(ByteBufferReference.toReferences(bufs));
- sslDelegate.flushAsync();
- return n;
- }
-
- @Override
- long write(ByteBuffer buffer) throws IOException {
- //debugPrint("Send", buffer);
- long n = buffer.remaining();
- sslDelegate.writeAsync(ByteBufferReference.toReferences(buffer));
- sslDelegate.flushAsync();
- return n;
- }
-
- @Override
public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- sslDelegate.writeAsync(buffers);
+ writePublisher.writeAsync(buffers);
}
@Override
public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- sslDelegate.writeAsyncUnordered(buffers);
+ writePublisher.writeAsyncUnordered(buffers);
}
@Override
public void flushAsync() throws IOException {
- sslDelegate.flushAsync();
+ writePublisher.flushAsync();
}
@Override
public void close() {
- Utils.close(sslDelegate, plainConnection.channel());
+ plainConnection.close();
}
@Override
@@ -166,41 +134,13 @@
}
@Override
- public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
- Consumer<Throwable> errorReceiver,
- Supplier<ByteBufferReference> readBufferSupplier) {
- sslDelegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
- plainConnection.setAsyncCallbacks(sslDelegate::asyncReceive, errorReceiver, sslDelegate::getNetBuffer);
- }
-
- @Override
- public void startReading() {
- plainConnection.startReading();
- sslDelegate.startReading();
- }
-
- @Override
- public void stopAsyncReading() {
- plainConnection.stopAsyncReading();
+ public void closeExceptionally(Throwable cause) {
+ debug.log(Level.DEBUG, "Closing connection: ", cause);
+ plainConnection.close();
}
@Override
- public void enableCallback() {
- sslDelegate.enableCallback();
- }
-
- @Override
- public void closeExceptionally(Throwable cause) throws IOException {
- Utils.close(cause, sslDelegate, plainConnection.channel());
- }
-
- @Override
- SSLEngine getEngine() {
- return sslDelegate.getEngine();
- }
-
- @Override
- SSLTunnelConnection downgrade() {
- return new SSLTunnelConnection(this);
- }
+ SSLTube getConnectionFlow() {
+ return flow;
+ }
}