src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    23  * questions.
    23  * questions.
    24  */
    24  */
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import jdk.incubator.http.internal.common.ByteBufferReference;
       
    29 import jdk.incubator.http.internal.common.MinimalFuture;
       
    30 import jdk.incubator.http.HttpResponse.BodyHandler;
       
    31 
       
    32 import java.io.IOException;
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
    33 import java.net.InetSocketAddress;
    30 import java.net.InetSocketAddress;
    34 import java.nio.ByteBuffer;
    31 import java.nio.ByteBuffer;
    35 import java.nio.channels.SocketChannel;
    32 import java.nio.channels.SocketChannel;
    36 import java.util.concurrent.CompletableFuture;
    33 import java.util.concurrent.CompletableFuture;
    37 import java.util.function.Consumer;
    34 import jdk.incubator.http.internal.common.ByteBufferReference;
    38 import java.util.function.Supplier;
    35 import jdk.incubator.http.internal.common.FlowTube;
       
    36 import jdk.incubator.http.internal.common.MinimalFuture;
       
    37 import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
    39 
    38 
    40 /**
    39 /**
    41  * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
    40  * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
    42  * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
    41  * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
    43  * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
    42  * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
    44  */
    43  */
    45 class PlainTunnelingConnection extends HttpConnection implements AsyncConnection {
    44 final class PlainTunnelingConnection extends HttpConnection {
    46 
    45 
    47     final PlainHttpConnection delegate;
    46     final PlainHttpConnection delegate;
    48     protected final InetSocketAddress proxyAddr;
    47     protected final InetSocketAddress proxyAddr;
    49     private volatile boolean connected;
    48     private volatile boolean connected;
    50 
       
    51     @Override
       
    52     public CompletableFuture<Void> connectAsync() {
       
    53         return delegate.connectAsync()
       
    54             .thenCompose((Void v) -> {
       
    55                 HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
       
    56                 MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore);
       
    57                 return mconnectExchange.responseAsync()
       
    58                     .thenCompose((HttpResponseImpl<Void> resp) -> {
       
    59                         CompletableFuture<Void> cf = new MinimalFuture<>();
       
    60                         if (resp.statusCode() != 200) {
       
    61                             cf.completeExceptionally(new IOException("Tunnel failed"));
       
    62                         } else {
       
    63                             connected = true;
       
    64                             cf.complete(null);
       
    65                         }
       
    66                         return cf;
       
    67                     });
       
    68             });
       
    69     }
       
    70 
       
    71     private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) {
       
    72         return HttpResponse.BodyProcessor.discard((Void)null);
       
    73     }
       
    74 
       
    75     @Override
       
    76     public void connect() throws IOException, InterruptedException {
       
    77         delegate.connect();
       
    78         HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address);
       
    79         MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null));
       
    80         Exchange<Void> connectExchange = new Exchange<>(req, mul);
       
    81         Response r = connectExchange.responseImpl(delegate);
       
    82         if (r.statusCode() != 200) {
       
    83             throw new IOException("Tunnel failed");
       
    84         }
       
    85         connected = true;
       
    86     }
       
    87 
       
    88     @Override
       
    89     boolean connected() {
       
    90         return connected;
       
    91     }
       
    92 
    49 
    93     protected PlainTunnelingConnection(InetSocketAddress addr,
    50     protected PlainTunnelingConnection(InetSocketAddress addr,
    94                                        InetSocketAddress proxy,
    51                                        InetSocketAddress proxy,
    95                                        HttpClientImpl client) {
    52                                        HttpClientImpl client) {
    96         super(addr, client);
    53         super(addr, client);
    97         this.proxyAddr = proxy;
    54         this.proxyAddr = proxy;
    98         delegate = new PlainHttpConnection(proxy, client);
    55         delegate = new PlainHttpConnection(proxy, client);
    99     }
    56     }
   100 
    57 
   101     @Override
    58     @Override
       
    59     public CompletableFuture<Void> connectAsync() {
       
    60         debug.log(Level.DEBUG, "Connecting plain connection");
       
    61         return delegate.connectAsync()
       
    62             .thenCompose((Void v) -> {
       
    63                 debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT");
       
    64                 HttpClientImpl client = client();
       
    65                 assert client != null;
       
    66                 HttpRequestImpl req = new HttpRequestImpl("CONNECT", address);
       
    67                 MultiExchange<Void,Void> mulEx = new MultiExchange<>(req, client, discard(null), null);
       
    68                 Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
       
    69 
       
    70                 return connectExchange
       
    71                         .responseAsyncImpl(delegate)
       
    72                         .thenCompose((Response resp) -> {
       
    73                             CompletableFuture<Void> cf = new MinimalFuture<>();
       
    74                             debug.log(Level.DEBUG, "got response: %d", resp.statusCode());
       
    75                             if (resp.statusCode() != 200) {
       
    76                                 cf.completeExceptionally(new IOException(
       
    77                                         "Tunnel failed, got: "+ resp.statusCode()));
       
    78                             } else {
       
    79                                 // get the initial/remaining bytes
       
    80                                 ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).getBuffer();
       
    81                                 int remaining = b.remaining();
       
    82                                 assert remaining == 0: "Unexpected remaining: " + remaining;
       
    83                                 connected = true;
       
    84                                 cf.complete(null);
       
    85                             }
       
    86                             return cf;
       
    87                         });
       
    88             });
       
    89     }
       
    90 
       
    91     @Override
       
    92     HttpPublisher publisher() { return delegate.publisher(); }
       
    93 
       
    94     @Override
       
    95     boolean connected() {
       
    96         return connected;
       
    97     }
       
    98 
       
    99     @Override
   102     SocketChannel channel() {
   100     SocketChannel channel() {
   103         return delegate.channel();
   101         return delegate.channel();
   104     }
   102     }
   105 
   103 
   106     @Override
   104     @Override
   107     ConnectionPool.CacheKey cacheKey() {
   105     FlowTube getConnectionFlow() {
   108         return new ConnectionPool.CacheKey(null, proxyAddr);
   106         return delegate.getConnectionFlow();
   109     }
   107     }
   110 
   108 
   111     @Override
   109     @Override
   112     long write(ByteBuffer[] buffers, int start, int number) throws IOException {
   110     ConnectionPool.CacheKey cacheKey() {
   113         return delegate.write(buffers, start, number);
   111         return new ConnectionPool.CacheKey(null, proxyAddr);
   114     }
       
   115 
       
   116     @Override
       
   117     long write(ByteBuffer buffer) throws IOException {
       
   118         return delegate.write(buffer);
       
   119     }
   112     }
   120 
   113 
   121     @Override
   114     @Override
   122     public void writeAsync(ByteBufferReference[] buffers) throws IOException {
   115     public void writeAsync(ByteBufferReference[] buffers) throws IOException {
   123         delegate.writeAsync(buffers);
   116         delegate.writeAsync(buffers);
   148     void shutdownOutput() throws IOException {
   141     void shutdownOutput() throws IOException {
   149         delegate.shutdownOutput();
   142         delegate.shutdownOutput();
   150     }
   143     }
   151 
   144 
   152     @Override
   145     @Override
   153     CompletableFuture<Void> whenReceivingResponse() {
       
   154         return delegate.whenReceivingResponse();
       
   155     }
       
   156 
       
   157     @Override
       
   158     protected ByteBuffer readImpl() throws IOException {
       
   159         return delegate.readImpl();
       
   160     }
       
   161 
       
   162     @Override
       
   163     boolean isSecure() {
   146     boolean isSecure() {
   164         return false;
   147         return false;
   165     }
   148     }
   166 
   149 
   167     @Override
   150     @Override
   168     boolean isProxied() {
   151     boolean isProxied() {
   169         return true;
   152         return true;
   170     }
   153     }
   171 
   154 
       
   155     // Support for WebSocket/RawChannelImpl which unfortunately
       
   156     // still depends on synchronous read/writes.
       
   157     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
   172     @Override
   158     @Override
   173     public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
   159     DetachedConnectionChannel detachChannel() {
   174             Consumer<Throwable> errorReceiver,
   160         return delegate.detachChannel();
   175             Supplier<ByteBufferReference> readBufferSupplier) {
       
   176         delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier);
       
   177     }
       
   178 
       
   179     @Override
       
   180     public void startReading() {
       
   181         delegate.startReading();
       
   182     }
       
   183 
       
   184     @Override
       
   185     public void stopAsyncReading() {
       
   186         delegate.stopAsyncReading();
       
   187     }
       
   188 
       
   189     @Override
       
   190     public void enableCallback() {
       
   191         delegate.enableCallback();
       
   192     }
       
   193 
       
   194     @Override
       
   195     synchronized void configureMode(Mode mode) throws IOException {
       
   196         super.configureMode(mode);
       
   197         delegate.configureMode(mode);
       
   198     }
   161     }
   199 }
   162 }