src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55795 074bb951658a
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Sun Nov 05 17:32:13 2017 +0000
@@ -26,37 +26,30 @@
 package jdk.incubator.http;
 
 import java.io.IOException;
+import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import javax.net.ssl.SSLEngine;
+import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.SSLTube;
+import jdk.incubator.http.internal.common.Utils;
 
-import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.Utils;
 
 /**
  * Asynchronous version of SSLConnection.
  */
 class AsyncSSLConnection extends AbstractAsyncSSLConnection {
 
-    final AsyncSSLDelegate sslDelegate;
     final PlainHttpConnection plainConnection;
-    final String serverName;
+    final PlainHttpPublisher writePublisher;
+    private volatile SSLTube flow;
 
-    AsyncSSLConnection(InetSocketAddress addr, HttpClientImpl client, String[] ap) {
-        super(addr, client);
+    AsyncSSLConnection(InetSocketAddress addr,
+                       HttpClientImpl client,
+                       String[] alpn) {
+        super(addr, client, Utils.getServerName(addr), alpn);
         plainConnection = new PlainHttpConnection(addr, client);
-        serverName = Utils.getServerName(addr);
-        sslDelegate = new AsyncSSLDelegate(plainConnection, client, ap, serverName);
-    }
-
-    @Override
-    synchronized void configureMode(Mode mode) throws IOException {
-        super.configureMode(mode);
-        plainConnection.configureMode(mode);
+        writePublisher = new PlainHttpPublisher();
     }
 
     @Override
@@ -65,30 +58,26 @@
     }
 
     @Override
-    AsyncSSLDelegate sslDelegate() {
-        return sslDelegate;
-    }
-
-    @Override
-    public void connect() throws IOException, InterruptedException {
-        plainConnection.connect();
-        configureMode(Mode.ASYNC);
-        startReading();
-        sslDelegate.connect();
-    }
-
-    @Override
     public CompletableFuture<Void> connectAsync() {
-        // not used currently
-        throw new InternalError();
+        return plainConnection
+                .connectAsync()
+                .thenApply( unused -> {
+                    // create the SSLTube wrapping the SocketTube, with the given engine
+                    flow = new SSLTube(engine,
+                                       client().theExecutor(),
+                                       plainConnection.getConnectionFlow());
+                    return null; } );
     }
 
     @Override
     boolean connected() {
-        return plainConnection.connected() && sslDelegate.connected();
+        return plainConnection.connected();
     }
 
     @Override
+    HttpPublisher publisher() { return writePublisher; }
+
+    @Override
     boolean isProxied() {
         return false;
     }
@@ -99,97 +88,50 @@
     }
 
     @Override
-    public void enableCallback() {
-        sslDelegate.enableCallback();
-    }
-
-    @Override
     ConnectionPool.CacheKey cacheKey() {
         return ConnectionPool.cacheKey(address, null);
     }
 
     @Override
-    long write(ByteBuffer[] buffers, int start, int number)
-        throws IOException
-    {
-        ByteBuffer[] bufs = Utils.reduce(buffers, start, number);
-        long n = Utils.remaining(bufs);
-        sslDelegate.writeAsync(ByteBufferReference.toReferences(bufs));
-        sslDelegate.flushAsync();
-        return n;
-    }
-
-    @Override
-    long write(ByteBuffer buffer) throws IOException {
-        long n = buffer.remaining();
-        sslDelegate.writeAsync(ByteBufferReference.toReferences(buffer));
-        sslDelegate.flushAsync();
-        return n;
+    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
+        writePublisher.writeAsync(buffers);
     }
 
     @Override
     public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        assert getMode() == Mode.ASYNC;
-        sslDelegate.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        assert getMode() == Mode.ASYNC;
-        sslDelegate.writeAsync(buffers);
+        writePublisher.writeAsyncUnordered(buffers);
     }
 
     @Override
     public void flushAsync() throws IOException {
-        sslDelegate.flushAsync();
+        writePublisher.flushAsync();
     }
 
     @Override
     public void closeExceptionally(Throwable cause) {
-        Utils.close(cause, sslDelegate, plainConnection.channel());
+        debug.log(Level.DEBUG, () -> "closing: " + cause);
+        plainConnection.close();
     }
 
     @Override
     public void close() {
-        Utils.close(sslDelegate, plainConnection.channel());
+        plainConnection.close();
     }
 
     @Override
     void shutdownInput() throws IOException {
+        debug.log(Level.DEBUG, "plainConnection.channel().shutdownInput()");
         plainConnection.channel().shutdownInput();
     }
 
     @Override
     void shutdownOutput() throws IOException {
+        debug.log(Level.DEBUG, "plainConnection.channel().shutdownOutput()");
         plainConnection.channel().shutdownOutput();
     }
 
-    @Override
-    SSLEngine getEngine() {
-        return sslDelegate.getEngine();
-    }
-
-    @Override
-    public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
-                                  Consumer<Throwable> errorReceiver,
-                                  Supplier<ByteBufferReference> readBufferSupplier) {
-        sslDelegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
-        plainConnection.setAsyncCallbacks(sslDelegate::asyncReceive, errorReceiver, sslDelegate::getNetBuffer);
-    }
-
-    @Override
-    public void startReading() {
-        plainConnection.startReading();
-        sslDelegate.startReading();
-    }
-
-    @Override
-    public void stopAsyncReading() {
-        plainConnection.stopAsyncReading();
-    }
-
-    @Override
-    SSLConnection downgrade() {
-        return new SSLConnection(this);
-    }
+   @Override
+   SSLTube getConnectionFlow() {
+       return flow;
+   }
 }