src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
branchhttp-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55764 34d7cc00f87a
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import javax.net.ssl.SSLParameters;
    28 import javax.net.ssl.SSLParameters;
    29 import java.io.Closeable;
    29 import java.io.Closeable;
    30 import java.io.IOException;
    30 import java.io.IOException;
       
    31 import java.lang.System.Logger.Level;
    31 import java.net.InetSocketAddress;
    32 import java.net.InetSocketAddress;
    32 import java.nio.ByteBuffer;
    33 import java.nio.ByteBuffer;
    33 import java.nio.channels.SocketChannel;
    34 import java.nio.channels.SocketChannel;
       
    35 import java.util.Arrays;
       
    36 import java.util.IdentityHashMap;
       
    37 import java.util.List;
       
    38 import java.util.Map;
    34 import java.util.concurrent.CompletableFuture;
    39 import java.util.concurrent.CompletableFuture;
    35 
    40 import java.util.concurrent.ConcurrentLinkedDeque;
       
    41 import java.util.concurrent.Flow;
       
    42 import jdk.incubator.http.HttpClient.Version;
    36 import jdk.incubator.http.internal.common.ByteBufferReference;
    43 import jdk.incubator.http.internal.common.ByteBufferReference;
       
    44 import jdk.incubator.http.internal.common.Demand;
       
    45 import jdk.incubator.http.internal.common.FlowTube;
       
    46 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    47 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
       
    48 import jdk.incubator.http.internal.common.Log;
       
    49 import jdk.incubator.http.internal.common.Utils;
       
    50 import static jdk.incubator.http.HttpClient.Version.HTTP_2;
    37 
    51 
    38 /**
    52 /**
    39  * Wraps socket channel layer and takes care of SSL also.
    53  * Wraps socket channel layer and takes care of SSL also.
    40  *
    54  *
    41  * Subtypes are:
    55  * Subtypes are:
    42  *      PlainHttpConnection: regular direct TCP connection to server
    56  *      PlainHttpConnection: regular direct TCP connection to server
    43  *      PlainProxyConnection: plain text proxy connection
    57  *      PlainProxyConnection: plain text proxy connection
    44  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
    58  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
    45  *      SSLConnection: TLS channel direct to server
    59  *      AsyncSSLConnection: TLS channel direct to server
    46  *      SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
    60  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
    47  */
    61  */
    48 abstract class HttpConnection implements Closeable {
    62 abstract class HttpConnection implements Closeable, AsyncConnection {
    49 
    63 
    50     enum Mode {
    64     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    51         BLOCKING,
    65     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    52         NON_BLOCKING,
    66     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
    53         ASYNC
    67             () -> "HttpConnection(SocketTube(?))", DEBUG);
    54     }
    68 
    55 
    69     /** The address this connection is connected to. Could be a server or a proxy. */
    56     protected Mode mode;
       
    57 
       
    58     // address we are connected to. Could be a server or a proxy
       
    59     final InetSocketAddress address;
    70     final InetSocketAddress address;
    60     final HttpClientImpl client;
    71     private final HttpClientImpl client;
       
    72     private final TrailingOperations trailingOperations;
    61 
    73 
    62     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
    74     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
    63         this.address = address;
    75         this.address = address;
    64         this.client = client;
    76         this.client = client;
    65     }
    77         trailingOperations = new TrailingOperations();
       
    78     }
       
    79 
       
    80     private static final class TrailingOperations {
       
    81         private final Map<CompletableFuture<?>, Boolean> operations =
       
    82                 new IdentityHashMap<>();
       
    83         void add(CompletableFuture<?> cf) {
       
    84             synchronized(operations) {
       
    85                 cf.whenComplete((r,t)-> remove(cf));
       
    86                 operations.put(cf, Boolean.TRUE);
       
    87             }
       
    88         }
       
    89         boolean remove(CompletableFuture<?> cf) {
       
    90             synchronized(operations) {
       
    91                 return operations.remove(cf);
       
    92             }
       
    93         }
       
    94     }
       
    95 
       
    96     final void addTrailingOperation(CompletableFuture<?> cf) {
       
    97         trailingOperations.add(cf);
       
    98     }
       
    99 
       
   100     final void removeTrailingOperation(CompletableFuture<?> cf) {
       
   101         trailingOperations.remove(cf);
       
   102     }
       
   103 
       
   104     final HttpClientImpl client() {
       
   105         return client;
       
   106     }
       
   107 
       
   108     //public abstract void connect() throws IOException, InterruptedException;
       
   109 
       
   110     public abstract CompletableFuture<Void> connectAsync();
       
   111 
       
   112     /** Tells whether, or not, this connection is connected to its destination. */
       
   113     abstract boolean connected();
       
   114 
       
   115     /** Tells whether, or not, this connection is secure ( over SSL ) */
       
   116     abstract boolean isSecure();
       
   117 
       
   118     /** Tells whether, or not, this connection is proxied. */
       
   119     abstract boolean isProxied();
       
   120 
       
   121     /** Tells whether, or not, this connection is open. */
       
   122     final boolean isOpen() {
       
   123         return channel().isOpen() &&
       
   124                 (connected() ? !getConnectionFlow().isFinished() : true);
       
   125     }
       
   126 
       
   127     interface HttpPublisher extends FlowTube.TubePublisher { }
    66 
   128 
    67     /**
   129     /**
    68      * Public API to this class. addr is the ultimate destination. Any proxies
   130      * Returns the HTTP publisher associated with this connection.  May be null
    69      * etc are figured out from the request. Returns an instance of one of the
   131      * if invoked before connecting.
    70      * following
   132      */
    71      *      PlainHttpConnection
   133     abstract HttpPublisher publisher();
    72      *      PlainTunnelingConnection
   134 
    73      *      SSLConnection
   135     /**
    74      *      SSLTunnelConnection
   136      * Factory for retrieving HttpConnections. A connection can be retrieved
       
   137      * from the connection pool, or a new one created if none available.
    75      *
   138      *
    76      * When object returned, connect() or connectAsync() must be called, which
   139      * The given {@code addr} is the ultimate destination. Any proxies,
    77      * when it returns/completes, the connection is usable for requests.
   140      * etc, are determined from the request. Returns a concrete instance which
    78      */
   141      * is one of the following:
    79     public static HttpConnection getConnection(
   142      *      {@link PlainHttpConnection}
    80             InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request)
   143      *      {@link PlainTunnelingConnection}
    81     {
   144      *      {@link SSLConnection}
    82         return getConnectionImpl(addr, client, request, false);
   145      *      {@link SSLTunnelConnection}
    83     }
   146      *
    84 
   147      * The returned connection, if not from the connection pool, must have its,
    85     /**
   148      * connect() or connectAsync() method invoked, which ( when it completes
    86      * Called specifically to get an async connection for HTTP/2 over SSL.
   149      * successfully ) renders the connection usable for requests.
    87      */
   150      */
    88     public static HttpConnection getConnection(InetSocketAddress addr,
   151     public static HttpConnection getConnection(InetSocketAddress addr,
    89         HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) {
   152                                                HttpClientImpl client,
    90 
   153                                                HttpRequestImpl request,
    91         return getConnectionImpl(addr, client, request, isHttp2);
   154                                                Version version) {
    92     }
   155         HttpConnection c = null;
    93 
   156         InetSocketAddress proxy = request.proxy(client);
    94     public abstract void connect() throws IOException, InterruptedException;
   157         if (proxy != null && proxy.isUnresolved()) {
    95 
   158             // The default proxy selector may select a proxy whose  address is
    96     public abstract CompletableFuture<Void> connectAsync();
   159             // unresolved. We must resolve the address before connecting to it.
    97 
   160             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
    98     /**
   161         }
    99      * Returns whether this connection is connected to its destination
   162         boolean secure = request.secure();
   100      */
   163         ConnectionPool pool = client.connectionPool();
   101     abstract boolean connected();
   164 
   102 
   165         if (!secure) {
   103     abstract boolean isSecure();
   166             c = pool.getConnection(false, addr, proxy);
   104 
   167             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
   105     abstract boolean isProxied();
   168                 final HttpConnection conn = c;
   106 
   169                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
   107     /**
   170                             + ": plain connection retrieved from HTTP/1.1 pool");
   108      * Completes when the first byte of the response is available to be read.
   171                 return c;
   109      */
   172             } else {
   110     abstract CompletableFuture<Void> whenReceivingResponse();
   173                 return getPlainConnection(addr, proxy, request, client);
   111 
   174             }
   112     final boolean isOpen() {
   175         } else {  // secure
   113         return channel().isOpen();
   176             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
       
   177                 c = pool.getConnection(true, addr, proxy);
       
   178             }
       
   179             if (c != null && c.isOpen()) {
       
   180                 final HttpConnection conn = c;
       
   181                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
       
   182                             + ": SSL connection retrieved from HTTP/1.1 pool");
       
   183                 return c;
       
   184             } else {
       
   185                 String[] alpn = null;
       
   186                 if (version == HTTP_2) {
       
   187                     alpn = new String[] { "h2", "http/1.1" };
       
   188                 }
       
   189                 return getSSLConnection(addr, proxy, alpn, client);
       
   190             }
       
   191         }
       
   192     }
       
   193 
       
   194     private static HttpConnection getSSLConnection(InetSocketAddress addr,
       
   195                                                    InetSocketAddress proxy,
       
   196                                                    String[] alpn,
       
   197                                                    HttpClientImpl client) {
       
   198         if (proxy != null)
       
   199             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
       
   200         else
       
   201             return new AsyncSSLConnection(addr, client, alpn);
   114     }
   202     }
   115 
   203 
   116     /* Returns either a plain HTTP connection or a plain tunnelling connection
   204     /* Returns either a plain HTTP connection or a plain tunnelling connection
   117      * for proxied WebSocket */
   205      * for proxied WebSocket */
   118     private static HttpConnection getPlainConnection(InetSocketAddress addr,
   206     private static HttpConnection getPlainConnection(InetSocketAddress addr,
   119                                                      InetSocketAddress proxy,
   207                                                      InetSocketAddress proxy,
   120                                                      HttpRequestImpl request,
   208                                                      HttpRequestImpl request,
   121                                                      HttpClientImpl client) {
   209                                                      HttpClientImpl client) {
   122         if (request.isWebSocket() && proxy != null) {
   210         if (request.isWebSocket() && proxy != null)
   123             return new PlainTunnelingConnection(addr, proxy, client);
   211             return new PlainTunnelingConnection(addr, proxy, client);
   124         } else {
   212 
   125             if (proxy == null) {
   213         if (proxy == null)
   126                 return new PlainHttpConnection(addr, client);
   214             return new PlainHttpConnection(addr, client);
   127             } else {
   215         else
   128                 return new PlainProxyConnection(proxy, client);
   216             return new PlainProxyConnection(proxy, client);
   129             }
   217     }
   130         }
   218 
   131     }
   219     void closeOrReturnToCache(HttpHeaders hdrs) {
   132 
       
   133     private static HttpConnection getSSLConnection(InetSocketAddress addr,
       
   134             InetSocketAddress proxy, HttpRequestImpl request,
       
   135             String[] alpn, boolean isHttp2, HttpClientImpl client)
       
   136     {
       
   137         if (proxy != null) {
       
   138             if (!isHttp2) {
       
   139                 return new SSLTunnelConnection(addr, client, proxy);
       
   140             } else {
       
   141                 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy);
       
   142             }
       
   143         } else if (!isHttp2) {
       
   144             return new SSLConnection(addr, client, alpn);
       
   145         } else {
       
   146             return new AsyncSSLConnection(addr, client, alpn);
       
   147         }
       
   148     }
       
   149 
       
   150     /**
       
   151      * Main factory method.   Gets a HttpConnection, either cached or new if
       
   152      * none available.
       
   153      */
       
   154     private static HttpConnection getConnectionImpl(InetSocketAddress addr,
       
   155             HttpClientImpl client,
       
   156             HttpRequestImpl request, boolean isHttp2)
       
   157     {
       
   158         HttpConnection c = null;
       
   159         InetSocketAddress proxy = request.proxy(client);
       
   160         if (proxy != null && proxy.isUnresolved()) {
       
   161             // The default proxy selector may select a proxy whose
       
   162             // address is unresolved. We must resolve the address
       
   163             // before using it to connect.
       
   164             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
       
   165         }
       
   166         boolean secure = request.secure();
       
   167         ConnectionPool pool = client.connectionPool();
       
   168         String[] alpn =  null;
       
   169 
       
   170         if (secure && isHttp2) {
       
   171             alpn = new String[2];
       
   172             alpn[0] = "h2";
       
   173             alpn[1] = "http/1.1";
       
   174         }
       
   175 
       
   176         if (!secure) {
       
   177             c = pool.getConnection(false, addr, proxy);
       
   178             if (c != null) {
       
   179                 return c;
       
   180             } else {
       
   181                 return getPlainConnection(addr, proxy, request, client);
       
   182             }
       
   183         } else {
       
   184             if (!isHttp2) { // if http2 we don't cache connections
       
   185                 c = pool.getConnection(true, addr, proxy);
       
   186             }
       
   187             if (c != null) {
       
   188                 return c;
       
   189             } else {
       
   190                 return getSSLConnection(addr, proxy, request, alpn, isHttp2, client);
       
   191             }
       
   192         }
       
   193     }
       
   194 
       
   195     void returnToCache(HttpHeaders hdrs) {
       
   196         if (hdrs == null) {
   220         if (hdrs == null) {
   197             // the connection was closed by server
   221             // the connection was closed by server, eof
   198             close();
   222             close();
   199             return;
   223             return;
   200         }
   224         }
   201         if (!isOpen()) {
   225         if (!isOpen()) {
       
   226             return;
       
   227         }
       
   228         HttpClientImpl client = client();
       
   229         if (client == null) {
       
   230             close();
   202             return;
   231             return;
   203         }
   232         }
   204         ConnectionPool pool = client.connectionPool();
   233         ConnectionPool pool = client.connectionPool();
   205         boolean keepAlive = hdrs.firstValue("Connection")
   234         boolean keepAlive = hdrs.firstValue("Connection")
   206                 .map((s) -> !s.equalsIgnoreCase("close"))
   235                 .map((s) -> !s.equalsIgnoreCase("close"))
   207                 .orElse(true);
   236                 .orElse(true);
   208 
   237 
   209         if (keepAlive) {
   238         if (keepAlive) {
       
   239             Log.logTrace("Returning connection to the pool: {0}", this);
   210             pool.returnToPool(this);
   240             pool.returnToPool(this);
   211         } else {
   241         } else {
   212             close();
   242             close();
   213         }
   243         }
   214     }
   244     }
   215 
   245 
   216     /**
       
   217      * Also check that the number of bytes written is what was expected. This
       
   218      * could be different if the buffer is user-supplied and its internal
       
   219      * pointers were manipulated in a race condition.
       
   220      */
       
   221     final void checkWrite(long expected, ByteBuffer buffer) throws IOException {
       
   222         long written = write(buffer);
       
   223         if (written != expected) {
       
   224             throw new IOException("incorrect number of bytes written");
       
   225         }
       
   226     }
       
   227 
       
   228     final void checkWrite(long expected,
       
   229                           ByteBuffer[] buffers,
       
   230                           int start,
       
   231                           int length)
       
   232         throws IOException
       
   233     {
       
   234         long written = write(buffers, start, length);
       
   235         if (written != expected) {
       
   236             throw new IOException("incorrect number of bytes written");
       
   237         }
       
   238     }
       
   239 
       
   240     abstract SocketChannel channel();
   246     abstract SocketChannel channel();
   241 
   247 
   242     final InetSocketAddress address() {
   248     final InetSocketAddress address() {
   243         return address;
   249         return address;
   244     }
       
   245 
       
   246     synchronized void configureMode(Mode mode) throws IOException {
       
   247         this.mode = mode;
       
   248         if (mode == Mode.BLOCKING) {
       
   249             channel().configureBlocking(true);
       
   250         } else {
       
   251             channel().configureBlocking(false);
       
   252         }
       
   253     }
       
   254 
       
   255     synchronized Mode getMode() {
       
   256         return mode;
       
   257     }
   250     }
   258 
   251 
   259     abstract ConnectionPool.CacheKey cacheKey();
   252     abstract ConnectionPool.CacheKey cacheKey();
   260 
   253 
   261     // overridden in SSL only
   254     // overridden in SSL only
   262     SSLParameters sslParameters() {
   255     SSLParameters sslParameters() {
   263         return null;
   256         return null;
   264     }
   257     }
   265 
       
   266     // Methods to be implemented for Plain TCP and SSL
       
   267 
       
   268     abstract long write(ByteBuffer[] buffers, int start, int number)
       
   269         throws IOException;
       
   270 
       
   271     abstract long write(ByteBuffer buffer) throws IOException;
       
   272 
       
   273     // Methods to be implemented for Plain TCP (async mode) and AsyncSSL
       
   274 
       
   275     /**
       
   276      * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the
       
   277      * end of the send queue; Otherwise, it is equivalent to {@link
       
   278      * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
       
   279      * When in async mode, calling this method should later be followed by
       
   280      * subsequent flushAsync invocation.
       
   281      * That allows multiple threads to put buffers into the queue while some other
       
   282      * thread is writing.
       
   283      */
       
   284     abstract void writeAsync(ByteBufferReference[] buffers) throws IOException;
       
   285 
       
   286     /**
       
   287      * In {@linkplain Mode#ASYNC async mode}, this method may put
       
   288      * buffers at the beginning of send queue, breaking frames sequence and
       
   289      * allowing to write these buffers before other buffers in the queue;
       
   290      * Otherwise, it is equivalent to {@link
       
   291      * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}.
       
   292      * When in async mode, calling this method should later be followed by
       
   293      * subsequent flushAsync invocation.
       
   294      * That allows multiple threads to put buffers into the queue while some other
       
   295      * thread is writing.
       
   296      */
       
   297     abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
       
   298 
       
   299     /**
       
   300      * This method should be called after  any writeAsync/writeAsyncUnordered
       
   301      * invocation.
       
   302      * If there is a race to flushAsync from several threads one thread
       
   303      * (race winner) capture flush operation and write the whole queue content.
       
   304      * Other threads (race losers) exits from the method (not blocking)
       
   305      * and continue execution.
       
   306      */
       
   307     abstract void flushAsync() throws IOException;
       
   308 
   258 
   309     /**
   259     /**
   310      * Closes this connection, by returning the socket to its connection pool.
   260      * Closes this connection, by returning the socket to its connection pool.
   311      */
   261      */
   312     @Override
   262     @Override
   314 
   264 
   315     abstract void shutdownInput() throws IOException;
   265     abstract void shutdownInput() throws IOException;
   316 
   266 
   317     abstract void shutdownOutput() throws IOException;
   267     abstract void shutdownOutput() throws IOException;
   318 
   268 
   319     /**
   269     // Support for WebSocket/RawChannelImpl which unfortunately
   320      * Puts position to limit and limit to capacity so we can resume reading
   270     // still depends on synchronous read/writes.
   321      * into this buffer, but if required > 0 then limit may be reduced so that
   271     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
   322      * no more than required bytes are read next time.
   272     abstract static class DetachedConnectionChannel implements Closeable {
   323      */
   273         DetachedConnectionChannel() {}
   324     static void resumeChannelRead(ByteBuffer buf, int required) {
   274         abstract SocketChannel channel();
   325         int limit = buf.limit();
   275         abstract long write(ByteBuffer[] buffers, int start, int number)
   326         buf.position(limit);
   276                 throws IOException;
   327         int capacity = buf.capacity() - limit;
   277         abstract void shutdownInput() throws IOException;
   328         if (required > 0 && required < capacity) {
   278         abstract void shutdownOutput() throws IOException;
   329             buf.limit(limit + required);
   279         abstract ByteBuffer read() throws IOException;
   330         } else {
   280         @Override
   331             buf.limit(buf.capacity());
   281         public abstract void close();
   332         }
   282         @Override
   333     }
   283         public String toString() {
   334 
   284             return this.getClass().getSimpleName() + ": " + channel().toString();
   335     final ByteBuffer read() throws IOException {
   285         }
   336         ByteBuffer b = readImpl();
   286     }
   337         return b;
   287 
   338     }
   288     // Support for WebSocket/RawChannelImpl which unfortunately
   339 
   289     // still depends on synchronous read/writes.
   340     /*
   290     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
   341      * Returns a ByteBuffer with the data available at the moment, or null if
   291     abstract DetachedConnectionChannel detachChannel();
   342      * reached EOF.
   292 
   343      */
   293     abstract FlowTube getConnectionFlow();
   344     protected abstract ByteBuffer readImpl() throws IOException;
   294 
       
   295     // This queue and publisher are temporary, and only needed because
       
   296     // the calling code still uses writeAsync/flushAsync
       
   297     final class PlainHttpPublisher implements HttpPublisher {
       
   298         final Object reading;
       
   299         PlainHttpPublisher() {
       
   300             this(new Object());
       
   301         }
       
   302         PlainHttpPublisher(Object readingLock) {
       
   303             this.reading = readingLock;
       
   304         }
       
   305         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
       
   306         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   307         volatile HttpWriteSubscription subscription;
       
   308         final SequentialScheduler writeScheduler =
       
   309                     new SequentialScheduler(this::flushTask);
       
   310         @Override
       
   311         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   312             synchronized (reading) {
       
   313                 //assert this.subscription == null;
       
   314                 //assert this.subscriber == null;
       
   315                 if (subscription == null) {
       
   316                     subscription = new HttpWriteSubscription();
       
   317                 }
       
   318                 this.subscriber = subscriber;
       
   319             }
       
   320             subscriber.onSubscribe(subscription);
       
   321             signal();
       
   322         }
       
   323 
       
   324         void flushTask(DeferredCompleter completer) {
       
   325             try {
       
   326                 HttpWriteSubscription sub = subscription;
       
   327                 if (sub != null) sub.flush();
       
   328             } finally {
       
   329                 completer.complete();
       
   330             }
       
   331         }
       
   332 
       
   333         void signal() {
       
   334             writeScheduler.runOrSchedule();
       
   335         }
       
   336 
       
   337         final class HttpWriteSubscription implements Flow.Subscription {
       
   338             volatile boolean cancelled;
       
   339             final Demand demand = new Demand();
       
   340 
       
   341             @Override
       
   342             public void request(long n) {
       
   343                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
       
   344                 demand.increase(n);
       
   345                 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
       
   346                             + n + " from "
       
   347                             + getConnectionFlow());
       
   348                 writeScheduler.runOrSchedule();
       
   349             }
       
   350 
       
   351             @Override
       
   352             public void cancel() {
       
   353                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
       
   354                           + getConnectionFlow());
       
   355                 cancelled = true;
       
   356             }
       
   357 
       
   358             void flush() {
       
   359                 while (!queue.isEmpty() && demand.tryDecrement()) {
       
   360                     List<ByteBuffer> elem = queue.poll();
       
   361                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
       
   362                                 + Utils.remaining(elem) + " bytes ("
       
   363                                 + elem.size() + " buffers) to "
       
   364                                 + getConnectionFlow());
       
   365                     subscriber.onNext(elem);
       
   366                 }
       
   367             }
       
   368         }
       
   369 
       
   370         public void writeAsync(ByteBufferReference[] buffers) throws IOException {
       
   371             List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
       
   372             queue.add(l);
       
   373             int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
       
   374             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
       
   375         }
       
   376 
       
   377         public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
       
   378             // Unordered frames are sent before existing frames.
       
   379             List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
       
   380             int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
       
   381             queue.addFirst(l);
       
   382             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
       
   383         }
       
   384 
       
   385         public void flushAsync() throws IOException {
       
   386             // ### Remove flushAsync
       
   387             // no-op. Should not be needed now with Tube.
       
   388             // Tube.write will initiate the low-level write
       
   389             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
       
   390             signal();
       
   391         }
       
   392     }
       
   393 
       
   394     String dbgTag = null;
       
   395     final String dbgString() {
       
   396         FlowTube flow = getConnectionFlow();
       
   397         String tag = dbgTag;
       
   398         if (tag == null && flow != null) {
       
   399             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
       
   400         } else if (tag == null) {
       
   401             tag = this.getClass().getSimpleName() + "(?)";
       
   402         }
       
   403         return tag;
       
   404     }
   345 
   405 
   346     @Override
   406     @Override
   347     public String toString() {
   407     public String toString() {
   348         return "HttpConnection: " + channel().toString();
   408         return "HttpConnection: " + channel().toString();
   349     }
   409     }