26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import javax.net.ssl.SSLParameters; |
28 import javax.net.ssl.SSLParameters; |
29 import java.io.Closeable; |
29 import java.io.Closeable; |
30 import java.io.IOException; |
30 import java.io.IOException; |
|
31 import java.lang.System.Logger.Level; |
31 import java.net.InetSocketAddress; |
32 import java.net.InetSocketAddress; |
32 import java.nio.ByteBuffer; |
33 import java.nio.ByteBuffer; |
33 import java.nio.channels.SocketChannel; |
34 import java.nio.channels.SocketChannel; |
|
35 import java.util.Arrays; |
|
36 import java.util.IdentityHashMap; |
|
37 import java.util.List; |
|
38 import java.util.Map; |
34 import java.util.concurrent.CompletableFuture; |
39 import java.util.concurrent.CompletableFuture; |
35 |
40 import java.util.concurrent.ConcurrentLinkedDeque; |
|
41 import java.util.concurrent.Flow; |
|
42 import jdk.incubator.http.HttpClient.Version; |
36 import jdk.incubator.http.internal.common.ByteBufferReference; |
43 import jdk.incubator.http.internal.common.ByteBufferReference; |
|
44 import jdk.incubator.http.internal.common.Demand; |
|
45 import jdk.incubator.http.internal.common.FlowTube; |
|
46 import jdk.incubator.http.internal.common.SequentialScheduler; |
|
47 import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter; |
|
48 import jdk.incubator.http.internal.common.Log; |
|
49 import jdk.incubator.http.internal.common.Utils; |
|
50 import static jdk.incubator.http.HttpClient.Version.HTTP_2; |
37 |
51 |
38 /** |
52 /** |
39 * Wraps socket channel layer and takes care of SSL also. |
53 * Wraps socket channel layer and takes care of SSL also. |
40 * |
54 * |
41 * Subtypes are: |
55 * Subtypes are: |
42 * PlainHttpConnection: regular direct TCP connection to server |
56 * PlainHttpConnection: regular direct TCP connection to server |
43 * PlainProxyConnection: plain text proxy connection |
57 * PlainProxyConnection: plain text proxy connection |
44 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server |
58 * PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server |
45 * SSLConnection: TLS channel direct to server |
59 * AsyncSSLConnection: TLS channel direct to server |
46 * SSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel |
60 * AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel |
47 */ |
61 */ |
48 abstract class HttpConnection implements Closeable { |
62 abstract class HttpConnection implements Closeable, AsyncConnection { |
49 |
63 |
50 enum Mode { |
64 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
51 BLOCKING, |
65 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
52 NON_BLOCKING, |
66 final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger( |
53 ASYNC |
67 () -> "HttpConnection(SocketTube(?))", DEBUG); |
54 } |
68 |
55 |
69 /** The address this connection is connected to. Could be a server or a proxy. */ |
56 protected Mode mode; |
|
57 |
|
58 // address we are connected to. Could be a server or a proxy |
|
59 final InetSocketAddress address; |
70 final InetSocketAddress address; |
60 final HttpClientImpl client; |
71 private final HttpClientImpl client; |
|
72 private final TrailingOperations trailingOperations; |
61 |
73 |
/**
 * Initialises the state shared by every connection type.
 *
 * @param address the address this connection connects to; could be a
 *                server or a proxy
 * @param client  the client this connection belongs to
 */
HttpConnection(InetSocketAddress address, HttpClientImpl client) {
    this.trailingOperations = new TrailingOperations();
    this.client = client;
    this.address = address;
}
|
79 |
|
80 private static final class TrailingOperations { |
|
81 private final Map<CompletableFuture<?>, Boolean> operations = |
|
82 new IdentityHashMap<>(); |
|
83 void add(CompletableFuture<?> cf) { |
|
84 synchronized(operations) { |
|
85 cf.whenComplete((r,t)-> remove(cf)); |
|
86 operations.put(cf, Boolean.TRUE); |
|
87 } |
|
88 } |
|
89 boolean remove(CompletableFuture<?> cf) { |
|
90 synchronized(operations) { |
|
91 return operations.remove(cf); |
|
92 } |
|
93 } |
|
94 } |
|
95 |
|
96 final void addTrailingOperation(CompletableFuture<?> cf) { |
|
97 trailingOperations.add(cf); |
|
98 } |
|
99 |
|
100 final void removeTrailingOperation(CompletableFuture<?> cf) { |
|
101 trailingOperations.remove(cf); |
|
102 } |
|
103 |
|
104 final HttpClientImpl client() { |
|
105 return client; |
|
106 } |
|
107 |
|
108 //public abstract void connect() throws IOException, InterruptedException; |
|
109 |
|
110 public abstract CompletableFuture<Void> connectAsync(); |
|
111 |
|
112 /** Tells whether, or not, this connection is connected to its destination. */ |
|
113 abstract boolean connected(); |
|
114 |
|
115 /** Tells whether, or not, this connection is secure ( over SSL ) */ |
|
116 abstract boolean isSecure(); |
|
117 |
|
118 /** Tells whether, or not, this connection is proxied. */ |
|
119 abstract boolean isProxied(); |
|
120 |
|
121 /** Tells whether, or not, this connection is open. */ |
|
122 final boolean isOpen() { |
|
123 return channel().isOpen() && |
|
124 (connected() ? !getConnectionFlow().isFinished() : true); |
|
125 } |
|
126 |
|
127 interface HttpPublisher extends FlowTube.TubePublisher { } |
66 |
128 |
67 /** |
129 /** |
68 * Public API to this class. addr is the ultimate destination. Any proxies |
130 * Returns the HTTP publisher associated with this connection. May be null |
69 * etc are figured out from the request. Returns an instance of one of the |
131 * if invoked before connecting. |
70 * following |
132 */ |
71 * PlainHttpConnection |
133 abstract HttpPublisher publisher(); |
72 * PlainTunnelingConnection |
134 |
73 * SSLConnection |
135 /** |
74 * SSLTunnelConnection |
136 * Factory for retrieving HttpConnections. A connection can be retrieved |
|
137 * from the connection pool, or a new one created if none available. |
75 * |
138 * |
76 * When object returned, connect() or connectAsync() must be called, which |
139 * The given {@code addr} is the ultimate destination. Any proxies, |
77 * when it returns/completes, the connection is usable for requests. |
140 * etc, are determined from the request. Returns a concrete instance which |
78 */ |
141 * is one of the following: |
79 public static HttpConnection getConnection( |
142 * {@link PlainHttpConnection} |
80 InetSocketAddress addr, HttpClientImpl client, HttpRequestImpl request) |
143 * {@link PlainTunnelingConnection} |
81 { |
144 * {@link AsyncSSLConnection} |
82 return getConnectionImpl(addr, client, request, false); |
145 * {@link AsyncSSLTunnelConnection} |
83 } |
146 * |
84 |
147 * The returned connection, if not from the connection pool, must have its, |
85 /** |
148 * connect() or connectAsync() method invoked, which ( when it completes |
86 * Called specifically to get an async connection for HTTP/2 over SSL. |
149 * successfully ) renders the connection usable for requests. |
87 */ |
150 */ |
88 public static HttpConnection getConnection(InetSocketAddress addr, |
151 public static HttpConnection getConnection(InetSocketAddress addr, |
89 HttpClientImpl client, HttpRequestImpl request, boolean isHttp2) { |
152 HttpClientImpl client, |
90 |
153 HttpRequestImpl request, |
91 return getConnectionImpl(addr, client, request, isHttp2); |
154 Version version) { |
92 } |
155 HttpConnection c = null; |
93 |
156 InetSocketAddress proxy = request.proxy(client); |
94 public abstract void connect() throws IOException, InterruptedException; |
157 if (proxy != null && proxy.isUnresolved()) { |
95 |
158 // The default proxy selector may select a proxy whose address is |
96 public abstract CompletableFuture<Void> connectAsync(); |
159 // unresolved. We must resolve the address before connecting to it. |
97 |
160 proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); |
98 /** |
161 } |
99 * Returns whether this connection is connected to its destination |
162 boolean secure = request.secure(); |
100 */ |
163 ConnectionPool pool = client.connectionPool(); |
101 abstract boolean connected(); |
164 |
102 |
165 if (!secure) { |
103 abstract boolean isSecure(); |
166 c = pool.getConnection(false, addr, proxy); |
104 |
167 if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) { |
105 abstract boolean isProxied(); |
168 final HttpConnection conn = c; |
106 |
169 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() |
107 /** |
170 + ": plain connection retrieved from HTTP/1.1 pool"); |
108 * Completes when the first byte of the response is available to be read. |
171 return c; |
109 */ |
172 } else { |
110 abstract CompletableFuture<Void> whenReceivingResponse(); |
173 return getPlainConnection(addr, proxy, request, client); |
111 |
174 } |
112 final boolean isOpen() { |
175 } else { // secure |
113 return channel().isOpen(); |
176 if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool |
|
177 c = pool.getConnection(true, addr, proxy); |
|
178 } |
|
179 if (c != null && c.isOpen()) { |
|
180 final HttpConnection conn = c; |
|
181 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow() |
|
182 + ": SSL connection retrieved from HTTP/1.1 pool"); |
|
183 return c; |
|
184 } else { |
|
185 String[] alpn = null; |
|
186 if (version == HTTP_2) { |
|
187 alpn = new String[] { "h2", "http/1.1" }; |
|
188 } |
|
189 return getSSLConnection(addr, proxy, alpn, client); |
|
190 } |
|
191 } |
|
192 } |
|
193 |
|
194 private static HttpConnection getSSLConnection(InetSocketAddress addr, |
|
195 InetSocketAddress proxy, |
|
196 String[] alpn, |
|
197 HttpClientImpl client) { |
|
198 if (proxy != null) |
|
199 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); |
|
200 else |
|
201 return new AsyncSSLConnection(addr, client, alpn); |
114 } |
202 } |
115 |
203 |
116 /* Returns either a plain HTTP connection or a plain tunnelling connection |
204 /* Returns either a plain HTTP connection or a plain tunnelling connection |
117 * for proxied WebSocket */ |
205 * for proxied WebSocket */ |
118 private static HttpConnection getPlainConnection(InetSocketAddress addr, |
206 private static HttpConnection getPlainConnection(InetSocketAddress addr, |
119 InetSocketAddress proxy, |
207 InetSocketAddress proxy, |
120 HttpRequestImpl request, |
208 HttpRequestImpl request, |
121 HttpClientImpl client) { |
209 HttpClientImpl client) { |
122 if (request.isWebSocket() && proxy != null) { |
210 if (request.isWebSocket() && proxy != null) |
123 return new PlainTunnelingConnection(addr, proxy, client); |
211 return new PlainTunnelingConnection(addr, proxy, client); |
124 } else { |
212 |
125 if (proxy == null) { |
213 if (proxy == null) |
126 return new PlainHttpConnection(addr, client); |
214 return new PlainHttpConnection(addr, client); |
127 } else { |
215 else |
128 return new PlainProxyConnection(proxy, client); |
216 return new PlainProxyConnection(proxy, client); |
129 } |
217 } |
130 } |
218 |
131 } |
219 void closeOrReturnToCache(HttpHeaders hdrs) { |
132 |
|
133 private static HttpConnection getSSLConnection(InetSocketAddress addr, |
|
134 InetSocketAddress proxy, HttpRequestImpl request, |
|
135 String[] alpn, boolean isHttp2, HttpClientImpl client) |
|
136 { |
|
137 if (proxy != null) { |
|
138 if (!isHttp2) { |
|
139 return new SSLTunnelConnection(addr, client, proxy); |
|
140 } else { |
|
141 return new AsyncSSLTunnelConnection(addr, client, alpn, proxy); |
|
142 } |
|
143 } else if (!isHttp2) { |
|
144 return new SSLConnection(addr, client, alpn); |
|
145 } else { |
|
146 return new AsyncSSLConnection(addr, client, alpn); |
|
147 } |
|
148 } |
|
149 |
|
150 /** |
|
151 * Main factory method. Gets a HttpConnection, either cached or new if |
|
152 * none available. |
|
153 */ |
|
154 private static HttpConnection getConnectionImpl(InetSocketAddress addr, |
|
155 HttpClientImpl client, |
|
156 HttpRequestImpl request, boolean isHttp2) |
|
157 { |
|
158 HttpConnection c = null; |
|
159 InetSocketAddress proxy = request.proxy(client); |
|
160 if (proxy != null && proxy.isUnresolved()) { |
|
161 // The default proxy selector may select a proxy whose |
|
162 // address is unresolved. We must resolve the address |
|
163 // before using it to connect. |
|
164 proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort()); |
|
165 } |
|
166 boolean secure = request.secure(); |
|
167 ConnectionPool pool = client.connectionPool(); |
|
168 String[] alpn = null; |
|
169 |
|
170 if (secure && isHttp2) { |
|
171 alpn = new String[2]; |
|
172 alpn[0] = "h2"; |
|
173 alpn[1] = "http/1.1"; |
|
174 } |
|
175 |
|
176 if (!secure) { |
|
177 c = pool.getConnection(false, addr, proxy); |
|
178 if (c != null) { |
|
179 return c; |
|
180 } else { |
|
181 return getPlainConnection(addr, proxy, request, client); |
|
182 } |
|
183 } else { |
|
184 if (!isHttp2) { // if http2 we don't cache connections |
|
185 c = pool.getConnection(true, addr, proxy); |
|
186 } |
|
187 if (c != null) { |
|
188 return c; |
|
189 } else { |
|
190 return getSSLConnection(addr, proxy, request, alpn, isHttp2, client); |
|
191 } |
|
192 } |
|
193 } |
|
194 |
|
195 void returnToCache(HttpHeaders hdrs) { |
|
196 if (hdrs == null) { |
220 if (hdrs == null) { |
197 // the connection was closed by server |
221 // the connection was closed by server, eof |
198 close(); |
222 close(); |
199 return; |
223 return; |
200 } |
224 } |
201 if (!isOpen()) { |
225 if (!isOpen()) { |
|
226 return; |
|
227 } |
|
228 HttpClientImpl client = client(); |
|
229 if (client == null) { |
|
230 close(); |
202 return; |
231 return; |
203 } |
232 } |
204 ConnectionPool pool = client.connectionPool(); |
233 ConnectionPool pool = client.connectionPool(); |
205 boolean keepAlive = hdrs.firstValue("Connection") |
234 boolean keepAlive = hdrs.firstValue("Connection") |
206 .map((s) -> !s.equalsIgnoreCase("close")) |
235 .map((s) -> !s.equalsIgnoreCase("close")) |
207 .orElse(true); |
236 .orElse(true); |
208 |
237 |
209 if (keepAlive) { |
238 if (keepAlive) { |
|
239 Log.logTrace("Returning connection to the pool: {0}", this); |
210 pool.returnToPool(this); |
240 pool.returnToPool(this); |
211 } else { |
241 } else { |
212 close(); |
242 close(); |
213 } |
243 } |
214 } |
244 } |
215 |
245 |
216 /** |
|
217 * Also check that the number of bytes written is what was expected. This |
|
218 * could be different if the buffer is user-supplied and its internal |
|
219 * pointers were manipulated in a race condition. |
|
220 */ |
|
221 final void checkWrite(long expected, ByteBuffer buffer) throws IOException { |
|
222 long written = write(buffer); |
|
223 if (written != expected) { |
|
224 throw new IOException("incorrect number of bytes written"); |
|
225 } |
|
226 } |
|
227 |
|
228 final void checkWrite(long expected, |
|
229 ByteBuffer[] buffers, |
|
230 int start, |
|
231 int length) |
|
232 throws IOException |
|
233 { |
|
234 long written = write(buffers, start, length); |
|
235 if (written != expected) { |
|
236 throw new IOException("incorrect number of bytes written"); |
|
237 } |
|
238 } |
|
239 |
|
240 abstract SocketChannel channel(); |
246 abstract SocketChannel channel(); |
241 |
247 |
242 final InetSocketAddress address() { |
248 final InetSocketAddress address() { |
243 return address; |
249 return address; |
244 } |
|
245 |
|
246 synchronized void configureMode(Mode mode) throws IOException { |
|
247 this.mode = mode; |
|
248 if (mode == Mode.BLOCKING) { |
|
249 channel().configureBlocking(true); |
|
250 } else { |
|
251 channel().configureBlocking(false); |
|
252 } |
|
253 } |
|
254 |
|
255 synchronized Mode getMode() { |
|
256 return mode; |
|
257 } |
250 } |
258 |
251 |
259 abstract ConnectionPool.CacheKey cacheKey(); |
252 abstract ConnectionPool.CacheKey cacheKey(); |
260 |
253 |
261 // overridden in SSL only |
254 // overridden in SSL only |
262 SSLParameters sslParameters() { |
255 SSLParameters sslParameters() { |
263 return null; |
256 return null; |
264 } |
257 } |
265 |
|
266 // Methods to be implemented for Plain TCP and SSL |
|
267 |
|
268 abstract long write(ByteBuffer[] buffers, int start, int number) |
|
269 throws IOException; |
|
270 |
|
271 abstract long write(ByteBuffer buffer) throws IOException; |
|
272 |
|
273 // Methods to be implemented for Plain TCP (async mode) and AsyncSSL |
|
274 |
|
275 /** |
|
276 * In {@linkplain Mode#ASYNC async mode}, this method puts buffers at the |
|
277 * end of the send queue; Otherwise, it is equivalent to {@link |
|
278 * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. |
|
279 * When in async mode, calling this method should later be followed by |
|
280 * subsequent flushAsync invocation. |
|
281 * That allows multiple threads to put buffers into the queue while some other |
|
282 * thread is writing. |
|
283 */ |
|
284 abstract void writeAsync(ByteBufferReference[] buffers) throws IOException; |
|
285 |
|
286 /** |
|
287 * In {@linkplain Mode#ASYNC async mode}, this method may put |
|
288 * buffers at the beginning of send queue, breaking frames sequence and |
|
289 * allowing to write these buffers before other buffers in the queue; |
|
290 * Otherwise, it is equivalent to {@link |
|
291 * #write(ByteBuffer[], int, int) write(buffers, 0, buffers.length)}. |
|
292 * When in async mode, calling this method should later be followed by |
|
293 * subsequent flushAsync invocation. |
|
294 * That allows multiple threads to put buffers into the queue while some other |
|
295 * thread is writing. |
|
296 */ |
|
297 abstract void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException; |
|
298 |
|
299 /** |
|
300 * This method should be called after any writeAsync/writeAsyncUnordered |
|
301 * invocation. |
|
302 * If there is a race to flushAsync from several threads one thread |
|
303 * (race winner) capture flush operation and write the whole queue content. |
|
304 * Other threads (race losers) exits from the method (not blocking) |
|
305 * and continue execution. |
|
306 */ |
|
307 abstract void flushAsync() throws IOException; |
|
308 |
258 |
309 /** |
259 /** |
310 * Closes this connection, by returning the socket to its connection pool. |
260 * Closes this connection, by returning the socket to its connection pool. |
311 */ |
261 */ |
312 @Override |
262 @Override |
314 |
264 |
315 abstract void shutdownInput() throws IOException; |
265 abstract void shutdownInput() throws IOException; |
316 |
266 |
317 abstract void shutdownOutput() throws IOException; |
267 abstract void shutdownOutput() throws IOException; |
318 |
268 |
319 /** |
269 // Support for WebSocket/RawChannelImpl which unfortunately |
320 * Puts position to limit and limit to capacity so we can resume reading |
270 // still depends on synchronous read/writes. |
321 * into this buffer, but if required > 0 then limit may be reduced so that |
271 // It should be removed when RawChannelImpl moves to using asynchronous APIs. |
322 * no more than required bytes are read next time. |
272 abstract static class DetachedConnectionChannel implements Closeable { |
323 */ |
273 DetachedConnectionChannel() {} |
324 static void resumeChannelRead(ByteBuffer buf, int required) { |
274 abstract SocketChannel channel(); |
325 int limit = buf.limit(); |
275 abstract long write(ByteBuffer[] buffers, int start, int number) |
326 buf.position(limit); |
276 throws IOException; |
327 int capacity = buf.capacity() - limit; |
277 abstract void shutdownInput() throws IOException; |
328 if (required > 0 && required < capacity) { |
278 abstract void shutdownOutput() throws IOException; |
329 buf.limit(limit + required); |
279 abstract ByteBuffer read() throws IOException; |
330 } else { |
280 @Override |
331 buf.limit(buf.capacity()); |
281 public abstract void close(); |
332 } |
282 @Override |
333 } |
283 public String toString() { |
334 |
284 return this.getClass().getSimpleName() + ": " + channel().toString(); |
335 final ByteBuffer read() throws IOException { |
285 } |
336 ByteBuffer b = readImpl(); |
286 } |
337 return b; |
287 |
338 } |
288 // Support for WebSocket/RawChannelImpl which unfortunately |
339 |
289 // still depends on synchronous read/writes. |
340 /* |
290 // It should be removed when RawChannelImpl moves to using asynchronous APIs. |
341 * Returns a ByteBuffer with the data available at the moment, or null if |
291 abstract DetachedConnectionChannel detachChannel(); |
342 * reached EOF. |
292 |
343 */ |
293 abstract FlowTube getConnectionFlow(); |
344 protected abstract ByteBuffer readImpl() throws IOException; |
294 |
|
295 // This queue and publisher are temporary, and only needed because |
|
296 // the calling code still uses writeAsync/flushAsync |
|
297 final class PlainHttpPublisher implements HttpPublisher { |
|
298 final Object reading; |
|
299 PlainHttpPublisher() { |
|
300 this(new Object()); |
|
301 } |
|
302 PlainHttpPublisher(Object readingLock) { |
|
303 this.reading = readingLock; |
|
304 } |
|
305 final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>(); |
|
306 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
|
307 volatile HttpWriteSubscription subscription; |
|
308 final SequentialScheduler writeScheduler = |
|
309 new SequentialScheduler(this::flushTask); |
|
310 @Override |
|
311 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) { |
|
312 synchronized (reading) { |
|
313 //assert this.subscription == null; |
|
314 //assert this.subscriber == null; |
|
315 if (subscription == null) { |
|
316 subscription = new HttpWriteSubscription(); |
|
317 } |
|
318 this.subscriber = subscriber; |
|
319 } |
|
320 subscriber.onSubscribe(subscription); |
|
321 signal(); |
|
322 } |
|
323 |
|
324 void flushTask(DeferredCompleter completer) { |
|
325 try { |
|
326 HttpWriteSubscription sub = subscription; |
|
327 if (sub != null) sub.flush(); |
|
328 } finally { |
|
329 completer.complete(); |
|
330 } |
|
331 } |
|
332 |
|
333 void signal() { |
|
334 writeScheduler.runOrSchedule(); |
|
335 } |
|
336 |
|
337 final class HttpWriteSubscription implements Flow.Subscription { |
|
338 volatile boolean cancelled; |
|
339 final Demand demand = new Demand(); |
|
340 |
|
341 @Override |
|
342 public void request(long n) { |
|
343 if (n <= 0) throw new IllegalArgumentException("non-positive request"); |
|
344 demand.increase(n); |
|
345 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of " |
|
346 + n + " from " |
|
347 + getConnectionFlow()); |
|
348 writeScheduler.runOrSchedule(); |
|
349 } |
|
350 |
|
351 @Override |
|
352 public void cancel() { |
|
353 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by " |
|
354 + getConnectionFlow()); |
|
355 cancelled = true; |
|
356 } |
|
357 |
|
358 void flush() { |
|
359 while (!queue.isEmpty() && demand.tryDecrement()) { |
|
360 List<ByteBuffer> elem = queue.poll(); |
|
361 debug.log(Level.DEBUG, () -> "HttpPublisher: sending " |
|
362 + Utils.remaining(elem) + " bytes (" |
|
363 + elem.size() + " buffers) to " |
|
364 + getConnectionFlow()); |
|
365 subscriber.onNext(elem); |
|
366 } |
|
367 } |
|
368 } |
|
369 |
|
370 public void writeAsync(ByteBufferReference[] buffers) throws IOException { |
|
371 List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers)); |
|
372 queue.add(l); |
|
373 int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum(); |
|
374 debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes); |
|
375 } |
|
376 |
|
377 public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException { |
|
378 // Unordered frames are sent before existing frames. |
|
379 List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers)); |
|
380 int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum(); |
|
381 queue.addFirst(l); |
|
382 debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes); |
|
383 } |
|
384 |
|
385 public void flushAsync() throws IOException { |
|
386 // ### Remove flushAsync |
|
387 // no-op. Should not be needed now with Tube. |
|
388 // Tube.write will initiate the low-level write |
|
389 debug.log(Level.DEBUG, "signalling the publisher of the write queue"); |
|
390 signal(); |
|
391 } |
|
392 } |
|
393 |
|
394 String dbgTag = null; |
|
395 final String dbgString() { |
|
396 FlowTube flow = getConnectionFlow(); |
|
397 String tag = dbgTag; |
|
398 if (tag == null && flow != null) { |
|
399 dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")"; |
|
400 } else if (tag == null) { |
|
401 tag = this.getClass().getSimpleName() + "(?)"; |
|
402 } |
|
403 return tag; |
|
404 } |
345 |
405 |
346 @Override |
406 @Override |
347 public String toString() { |
407 public String toString() { |
348 return "HttpConnection: " + channel().toString(); |
408 return "HttpConnection: " + channel().toString(); |
349 } |
409 } |