src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
branch http-client-branch
changeset 55795 074bb951658a
parent 55792 0936888d5a4a
child 55799 c71f52f48d97
equal deleted inserted replaced
55794:08e58c1c75fb 55795:074bb951658a
    29 import java.io.IOException;
    29 import java.io.IOException;
    30 import java.lang.System.Logger.Level;
    30 import java.lang.System.Logger.Level;
    31 import java.net.InetSocketAddress;
    31 import java.net.InetSocketAddress;
    32 import java.nio.ByteBuffer;
    32 import java.nio.ByteBuffer;
    33 import java.nio.channels.SocketChannel;
    33 import java.nio.channels.SocketChannel;
    34 import java.util.Arrays;
       
    35 import java.util.IdentityHashMap;
    34 import java.util.IdentityHashMap;
    36 import java.util.List;
    35 import java.util.List;
    37 import java.util.Map;
    36 import java.util.Map;
    38 import java.util.concurrent.CompletableFuture;
    37 import java.util.concurrent.CompletableFuture;
    39 import java.util.concurrent.ConcurrentLinkedDeque;
    38 import java.util.concurrent.ConcurrentLinkedDeque;
    40 import java.util.concurrent.Flow;
    39 import java.util.concurrent.Flow;
    41 import jdk.incubator.http.HttpClient.Version;
    40 import jdk.incubator.http.HttpClient.Version;
    42 import jdk.incubator.http.internal.common.ByteBufferReference;
       
    43 import jdk.incubator.http.internal.common.Demand;
    41 import jdk.incubator.http.internal.common.Demand;
    44 import jdk.incubator.http.internal.common.FlowTube;
    42 import jdk.incubator.http.internal.common.FlowTube;
    45 import jdk.incubator.http.internal.common.SequentialScheduler;
    43 import jdk.incubator.http.internal.common.SequentialScheduler;
    46 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
    44 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
    47 import jdk.incubator.http.internal.common.Log;
    45 import jdk.incubator.http.internal.common.Log;
    56  *      PlainProxyConnection: plain text proxy connection
    54  *      PlainProxyConnection: plain text proxy connection
    57  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
    55  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
    58  *      AsyncSSLConnection: TLS channel direct to server
    56  *      AsyncSSLConnection: TLS channel direct to server
    59  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
    57  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
    60  */
    58  */
    61 abstract class HttpConnection implements Closeable, AsyncConnection {
    59 abstract class HttpConnection implements Closeable {
    62 
    60 
    63     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    61     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    64     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    62     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    65     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
    63     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
    66             () -> "HttpConnection(SocketTube(?))", DEBUG);
    64             () -> "HttpConnection(SocketTube(?))", DEBUG);
   121     final boolean isOpen() {
   119     final boolean isOpen() {
   122         return channel().isOpen() &&
   120         return channel().isOpen() &&
   123                 (connected() ? !getConnectionFlow().isFinished() : true);
   121                 (connected() ? !getConnectionFlow().isFinished() : true);
   124     }
   122     }
   125 
   123 
   126     interface HttpPublisher extends FlowTube.TubePublisher { }
   124     interface HttpPublisher extends FlowTube.TubePublisher {
       
   125         void enqueue(List<ByteBuffer> buffers) throws IOException;
       
   126         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
       
   127         void signalEnqueued() throws IOException;
       
   128     }
   127 
   129 
   128     /**
   130     /**
   129      * Returns the HTTP publisher associated with this connection.  May be null
   131      * Returns the HTTP publisher associated with this connection.  May be null
   130      * if invoked before connecting.
   132      * if invoked before connecting.
   131      */
   133      */
   287     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
   289     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
   288     abstract DetachedConnectionChannel detachChannel();
   290     abstract DetachedConnectionChannel detachChannel();
   289 
   291 
   290     abstract FlowTube getConnectionFlow();
   292     abstract FlowTube getConnectionFlow();
   291 
   293 
   292     // This queue and publisher are temporary, and only needed because
   294     /**
   293     // the calling code still uses writeAsync/flushAsync
   295      * A publisher that makes it possible to publish (write)
       
   296      * ordered (normal priority) and unordered (high priority)
       
   297      * buffers downstream.
       
   298      */
   294     final class PlainHttpPublisher implements HttpPublisher {
   299     final class PlainHttpPublisher implements HttpPublisher {
   295         final Object reading;
   300         final Object reading;
   296         PlainHttpPublisher() {
   301         PlainHttpPublisher() {
   297             this(new Object());
   302             this(new Object());
   298         }
   303         }
   312                 if (subscription == null) {
   317                 if (subscription == null) {
   313                     subscription = new HttpWriteSubscription();
   318                     subscription = new HttpWriteSubscription();
   314                 }
   319                 }
   315                 this.subscriber = subscriber;
   320                 this.subscriber = subscriber;
   316             }
   321             }
       
   322             // TODO: should we do this in the flow?
   317             subscriber.onSubscribe(subscription);
   323             subscriber.onSubscribe(subscription);
   318             signal();
   324             signal();
   319         }
   325         }
   320 
   326 
   321         void flushTask(DeferredCompleter completer) {
   327         void flushTask(DeferredCompleter completer) {
   362                     subscriber.onNext(elem);
   368                     subscriber.onNext(elem);
   363                 }
   369                 }
   364             }
   370             }
   365         }
   371         }
   366 
   372 
   367         public void writeAsync(ByteBufferReference[] buffers) throws IOException {
   373         @Override
   368             List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
   374         public void enqueue(List<ByteBuffer> buffers) throws IOException {
   369             queue.add(l);
   375             queue.add(buffers);
   370             int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
   376             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
   371             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
   377             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
   372         }
   378         }
   373 
   379 
   374         public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
   380         @Override
       
   381         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
   375             // Unordered frames are sent before existing frames.
   382             // Unordered frames are sent before existing frames.
   376             List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
   383             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
   377             int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
   384             queue.addFirst(buffers);
   378             queue.addFirst(l);
       
   379             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
   385             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
   380         }
   386         }
   381 
   387 
   382         public void flushAsync() throws IOException {
   388         @Override
   383             // ### Remove flushAsync
   389         public void signalEnqueued() throws IOException {
   384             // no-op. Should not be needed now with Tube.
       
   385             // Tube.write will initiate the low-level write
       
   386             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
   390             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
   387             signal();
   391             signal();
   388         }
   392         }
   389     }
   393     }
   390 
   394