29 import java.io.IOException; |
29 import java.io.IOException; |
30 import java.lang.System.Logger.Level; |
30 import java.lang.System.Logger.Level; |
31 import java.net.InetSocketAddress; |
31 import java.net.InetSocketAddress; |
32 import java.nio.ByteBuffer; |
32 import java.nio.ByteBuffer; |
33 import java.nio.channels.SocketChannel; |
33 import java.nio.channels.SocketChannel; |
34 import java.util.Arrays; |
|
35 import java.util.IdentityHashMap; |
34 import java.util.IdentityHashMap; |
36 import java.util.List; |
35 import java.util.List; |
37 import java.util.Map; |
36 import java.util.Map; |
38 import java.util.concurrent.CompletableFuture; |
37 import java.util.concurrent.CompletableFuture; |
39 import java.util.concurrent.ConcurrentLinkedDeque; |
38 import java.util.concurrent.ConcurrentLinkedDeque; |
40 import java.util.concurrent.Flow; |
39 import java.util.concurrent.Flow; |
41 import jdk.incubator.http.HttpClient.Version; |
40 import jdk.incubator.http.HttpClient.Version; |
42 import jdk.incubator.http.internal.common.ByteBufferReference; |
|
43 import jdk.incubator.http.internal.common.Demand; |
41 import jdk.incubator.http.internal.common.Demand; |
44 import jdk.incubator.http.internal.common.FlowTube; |
42 import jdk.incubator.http.internal.common.FlowTube; |
45 import jdk.incubator.http.internal.common.SequentialScheduler; |
43 import jdk.incubator.http.internal.common.SequentialScheduler; |
46 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
44 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
47 import jdk.incubator.http.internal.common.Log; |
45 import jdk.incubator.http.internal.common.Log; |
56 * PlainProxyConnection: plain text proxy connection |
54 * PlainProxyConnection: plain text proxy connection |
57 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server |
55 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server |
58 * AsyncSSLConnection: TLS channel direct to server |
56 * AsyncSSLConnection: TLS channel direct to server |
59 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel |
57 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel |
60 */ |
58 */ |
61 abstract class HttpConnection implements Closeable, AsyncConnection { |
59 abstract class HttpConnection implements Closeable { |
62 |
60 |
63 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
61 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
64 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
62 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
65 final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( |
63 final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( |
66 () -> "HttpConnection(SocketTube(?))", DEBUG); |
64 () -> "HttpConnection(SocketTube(?))", DEBUG); |
121 final boolean isOpen() { |
119 final boolean isOpen() { |
122 return channel().isOpen() && |
120 return channel().isOpen() && |
123 (connected() ? !getConnectionFlow().isFinished() : true); |
121 (connected() ? !getConnectionFlow().isFinished() : true); |
124 } |
122 } |
125 |
123 |
126 interface HttpPublisher extends FlowTube.TubePublisher { } |
124 interface HttpPublisher extends FlowTube.TubePublisher { |
|
125 void enqueue(List<ByteBuffer> buffers) throws IOException; |
|
126 void enqueueUnordered(List<ByteBuffer> buffers) throws IOException; |
|
127 void signalEnqueued() throws IOException; |
|
128 } |
127 |
129 |
128 /** |
130 /** |
129 * Returns the HTTP publisher associated with this connection. May be null |
131 * Returns the HTTP publisher associated with this connection. May be null |
130 * if invoked before connecting. |
132 * if invoked before connecting. |
131 */ |
133 */ |
287 // It should be removed when RawChannelImpl moves to using asynchronous APIs. |
289 // It should be removed when RawChannelImpl moves to using asynchronous APIs. |
288 abstract DetachedConnectionChannel detachChannel(); |
290 abstract DetachedConnectionChannel detachChannel(); |
289 |
291 |
290 abstract FlowTube getConnectionFlow(); |
292 abstract FlowTube getConnectionFlow(); |
291 |
293 |
292 // This queue and publisher are temporary, and only needed because |
294 /** |
293 // the calling code still uses writeAsync/flushAsync |
295 * A publisher that makes it possible to publish (write) |
|
296 * ordered (normal priority) and unordered (high priority) |
|
297 * buffers downstream. |
|
298 */ |
294 final class PlainHttpPublisher implements HttpPublisher { |
299 final class PlainHttpPublisher implements HttpPublisher { |
295 final Object reading; |
300 final Object reading; |
296 PlainHttpPublisher() { |
301 PlainHttpPublisher() { |
297 this(new Object()); |
302 this(new Object()); |
298 } |
303 } |
312 if (subscription == null) { |
317 if (subscription == null) { |
313 subscription = new HttpWriteSubscription(); |
318 subscription = new HttpWriteSubscription(); |
314 } |
319 } |
315 this.subscriber = subscriber; |
320 this.subscriber = subscriber; |
316 } |
321 } |
|
322 // TODO: should we do this in the flow? |
317 subscriber.onSubscribe(subscription); |
323 subscriber.onSubscribe(subscription); |
318 signal(); |
324 signal(); |
319 } |
325 } |
320 |
326 |
321 void flushTask(DeferredCompleter completer) { |
327 void flushTask(DeferredCompleter completer) { |
362 subscriber.onNext(elem); |
368 subscriber.onNext(elem); |
363 } |
369 } |
364 } |
370 } |
365 } |
371 } |
366 |
372 |
367 public void writeAsync(ByteBufferReference[] buffers) throws IOException { |
373 @Override |
368 List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers)); |
374 public void enqueue(List<ByteBuffer> buffers) throws IOException { |
369 queue.add(l); |
375 queue.add(buffers); |
370 int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum(); |
376 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); |
371 debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); |
377 debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); |
372 } |
378 } |
373 |
379 |
374 public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { |
380 @Override |
|
381 public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException { |
375 // Unordered frames are sent before existing frames. |
382 // Unordered frames are sent before existing frames. |
376 List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers)); |
383 int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum(); |
377 int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum(); |
384 queue.addFirst(buffers); |
378 queue.addFirst(l); |
|
379 debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); |
385 debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); |
380 } |
386 } |
381 |
387 |
382 public void flushAsync() throws IOException { |
388 @Override |
383 // ### Remove flushAsync |
389 public void signalEnqueued() throws IOException { |
384 // no-op. Should not be needed now with Tube. |
|
385 // Tube.write will initiate the low-level write |
|
386 debug.log(Level.DEBUG, "signalling the publisher of the write queue"); |
390 debug.log(Level.DEBUG, "signalling the publisher of the write queue"); |
387 signal(); |
391 signal(); |
388 } |
392 } |
389 } |
393 } |
390 |
394 |