1 /* |
|
2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. |
|
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
|
4 * |
|
5 * This code is free software; you can redistribute it and/or modify it |
|
6 * under the terms of the GNU General Public License version 2 only, as |
|
7 * published by the Free Software Foundation. Oracle designates this |
|
8 * particular file as subject to the "Classpath" exception as provided |
|
9 * by Oracle in the LICENSE file that accompanied this code. |
|
10 * |
|
11 * This code is distributed in the hope that it will be useful, but WITHOUT |
|
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
|
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
|
14 * version 2 for more details (a copy is included in the LICENSE file that |
|
15 * accompanied this code). |
|
16 * |
|
17 * You should have received a copy of the GNU General Public License version |
|
18 * 2 along with this work; if not, write to the Free Software Foundation, |
|
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
|
20 * |
|
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
|
22 * or visit www.oracle.com if you need additional information or have any |
|
23 * questions. |
|
24 */ |
|
25 |
|
26 package java.net.http.internal; |
|
27 |
|
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
|
30 import java.net.InetSocketAddress; |
|
31 import java.net.http.HttpResponse.BodyHandler; |
|
32 import java.net.http.HttpResponse.BodySubscriber; |
|
33 import java.nio.ByteBuffer; |
|
34 import java.util.Objects; |
|
35 import java.util.concurrent.CompletableFuture; |
|
36 import java.util.LinkedList; |
|
37 import java.util.List; |
|
38 import java.util.concurrent.ConcurrentLinkedDeque; |
|
39 import java.util.concurrent.Executor; |
|
40 import java.util.concurrent.Flow; |
|
41 import java.net.http.internal.common.Demand; |
|
42 import java.net.http.internal.common.Log; |
|
43 import java.net.http.internal.common.FlowTube; |
|
44 import java.net.http.internal.common.SequentialScheduler; |
|
45 import java.net.http.internal.common.MinimalFuture; |
|
46 import java.net.http.internal.common.Utils; |
|
47 import static java.net.http.HttpClient.Version.HTTP_1_1; |
|
48 |
|
49 /** |
|
50 * Encapsulates one HTTP/1.1 request/response exchange. |
|
51 */ |
|
52 class Http1Exchange<T> extends ExchangeImpl<T> { |
|
53 |
|
54 static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag. |
|
55 final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG); |
|
56 private static final System.Logger DEBUG_LOGGER = |
|
57 Utils.getDebugLogger("Http1Exchange"::toString, DEBUG); |
|
58 |
|
59 final HttpRequestImpl request; // main request |
|
60 final Http1Request requestAction; |
|
61 private volatile Http1Response<T> response; |
|
62 final HttpConnection connection; |
|
63 final HttpClientImpl client; |
|
64 final Executor executor; |
|
65 private final Http1AsyncReceiver asyncReceiver; |
|
66 |
|
67 /** Records a possible cancellation raised before any operation |
|
68 * has been initiated, or an error received while sending the request. */ |
|
69 private Throwable failed; |
|
70 private final List<CompletableFuture<?>> operations; // used for cancel |
|
71 |
|
72 /** Must be held when operating on any internal state or data. */ |
|
73 private final Object lock = new Object(); |
|
74 |
|
75 /** Holds the outgoing data, either the headers or a request body part. Or |
|
76 * an error from the request body publisher. At most there can be ~2 pieces |
|
77 * of outgoing data ( onComplete|onError can be invoked without demand ).*/ |
|
78 final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>(); |
|
79 |
|
80 /** The write publisher, responsible for writing the complete request ( both |
|
81 * headers and body ( if any ). */ |
|
82 private final Http1Publisher writePublisher = new Http1Publisher(); |
|
83 |
|
84 /** Completed when the header have been published, or there is an error */ |
|
85 private final CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>(); |
|
86 /** Completed when the body has been published, or there is an error */ |
|
87 private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>(); |
|
88 |
|
89 /** The subscriber to the request's body published. Maybe null. */ |
|
90 private volatile Http1BodySubscriber bodySubscriber; |
|
91 |
|
92 enum State { INITIAL, |
|
93 HEADERS, |
|
94 BODY, |
|
95 ERROR, // terminal state |
|
96 COMPLETING, |
|
97 COMPLETED } // terminal state |
|
98 |
|
99 private State state = State.INITIAL; |
|
100 |
|
101 /** A carrier for either data or an error. Used to carry data, and communicate |
|
102 * errors from the request ( both headers and body ) to the exchange. */ |
|
103 static class DataPair { |
|
104 Throwable throwable; |
|
105 List<ByteBuffer> data; |
|
106 DataPair(List<ByteBuffer> data, Throwable throwable){ |
|
107 this.data = data; |
|
108 this.throwable = throwable; |
|
109 } |
|
110 @Override |
|
111 public String toString() { |
|
112 return "DataPair [data=" + data + ", throwable=" + throwable + "]"; |
|
113 } |
|
114 } |
|
115 |
|
116 /** An abstract supertype for HTTP/1.1 body subscribers. There are two |
|
117 * concrete implementations: {@link Http1Request.StreamSubscriber}, and |
|
118 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and |
|
119 * fixed length bodies, respectively. */ |
|
120 static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> { |
|
121 protected volatile Flow.Subscription subscription; |
|
122 protected volatile boolean complete; |
|
123 |
|
124 /** Final sentinel in the stream of request body. */ |
|
125 static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0)); |
|
126 |
|
127 void request(long n) { |
|
128 DEBUG_LOGGER.log(Level.DEBUG, () -> |
|
129 "Http1BodySubscriber requesting " + n + ", from " + subscription); |
|
130 subscription.request(n); |
|
131 } |
|
132 |
|
133 static Http1BodySubscriber completeSubscriber() { |
|
134 return new Http1BodySubscriber() { |
|
135 @Override public void onSubscribe(Flow.Subscription subscription) { error(); } |
|
136 @Override public void onNext(ByteBuffer item) { error(); } |
|
137 @Override public void onError(Throwable throwable) { error(); } |
|
138 @Override public void onComplete() { error(); } |
|
139 private void error() { |
|
140 throw new InternalError("should not reach here"); |
|
141 } |
|
142 }; |
|
143 } |
|
144 } |
|
145 |
|
146 @Override |
|
147 public String toString() { |
|
148 return "HTTP/1.1 " + request.toString(); |
|
149 } |
|
150 |
|
151 HttpRequestImpl request() { |
|
152 return request; |
|
153 } |
|
154 |
|
155 Http1Exchange(Exchange<T> exchange, HttpConnection connection) |
|
156 throws IOException |
|
157 { |
|
158 super(exchange); |
|
159 this.request = exchange.request(); |
|
160 this.client = exchange.client(); |
|
161 this.executor = exchange.executor(); |
|
162 this.operations = new LinkedList<>(); |
|
163 operations.add(headersSentCF); |
|
164 operations.add(bodySentCF); |
|
165 if (connection != null) { |
|
166 this.connection = connection; |
|
167 } else { |
|
168 InetSocketAddress addr = request.getAddress(); |
|
169 this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1); |
|
170 } |
|
171 this.requestAction = new Http1Request(request, this); |
|
172 this.asyncReceiver = new Http1AsyncReceiver(executor, this); |
|
173 asyncReceiver.subscribe(new InitialErrorReceiver()); |
|
174 } |
|
175 |
|
176 /** An initial receiver that handles no data, but cancels the request if |
|
177 * it receives an error. Will be replaced when reading response body. */ |
|
178 final class InitialErrorReceiver implements Http1AsyncReceiver.Http1AsyncDelegate { |
|
179 volatile AbstractSubscription s; |
|
180 @Override |
|
181 public boolean tryAsyncReceive(ByteBuffer ref) { |
|
182 return false; // no data has been processed, leave it in the queue |
|
183 } |
|
184 |
|
185 @Override |
|
186 public void onReadError(Throwable ex) { |
|
187 cancelImpl(ex); |
|
188 } |
|
189 |
|
190 @Override |
|
191 public void onSubscribe(AbstractSubscription s) { |
|
192 this.s = s; |
|
193 } |
|
194 |
|
195 public AbstractSubscription subscription() { |
|
196 return s; |
|
197 } |
|
198 } |
|
199 |
|
200 @Override |
|
201 HttpConnection connection() { |
|
202 return connection; |
|
203 } |
|
204 |
|
205 private void connectFlows(HttpConnection connection) { |
|
206 FlowTube tube = connection.getConnectionFlow(); |
|
207 debug.log(Level.DEBUG, "%s connecting flows", tube); |
|
208 |
|
209 // Connect the flow to our Http1TubeSubscriber: |
|
210 // asyncReceiver.subscriber(). |
|
211 tube.connectFlows(writePublisher, |
|
212 asyncReceiver.subscriber()); |
|
213 } |
|
214 |
|
215 @Override |
|
216 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
|
217 // create the response before sending the request headers, so that |
|
218 // the response can set the appropriate receivers. |
|
219 debug.log(Level.DEBUG, "Sending headers only"); |
|
220 if (response == null) { |
|
221 response = new Http1Response<>(connection, this, asyncReceiver); |
|
222 } |
|
223 |
|
224 debug.log(Level.DEBUG, "response created in advance"); |
|
225 // If the first attempt to read something triggers EOF, or |
|
226 // IOException("channel reset by peer"), we're going to retry. |
|
227 // Instruct the asyncReceiver to throw ConnectionExpiredException |
|
228 // to force a retry. |
|
229 asyncReceiver.setRetryOnError(true); |
|
230 |
|
231 CompletableFuture<Void> connectCF; |
|
232 if (!connection.connected()) { |
|
233 debug.log(Level.DEBUG, "initiating connect async"); |
|
234 connectCF = connection.connectAsync(); |
|
235 synchronized (lock) { |
|
236 operations.add(connectCF); |
|
237 } |
|
238 } else { |
|
239 connectCF = new MinimalFuture<>(); |
|
240 connectCF.complete(null); |
|
241 } |
|
242 |
|
243 return connectCF |
|
244 .thenCompose(unused -> { |
|
245 CompletableFuture<Void> cf = new MinimalFuture<>(); |
|
246 try { |
|
247 connectFlows(connection); |
|
248 |
|
249 debug.log(Level.DEBUG, "requestAction.headers"); |
|
250 List<ByteBuffer> data = requestAction.headers(); |
|
251 synchronized (lock) { |
|
252 state = State.HEADERS; |
|
253 } |
|
254 debug.log(Level.DEBUG, "setting outgoing with headers"); |
|
255 assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing; |
|
256 appendToOutgoing(data); |
|
257 cf.complete(null); |
|
258 return cf; |
|
259 } catch (Throwable t) { |
|
260 debug.log(Level.DEBUG, "Failed to send headers: %s", t); |
|
261 connection.close(); |
|
262 cf.completeExceptionally(t); |
|
263 return cf; |
|
264 } }) |
|
265 .thenCompose(unused -> headersSentCF); |
|
266 } |
|
267 |
|
268 @Override |
|
269 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
|
270 assert headersSentCF.isDone(); |
|
271 try { |
|
272 bodySubscriber = requestAction.continueRequest(); |
|
273 if (bodySubscriber == null) { |
|
274 bodySubscriber = Http1BodySubscriber.completeSubscriber(); |
|
275 appendToOutgoing(Http1BodySubscriber.COMPLETED); |
|
276 } else { |
|
277 bodySubscriber.request(1); // start |
|
278 } |
|
279 } catch (Throwable t) { |
|
280 connection.close(); |
|
281 bodySentCF.completeExceptionally(t); |
|
282 } |
|
283 return bodySentCF; |
|
284 } |
|
285 |
|
286 @Override |
|
287 CompletableFuture<Response> getResponseAsync(Executor executor) { |
|
288 CompletableFuture<Response> cf = response.readHeadersAsync(executor); |
|
289 Throwable cause; |
|
290 synchronized (lock) { |
|
291 operations.add(cf); |
|
292 cause = failed; |
|
293 failed = null; |
|
294 } |
|
295 |
|
296 if (cause != null) { |
|
297 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]" |
|
298 + "\n\tCompleting exceptionally with {2}\n", |
|
299 request.uri(), |
|
300 request.timeout().isPresent() ? |
|
301 // calling duration.toMillis() can throw an exception. |
|
302 // this is just debugging, we don't care if it overflows. |
|
303 (request.timeout().get().getSeconds() * 1000 |
|
304 + request.timeout().get().getNano() / 1000000) : -1, |
|
305 cause); |
|
306 boolean acknowledged = cf.completeExceptionally(cause); |
|
307 debug.log(Level.DEBUG, |
|
308 () -> acknowledged |
|
309 ? ("completed response with " + cause) |
|
310 : ("response already completed, ignoring " + cause)); |
|
311 } |
|
312 return cf; |
|
313 } |
|
314 |
|
315 @Override |
|
316 CompletableFuture<T> readBodyAsync(BodyHandler<T> handler, |
|
317 boolean returnConnectionToPool, |
|
318 Executor executor) |
|
319 { |
|
320 BodySubscriber<T> bs = handler.apply(response.responseCode(), |
|
321 response.responseHeaders()); |
|
322 CompletableFuture<T> bodyCF = response.readBody(bs, |
|
323 returnConnectionToPool, |
|
324 executor); |
|
325 return bodyCF; |
|
326 } |
|
327 |
|
328 @Override |
|
329 CompletableFuture<Void> ignoreBody() { |
|
330 return response.ignoreBody(executor); |
|
331 } |
|
332 |
|
333 ByteBuffer drainLeftOverBytes() { |
|
334 synchronized (lock) { |
|
335 asyncReceiver.stop(); |
|
336 return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER); |
|
337 } |
|
338 } |
|
339 |
|
340 void released() { |
|
341 Http1Response<T> resp = this.response; |
|
342 if (resp != null) resp.completed(); |
|
343 asyncReceiver.clear(); |
|
344 } |
|
345 |
|
346 void completed() { |
|
347 Http1Response<T> resp = this.response; |
|
348 if (resp != null) resp.completed(); |
|
349 } |
|
350 |
|
351 /** |
|
352 * Cancel checks to see if request and responseAsync finished already. |
|
353 * If not it closes the connection and completes all pending operations |
|
354 */ |
|
355 @Override |
|
356 void cancel() { |
|
357 cancelImpl(new IOException("Request cancelled")); |
|
358 } |
|
359 |
|
360 /** |
|
361 * Cancel checks to see if request and responseAsync finished already. |
|
362 * If not it closes the connection and completes all pending operations |
|
363 */ |
|
364 @Override |
|
365 void cancel(IOException cause) { |
|
366 cancelImpl(cause); |
|
367 } |
|
368 |
|
369 private void cancelImpl(Throwable cause) { |
|
370 LinkedList<CompletableFuture<?>> toComplete = null; |
|
371 int count = 0; |
|
372 synchronized (lock) { |
|
373 if (failed == null) |
|
374 failed = cause; |
|
375 if (requestAction != null && requestAction.finished() |
|
376 && response != null && response.finished()) { |
|
377 return; |
|
378 } |
|
379 connection.close(); // TODO: ensure non-blocking if holding the lock |
|
380 writePublisher.writeScheduler.stop(); |
|
381 if (operations.isEmpty()) { |
|
382 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation." |
|
383 + "\n\tCan''t cancel yet with {2}", |
|
384 request.uri(), |
|
385 request.timeout().isPresent() ? |
|
386 // calling duration.toMillis() can throw an exception. |
|
387 // this is just debugging, we don't care if it overflows. |
|
388 (request.timeout().get().getSeconds() * 1000 |
|
389 + request.timeout().get().getNano() / 1000000) : -1, |
|
390 cause); |
|
391 } else { |
|
392 for (CompletableFuture<?> cf : operations) { |
|
393 if (!cf.isDone()) { |
|
394 if (toComplete == null) toComplete = new LinkedList<>(); |
|
395 toComplete.add(cf); |
|
396 count++; |
|
397 } |
|
398 } |
|
399 operations.clear(); |
|
400 } |
|
401 } |
|
402 Log.logError("Http1Exchange.cancel: count=" + count); |
|
403 if (toComplete != null) { |
|
404 // We might be in the selector thread in case of timeout, when |
|
405 // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline() |
|
406 // There may or may not be other places that reach here |
|
407 // from the SelectorManager thread, so just make sure we |
|
408 // don't complete any CF from within the selector manager |
|
409 // thread. |
|
410 Executor exec = client.isSelectorThread() |
|
411 ? executor |
|
412 : this::runInline; |
|
413 while (!toComplete.isEmpty()) { |
|
414 CompletableFuture<?> cf = toComplete.poll(); |
|
415 exec.execute(() -> { |
|
416 if (cf.completeExceptionally(cause)) { |
|
417 debug.log(Level.DEBUG, "completed cf with %s", |
|
418 (Object) cause); |
|
419 } |
|
420 }); |
|
421 } |
|
422 } |
|
423 } |
|
424 |
|
425 private void runInline(Runnable run) { |
|
426 assert !client.isSelectorThread(); |
|
427 run.run(); |
|
428 } |
|
429 |
|
430 /** Returns true if this exchange was canceled. */ |
|
431 boolean isCanceled() { |
|
432 synchronized (lock) { |
|
433 return failed != null; |
|
434 } |
|
435 } |
|
436 |
|
437 /** Returns the cause for which this exchange was canceled, if available. */ |
|
438 Throwable getCancelCause() { |
|
439 synchronized (lock) { |
|
440 return failed; |
|
441 } |
|
442 } |
|
443 |
|
444 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */ |
|
445 void appendToOutgoing(Throwable throwable) { |
|
446 appendToOutgoing(new DataPair(null, throwable)); |
|
447 } |
|
448 |
|
449 /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */ |
|
450 void appendToOutgoing(List<ByteBuffer> item) { |
|
451 appendToOutgoing(new DataPair(item, null)); |
|
452 } |
|
453 |
|
454 private void appendToOutgoing(DataPair dp) { |
|
455 debug.log(Level.DEBUG, "appending to outgoing " + dp); |
|
456 outgoing.add(dp); |
|
457 writePublisher.writeScheduler.runOrSchedule(); |
|
458 } |
|
459 |
|
460 /** Tells whether, or not, there is any outgoing data that can be published, |
|
461 * or if there is an error. */ |
|
462 private boolean hasOutgoing() { |
|
463 return !outgoing.isEmpty(); |
|
464 } |
|
465 |
|
466 // Invoked only by the publisher |
|
467 // ALL tasks should execute off the Selector-Manager thread |
|
468 /** Returns the next portion of the HTTP request, or the error. */ |
|
469 private DataPair getOutgoing() { |
|
470 final Executor exec = client.theExecutor(); |
|
471 final DataPair dp = outgoing.pollFirst(); |
|
472 |
|
473 if (dp == null) // publisher has not published anything yet |
|
474 return null; |
|
475 |
|
476 synchronized (lock) { |
|
477 if (dp.throwable != null) { |
|
478 state = State.ERROR; |
|
479 exec.execute(() -> { |
|
480 connection.close(); |
|
481 headersSentCF.completeExceptionally(dp.throwable); |
|
482 bodySentCF.completeExceptionally(dp.throwable); |
|
483 }); |
|
484 return dp; |
|
485 } |
|
486 |
|
487 switch (state) { |
|
488 case HEADERS: |
|
489 state = State.BODY; |
|
490 // completeAsync, since dependent tasks should run in another thread |
|
491 debug.log(Level.DEBUG, "initiating completion of headersSentCF"); |
|
492 headersSentCF.completeAsync(() -> this, exec); |
|
493 break; |
|
494 case BODY: |
|
495 if (dp.data == Http1BodySubscriber.COMPLETED) { |
|
496 state = State.COMPLETING; |
|
497 debug.log(Level.DEBUG, "initiating completion of bodySentCF"); |
|
498 bodySentCF.completeAsync(() -> this, exec); |
|
499 } else { |
|
500 debug.log(Level.DEBUG, "requesting more body from the subscriber"); |
|
501 exec.execute(() -> bodySubscriber.request(1)); |
|
502 } |
|
503 break; |
|
504 case INITIAL: |
|
505 case ERROR: |
|
506 case COMPLETING: |
|
507 case COMPLETED: |
|
508 default: |
|
509 assert false : "Unexpected state:" + state; |
|
510 } |
|
511 |
|
512 return dp; |
|
513 } |
|
514 } |
|
515 |
|
516 /** A Publisher of HTTP/1.1 headers and request body. */ |
|
517 final class Http1Publisher implements FlowTube.TubePublisher { |
|
518 |
|
519 final System.Logger debug = Utils.getDebugLogger(this::dbgString); |
|
520 volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber; |
|
521 volatile boolean cancelled; |
|
522 final Http1WriteSubscription subscription = new Http1WriteSubscription(); |
|
523 final Demand demand = new Demand(); |
|
524 final SequentialScheduler writeScheduler = |
|
525 SequentialScheduler.synchronizedScheduler(new WriteTask()); |
|
526 |
|
527 @Override |
|
528 public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { |
|
529 assert state == State.INITIAL; |
|
530 Objects.requireNonNull(s); |
|
531 assert subscriber == null; |
|
532 |
|
533 subscriber = s; |
|
534 debug.log(Level.DEBUG, "got subscriber: %s", s); |
|
535 s.onSubscribe(subscription); |
|
536 } |
|
537 |
|
538 volatile String dbgTag; |
|
539 String dbgString() { |
|
540 String tag = dbgTag; |
|
541 Object flow = connection.getConnectionFlow(); |
|
542 if (tag == null && flow != null) { |
|
543 dbgTag = tag = "Http1Publisher(" + flow + ")"; |
|
544 } else if (tag == null) { |
|
545 tag = "Http1Publisher(?)"; |
|
546 } |
|
547 return tag; |
|
548 } |
|
549 |
|
550 final class WriteTask implements Runnable { |
|
551 @Override |
|
552 public void run() { |
|
553 assert state != State.COMPLETED : "Unexpected state:" + state; |
|
554 debug.log(Level.DEBUG, "WriteTask"); |
|
555 if (subscriber == null) { |
|
556 debug.log(Level.DEBUG, "no subscriber yet"); |
|
557 return; |
|
558 } |
|
559 debug.log(Level.DEBUG, () -> "hasOutgoing = " + hasOutgoing()); |
|
560 while (hasOutgoing() && demand.tryDecrement()) { |
|
561 DataPair dp = getOutgoing(); |
|
562 |
|
563 if (dp.throwable != null) { |
|
564 debug.log(Level.DEBUG, "onError"); |
|
565 // Do not call the subscriber's onError, it is not required. |
|
566 writeScheduler.stop(); |
|
567 } else { |
|
568 List<ByteBuffer> data = dp.data; |
|
569 if (data == Http1BodySubscriber.COMPLETED) { |
|
570 synchronized (lock) { |
|
571 assert state == State.COMPLETING : "Unexpected state:" + state; |
|
572 state = State.COMPLETED; |
|
573 } |
|
574 debug.log(Level.DEBUG, |
|
575 "completed, stopping %s", writeScheduler); |
|
576 writeScheduler.stop(); |
|
577 // Do nothing more. Just do not publish anything further. |
|
578 // The next Subscriber will eventually take over. |
|
579 |
|
580 } else { |
|
581 debug.log(Level.DEBUG, () -> |
|
582 "onNext with " + Utils.remaining(data) + " bytes"); |
|
583 subscriber.onNext(data); |
|
584 } |
|
585 } |
|
586 } |
|
587 } |
|
588 } |
|
589 |
|
590 final class Http1WriteSubscription implements Flow.Subscription { |
|
591 |
|
592 @Override |
|
593 public void request(long n) { |
|
594 if (cancelled) |
|
595 return; //no-op |
|
596 demand.increase(n); |
|
597 debug.log(Level.DEBUG, |
|
598 "subscription request(%d), demand=%s", n, demand); |
|
599 writeScheduler.runOrSchedule(client.theExecutor()); |
|
600 } |
|
601 |
|
602 @Override |
|
603 public void cancel() { |
|
604 debug.log(Level.DEBUG, "subscription cancelled"); |
|
605 if (cancelled) |
|
606 return; //no-op |
|
607 cancelled = true; |
|
608 writeScheduler.stop(); |
|
609 } |
|
610 } |
|
611 } |
|
612 |
|
613 String dbgString() { |
|
614 return "Http1Exchange"; |
|
615 } |
|
616 } |
|