23 * questions. |
23 * questions. |
24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import jdk.incubator.http.internal.common.ByteBufferReference; |
|
29 import jdk.incubator.http.internal.common.MinimalFuture; |
|
30 import jdk.incubator.http.HttpResponse.BodyHandler; |
|
31 |
|
32 import java.io.IOException; |
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
33 import java.net.InetSocketAddress; |
30 import java.net.InetSocketAddress; |
34 import java.nio.ByteBuffer; |
31 import java.nio.ByteBuffer; |
35 import java.nio.channels.SocketChannel; |
32 import java.nio.channels.SocketChannel; |
36 import java.util.concurrent.CompletableFuture; |
33 import java.util.concurrent.CompletableFuture; |
37 import java.util.function.Consumer; |
34 import jdk.incubator.http.internal.common.ByteBufferReference; |
38 import java.util.function.Supplier; |
35 import jdk.incubator.http.internal.common.FlowTube; |
|
36 import jdk.incubator.http.internal.common.MinimalFuture; |
|
37 import static jdk.incubator.http.HttpResponse.BodyHandler.discard; |
39 |
38 |
40 /** |
39 /** |
41 * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not |
40 * A plain text socket tunnel through a proxy. Uses "CONNECT" but does not |
42 * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. |
41 * encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. |
43 * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption. |
42 * Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption. |
44 */ |
43 */ |
45 class PlainTunnelingConnection extends HttpConnection implements AsyncConnection { |
44 final class PlainTunnelingConnection extends HttpConnection { |
46 |
45 |
47 final PlainHttpConnection delegate; |
46 final PlainHttpConnection delegate; |
48 protected final InetSocketAddress proxyAddr; |
47 protected final InetSocketAddress proxyAddr; |
49 private volatile boolean connected; |
48 private volatile boolean connected; |
50 |
|
51 @Override |
|
52 public CompletableFuture<Void> connectAsync() { |
|
53 return delegate.connectAsync() |
|
54 .thenCompose((Void v) -> { |
|
55 HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); |
|
56 MultiExchange<Void,Void> mconnectExchange = new MultiExchange<>(req, client, this::ignore); |
|
57 return mconnectExchange.responseAsync() |
|
58 .thenCompose((HttpResponseImpl<Void> resp) -> { |
|
59 CompletableFuture<Void> cf = new MinimalFuture<>(); |
|
60 if (resp.statusCode() != 200) { |
|
61 cf.completeExceptionally(new IOException("Tunnel failed")); |
|
62 } else { |
|
63 connected = true; |
|
64 cf.complete(null); |
|
65 } |
|
66 return cf; |
|
67 }); |
|
68 }); |
|
69 } |
|
70 |
|
71 private HttpResponse.BodyProcessor<Void> ignore(int status, HttpHeaders hdrs) { |
|
72 return HttpResponse.BodyProcessor.discard((Void)null); |
|
73 } |
|
74 |
|
75 @Override |
|
76 public void connect() throws IOException, InterruptedException { |
|
77 delegate.connect(); |
|
78 HttpRequestImpl req = new HttpRequestImpl("CONNECT", client, address); |
|
79 MultiExchange<Void,Void> mul = new MultiExchange<>(req, client, BodyHandler.<Void>discard(null)); |
|
80 Exchange<Void> connectExchange = new Exchange<>(req, mul); |
|
81 Response r = connectExchange.responseImpl(delegate); |
|
82 if (r.statusCode() != 200) { |
|
83 throw new IOException("Tunnel failed"); |
|
84 } |
|
85 connected = true; |
|
86 } |
|
87 |
|
88 @Override |
|
89 boolean connected() { |
|
90 return connected; |
|
91 } |
|
92 |
49 |
93 protected PlainTunnelingConnection(InetSocketAddress addr, |
50 protected PlainTunnelingConnection(InetSocketAddress addr, |
94 InetSocketAddress proxy, |
51 InetSocketAddress proxy, |
95 HttpClientImpl client) { |
52 HttpClientImpl client) { |
96 super(addr, client); |
53 super(addr, client); |
97 this.proxyAddr = proxy; |
54 this.proxyAddr = proxy; |
98 delegate = new PlainHttpConnection(proxy, client); |
55 delegate = new PlainHttpConnection(proxy, client); |
99 } |
56 } |
100 |
57 |
101 @Override |
58 @Override |
|
59 public CompletableFuture<Void> connectAsync() { |
|
60 debug.log(Level.DEBUG, "Connecting plain connection"); |
|
61 return delegate.connectAsync() |
|
62 .thenCompose((Void v) -> { |
|
63 debug.log(Level.DEBUG, "sending HTTP/1.1 CONNECT"); |
|
64 HttpClientImpl client = client(); |
|
65 assert client != null; |
|
66 HttpRequestImpl req = new HttpRequestImpl("CONNECT", address); |
|
67 MultiExchange<Void,Void> mulEx = new MultiExchange<>(req, client, discard(null), null); |
|
68 Exchange<Void> connectExchange = new Exchange<>(req, mulEx); |
|
69 |
|
70 return connectExchange |
|
71 .responseAsyncImpl(delegate) |
|
72 .thenCompose((Response resp) -> { |
|
73 CompletableFuture<Void> cf = new MinimalFuture<>(); |
|
74 debug.log(Level.DEBUG, "got response: %d", resp.statusCode()); |
|
75 if (resp.statusCode() != 200) { |
|
76 cf.completeExceptionally(new IOException( |
|
77 "Tunnel failed, got: "+ resp.statusCode())); |
|
78 } else { |
|
79 // get the initial/remaining bytes |
|
80 ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).getBuffer(); |
|
81 int remaining = b.remaining(); |
|
82 assert remaining == 0: "Unexpected remaining: " + remaining; |
|
83 connected = true; |
|
84 cf.complete(null); |
|
85 } |
|
86 return cf; |
|
87 }); |
|
88 }); |
|
89 } |
|
90 |
|
91 @Override |
|
92 HttpPublisher publisher() { return delegate.publisher(); } |
|
93 |
|
94 @Override |
|
95 boolean connected() { |
|
96 return connected; |
|
97 } |
|
98 |
|
99 @Override |
102 SocketChannel channel() { |
100 SocketChannel channel() { |
103 return delegate.channel(); |
101 return delegate.channel(); |
104 } |
102 } |
105 |
103 |
106 @Override |
104 @Override |
107 ConnectionPool.CacheKey cacheKey() { |
105 FlowTube getConnectionFlow() { |
108 return new ConnectionPool.CacheKey(null, proxyAddr); |
106 return delegate.getConnectionFlow(); |
109 } |
107 } |
110 |
108 |
111 @Override |
109 @Override |
112 long write(ByteBuffer[] buffers, int start, int number) throws IOException { |
110 ConnectionPool.CacheKey cacheKey() { |
113 return delegate.write(buffers, start, number); |
111 return new ConnectionPool.CacheKey(null, proxyAddr); |
114 } |
|
115 |
|
116 @Override |
|
117 long write(ByteBuffer buffer) throws IOException { |
|
118 return delegate.write(buffer); |
|
119 } |
112 } |
120 |
113 |
121 @Override |
114 @Override |
122 public void writeAsync(ByteBufferReference[] buffers) throws IOException { |
115 public void writeAsync(ByteBufferReference[] buffers) throws IOException { |
123 delegate.writeAsync(buffers); |
116 delegate.writeAsync(buffers); |
148 void shutdownOutput() throws IOException { |
141 void shutdownOutput() throws IOException { |
149 delegate.shutdownOutput(); |
142 delegate.shutdownOutput(); |
150 } |
143 } |
151 |
144 |
152 @Override |
145 @Override |
153 CompletableFuture<Void> whenReceivingResponse() { |
|
154 return delegate.whenReceivingResponse(); |
|
155 } |
|
156 |
|
157 @Override |
|
158 protected ByteBuffer readImpl() throws IOException { |
|
159 return delegate.readImpl(); |
|
160 } |
|
161 |
|
162 @Override |
|
163 boolean isSecure() { |
146 boolean isSecure() { |
164 return false; |
147 return false; |
165 } |
148 } |
166 |
149 |
167 @Override |
150 @Override |
168 boolean isProxied() { |
151 boolean isProxied() { |
169 return true; |
152 return true; |
170 } |
153 } |
171 |
154 |
|
155 // Support for WebSocket/RawChannelImpl which unfortunately |
|
156 // still depends on synchronous read/writes. |
|
157 // It should be removed when RawChannelImpl moves to using asynchronous APIs. |
172 @Override |
158 @Override |
173 public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver, |
159 DetachedConnectionChannel detachChannel() { |
174 Consumer<Throwable> errorReceiver, |
160 return delegate.detachChannel(); |
175 Supplier<ByteBufferReference> readBufferSupplier) { |
|
176 delegate.setAsyncCallbacks(asyncReceiver, errorReceiver, readBufferSupplier); |
|
177 } |
|
178 |
|
179 @Override |
|
180 public void startReading() { |
|
181 delegate.startReading(); |
|
182 } |
|
183 |
|
184 @Override |
|
185 public void stopAsyncReading() { |
|
186 delegate.stopAsyncReading(); |
|
187 } |
|
188 |
|
189 @Override |
|
190 public void enableCallback() { |
|
191 delegate.enableCallback(); |
|
192 } |
|
193 |
|
194 @Override |
|
195 synchronized void configureMode(Mode mode) throws IOException { |
|
196 super.configureMode(mode); |
|
197 delegate.configureMode(mode); |
|
198 } |
161 } |
199 } |
162 } |