24 */ |
24 */ |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
|
29 import java.lang.System.Logger.Level; |
29 import java.net.URI; |
30 import java.net.URI; |
30 import java.nio.ByteBuffer; |
31 import java.nio.ByteBuffer; |
31 import java.util.ArrayList; |
32 import java.util.ArrayList; |
|
33 import java.util.Arrays; |
|
34 import java.util.Collections; |
32 import java.util.List; |
35 import java.util.List; |
33 import java.util.Optional; |
36 import java.util.Optional; |
34 import java.util.concurrent.CompletableFuture; |
37 import java.util.concurrent.CompletableFuture; |
35 import java.util.concurrent.CompletionException; |
38 import java.util.concurrent.ConcurrentLinkedQueue; |
36 import java.util.concurrent.ExecutionException; |
|
37 import java.util.concurrent.Executor; |
39 import java.util.concurrent.Executor; |
38 import java.util.concurrent.Flow; |
40 import java.util.concurrent.Flow; |
39 import java.util.concurrent.Flow.Subscription; |
41 import java.util.concurrent.Flow.Subscription; |
40 import java.util.concurrent.TimeUnit; |
42 import java.util.concurrent.atomic.AtomicReference; |
41 import java.util.concurrent.TimeoutException; |
43 import java.util.stream.Collectors; |
42 import java.util.function.Consumer; |
|
43 |
44 |
44 import jdk.incubator.http.internal.common.*; |
45 import jdk.incubator.http.internal.common.*; |
|
46 import jdk.incubator.http.internal.common.SequentialScheduler; |
|
47 import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask; |
45 import jdk.incubator.http.internal.frame.*; |
48 import jdk.incubator.http.internal.frame.*; |
46 import jdk.incubator.http.internal.hpack.DecodingCallback; |
49 import jdk.incubator.http.internal.hpack.DecodingCallback; |
|
50 import static java.util.stream.Collectors.toList; |
47 |
51 |
48 /** |
52 /** |
49 * Http/2 Stream handling. |
53 * Http/2 Stream handling. |
50 * |
54 * |
51 * REQUESTS |
55 * REQUESTS |
52 * |
56 * |
53 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q |
57 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q |
54 * |
58 * |
55 * sendRequest() -- sendHeadersOnly() + sendBody() |
59 * sendRequest() -- sendHeadersOnly() + sendBody() |
56 * |
|
57 * sendBody() -- in calling thread: obeys all flow control (so may block) |
|
58 * obtains data from request body processor and places on connection |
|
59 * outbound Q. |
|
60 * |
60 * |
61 * sendBodyAsync() -- calls sendBody() in an executor thread. |
61 * sendBodyAsync() -- calls sendBody() in an executor thread. |
62 * |
62 * |
63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block |
63 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block |
64 * |
64 * |
144 @Override |
148 @Override |
145 HttpConnection connection() { |
149 HttpConnection connection() { |
146 return connection.connection; |
150 return connection.connection; |
147 } |
151 } |
148 |
152 |
|
153 /** |
 * Drains inputQ into responseSubscriber. A queued ResetFrame is removed and |
 * passed to handleReset(). A DataFrame is delivered through onNext() only if |
 * userSubscription.tryDecrement() succeeds; otherwise it is left in the queue. |
 * onComplete() is signalled once a frame carrying END_STREAM has been processed. |
 * |

154 * Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() } |

155 * or after user subscription window has re-opened, from SubscriptionBase.request() |

156 */ |

157 private void schedule() { |

158 if (responseSubscriber == null) |

159 // can't process anything yet |

160 return; |

161 |

162 while (!inputQ.isEmpty()) { |

163 Http2Frame frame = inputQ.peek(); |

164 if (frame instanceof ResetFrame) { |

165 inputQ.remove(); |

166 handleReset((ResetFrame)frame); |

167 return; |

168 } |

169 DataFrame df = (DataFrame)frame; |

170 boolean finished = df.getFlag(DataFrame.END_STREAM); |

171 |

172 ByteBufferReference[] buffers = df.getData(); |

173 List<ByteBuffer> dsts = Arrays.stream(buffers) |

174 .map(ByteBufferReference::get) |

175 .filter(ByteBuffer::hasRemaining) |

176 .collect(Collectors.collectingAndThen(toList(), Collections::unmodifiableList)); |

177 int size = (int)Utils.remaining(dsts); |

178 if (size == 0 && finished) { |

179 inputQ.remove(); |

180 Log.logTrace("responseSubscriber.onComplete"); |

181 debug.log(Level.DEBUG, "incoming: onComplete"); |

182 sched.stop(); |

183 responseSubscriber.onComplete(); |

184 setEndStreamReceived(); |

185 return; |

186 } else if (userSubscription.tryDecrement()) { |

187 inputQ.remove(); |

188 Log.logTrace("responseSubscriber.onNext {0}", size); |

189 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); |

190 responseSubscriber.onNext(dsts); |

191 if (consumed(df)) { |

192 Log.logTrace("responseSubscriber.onComplete"); |

193 debug.log(Level.DEBUG, "incoming: onComplete"); |

194 sched.stop(); |

195 responseSubscriber.onComplete(); |

196 setEndStreamReceived(); |

197 return; |

198 } |

199 } else { |

// tryDecrement() failed: the frame was only peeked, so it stays queued |

200 return; |

201 } |

202 } |

// inputQ is empty: if the failed field is set, signal onError and close |

203 Throwable t = failed; |

204 if (t != null) { |

205 sched.stop(); |

206 responseSubscriber.onError(t); |

207 close(); |

208 } |

209 } |
|
210 |
|
211 // Called from schedule() after the buffers contained in a DataFrame have |

212 // been passed to responseSubscriber.onNext(). Credits the frame's payload length to the connection-level windowUpdater and, unless END_STREAM is set, to the stream-level windowUpdater. |

213 // Returns true if END_STREAM is reached, false otherwise. |

214 private boolean consumed(DataFrame df) { |

215 // RFC 7540 6.1: |

216 // The entire DATA frame payload is included in flow control, |

217 // including the Pad Length and Padding fields if present |

218 int len = df.payloadLength(); |

219 connection.windowUpdater.update(len); |

220 |

221 if (!df.getFlag(DataFrame.END_STREAM)) { |

222 // Don't send window update on a stream which is |

223 // closed or half closed. |

224 windowUpdater.update(len); |

225 return false; // more data coming |

226 } |

227 return true; // end of stream |

228 } |
|
229 |
149 @Override |
230 @Override |
150 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
231 CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler, |
151 boolean returnConnectionToPool, |
232 boolean returnConnectionToPool, |
152 Executor executor) |
233 Executor executor) |
153 { |
234 { |
154 Log.logTrace("Reading body on stream {0}", streamid); |
235 Log.logTrace("Reading body on stream {0}", streamid); |
155 responseProcessor = handler.apply(responseCode, responseHeaders); |
236 responseSubscriber = handler.apply(responseCode, responseHeaders); |
156 publisher.subscribe(responseProcessor); |
237 CompletableFuture<T> cf = receiveData(); |
157 CompletableFuture<T> cf = receiveData(executor); |
|
158 |
238 |
159 PushGroup<?,?> pg = exchange.getPushGroup(); |
239 PushGroup<?,?> pg = exchange.getPushGroup(); |
160 if (pg != null) { |
240 if (pg != null) { |
161 // if an error occurs make sure it is recorded in the PushGroup |
241 // if an error occurs make sure it is recorded in the PushGroup |
162 cf = cf.whenComplete((t,e) -> pg.pushError(e)); |
242 cf = cf.whenComplete((t,e) -> pg.pushError(e)); |
163 } |
243 } |
164 return cf; |
244 return cf; |
165 } |
|
166 |
|
167 @Override |
|
168 T readBody(HttpResponse.BodyHandler<T> handler, boolean returnConnectionToPool) |
|
169 throws IOException |
|
170 { |
|
171 CompletableFuture<T> cf = readBodyAsync(handler, |
|
172 returnConnectionToPool, |
|
173 null); |
|
174 try { |
|
175 return cf.join(); |
|
176 } catch (CompletionException e) { |
|
177 throw Utils.getIOException(e); |
|
178 } |
|
179 } |
245 } |
180 |
246 |
181 @Override |
247 @Override |
182 public String toString() { |
248 public String toString() { |
183 StringBuilder sb = new StringBuilder(); |
249 StringBuilder sb = new StringBuilder(); |
184 sb.append("streamid: ") |
250 sb.append("streamid: ") |
185 .append(streamid); |
251 .append(streamid); |
186 return sb.toString(); |
252 return sb.toString(); |
187 } |
253 } |
188 |
254 |
189 private boolean receiveDataFrame(Http2Frame frame) throws IOException, InterruptedException { |
255 private void receiveDataFrame(DataFrame df) { |
190 if (frame instanceof ResetFrame) { |
256 inputQ.add(df); |
191 handleReset((ResetFrame) frame); |
257 sched.runOrSchedule(); |
192 return true; |
258 } |
193 } else if (!(frame instanceof DataFrame)) { |
259 |
194 assert false; |
260 /** |
195 return true; |
261 * RESET always handled inline in queue |
196 } |
262 */ |
197 DataFrame df = (DataFrame) frame; |
263 private void receiveResetFrame(ResetFrame frame) { |
198 // RFC 7540 6.1: |
264 inputQ.add(frame); |
199 // The entire DATA frame payload is included in flow control, |
265 sched.runOrSchedule(); |
200 // including the Pad Length and Padding fields if present |
266 } |
201 int len = df.payloadLength(); |
267 |
202 ByteBufferReference[] buffers = df.getData(); |
268 // pushes entire response body into response subscriber |
203 for (ByteBufferReference b : buffers) { |
|
204 ByteBuffer buf = b.get(); |
|
205 if (buf.hasRemaining()) { |
|
206 publisher.acceptData(Optional.of(buf)); |
|
207 } |
|
208 } |
|
209 connection.windowUpdater.update(len); |
|
210 if (df.getFlag(DataFrame.END_STREAM)) { |
|
211 setEndStreamReceived(); |
|
212 publisher.acceptData(Optional.empty()); |
|
213 return false; |
|
214 } |
|
215 // Don't send window update on a stream which is |
|
216 // closed or half closed. |
|
217 windowUpdater.update(len); |
|
218 return true; |
|
219 } |
|
220 |
|
221 // pushes entire response body into response processor |
|
222 // blocking when required by local or remote flow control |
269 // as the user subscription's demand allows; see schedule() |
223 CompletableFuture<T> receiveData(Executor executor) { |
270 CompletableFuture<T> receiveData() { |
224 CompletableFuture<T> cf = responseProcessor |
271 responseBodyCF = responseSubscriber |
225 .getBody() |
272 .getBody() |
226 .toCompletableFuture(); |
273 .toCompletableFuture(); |
227 Consumer<Throwable> onError = e -> { |
274 |
228 Log.logTrace("receiveData: {0}", e.toString()); |
275 if (isCanceled()) { |
229 e.printStackTrace(); |
276 Throwable t = getCancelCause(); |
230 cf.completeExceptionally(e); |
277 responseBodyCF.completeExceptionally(t); |
231 publisher.acceptError(e); |
278 sched.runOrSchedule(); |
232 }; |
|
233 if (executor == null) { |
|
234 inputQ.blockingReceive(this::receiveDataFrame, onError); |
|
235 } else { |
279 } else { |
236 inputQ.asyncReceive(executor, this::receiveDataFrame, onError); |
280 responseSubscriber.onSubscribe(userSubscription); |
237 } |
281 sched.runOrSchedule(); // in case data waiting already to be processed |
238 return cf; |
282 } |
|
283 return responseBodyCF; |
239 } |
284 } |
240 |
285 |
241 @Override |
286 @Override |
242 void sendBody() throws IOException { |
|
243 try { |
|
244 sendBodyImpl().join(); |
|
245 } catch (CompletionException e) { |
|
246 throw Utils.getIOException(e); |
|
247 } |
|
248 } |
|
249 |
|
250 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
287 CompletableFuture<ExchangeImpl<T>> sendBodyAsync() { |
251 return sendBodyImpl().thenApply( v -> this); |
288 return sendBodyImpl().thenApply( v -> this); |
252 } |
289 } |
253 |
290 |
254 @SuppressWarnings("unchecked") |
291 @SuppressWarnings("unchecked") |
347 } |
384 } |
348 |
385 |
349 completeResponse(response); |
386 completeResponse(response); |
350 } |
387 } |
351 |
388 |
352 void incoming_reset(ResetFrame frame) throws IOException { |
389 void incoming_reset(ResetFrame frame) { |
353 Log.logTrace("Received RST_STREAM on stream {0}", streamid); |
390 Log.logTrace("Received RST_STREAM on stream {0}", streamid); |
354 if (endStreamReceived()) { |
391 if (endStreamReceived()) { |
355 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); |
392 Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid); |
356 } else if (closed) { |
393 } else if (closed) { |
357 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
394 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
358 } else { |
395 } else { |
359 boolean pushedToQueue = false; |
396 // put it in the input queue in order to read all |
360 synchronized(this) { |
397 // pending data frames first. Indeed, a server may send |
361 // if the response headers are not yet |
398 // RST_STREAM after sending END_STREAM, in which case we should |
362 // received, or the inputQueue is closed, handle reset directly. |
399 // ignore it. However, we won't know if we have received END_STREAM |
363 // Otherwise, put it in the input queue in order to read all |
400 // or not until all pending data frames are read. |
364 // pending data frames first. Indeed, a server may send |
401 receiveResetFrame(frame); |
365 // RST_STREAM after sending END_STREAM, in which case we should |
402 // RST_STREAM was pushed to the queue. It will be handled by |
366 // ignore it. However, we won't know if we have received END_STREAM |
403 // schedule() after all pending data frames have been |
367 // or not until all pending data frames are read. |
404 // processed. |
368 // Because the inputQ will not be read until the response |
405 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); |
369 // headers are received, and because response headers won't be |
406 } |
370 // sent if the server sent RST_STREAM, then we must handle |
407 } |
371 // reset here directly unless responseHeadersReceived is true. |
408 |
372 pushedToQueue = !closed && responseHeadersReceived && inputQ.tryPut(frame); |
409 void handleReset(ResetFrame frame) { |
373 } |
|
374 if (!pushedToQueue) { |
|
375 // RST_STREAM was not pushed to the queue: handle it. |
|
376 try { |
|
377 handleReset(frame); |
|
378 } catch (IOException io) { |
|
379 completeResponseExceptionally(io); |
|
380 } |
|
381 } else { |
|
382 // RST_STREAM was pushed to the queue. It will be handled by |
|
383 // asyncReceive after all pending data frames have been |
|
384 // processed. |
|
385 Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid); |
|
386 } |
|
387 } |
|
388 } |
|
389 |
|
390 void handleReset(ResetFrame frame) throws IOException { |
|
391 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); |
410 Log.logTrace("Handling RST_STREAM on stream {0}", streamid); |
392 if (!closed) { |
411 if (!closed) { |
393 close(); |
412 close(); |
394 int error = frame.getErrorCode(); |
413 int error = frame.getErrorCode(); |
395 throw new IOException(ErrorFrame.stringForCode(error)); |
414 completeResponseExceptionally(new IOException(ErrorFrame.stringForCode(error))); |
396 } else { |
415 } else { |
397 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
416 Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid); |
398 } |
417 } |
399 } |
418 } |
400 |
419 |
556 private boolean endStreamReceived() { |
549 private boolean endStreamReceived() { |
557 return remotelyClosed; |
550 return remotelyClosed; |
558 } |
551 } |
559 |
552 |
560 @Override |
553 @Override |
561 void sendHeadersOnly() throws IOException, InterruptedException { |
554 CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() { |
|
555 debug.log(Level.DEBUG, "sendHeadersOnly()"); |
562 if (Log.requests() && request != null) { |
556 if (Log.requests() && request != null) { |
563 Log.logRequest(request.toString()); |
557 Log.logRequest(request.toString()); |
564 } |
558 } |
565 requestContentLen = requestProcessor.contentLength(); |
559 if (requestPublisher != null) { |
|
560 requestContentLen = requestPublisher.contentLength(); |
|
561 } else { |
|
562 requestContentLen = 0; |
|
563 } |
566 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); |
564 OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen); |
567 connection.sendFrame(f); |
565 connection.sendFrame(f); |
|
566 CompletableFuture<ExchangeImpl<T>> cf = new CompletableFuture<ExchangeImpl<T>>(); |
|
567 cf.complete(this); // #### good enough for now |
|
568 return cf; |
|
569 } |
|
570 |
|
571 @Override |
|
572 void released() { |
|
573 if (streamid > 0) { |
|
574 debug.log(Level.DEBUG, "Released stream %d", streamid); |
|
575 // remove this stream from the Http2Connection map. |
|
576 connection.closeStream(streamid); |
|
577 } else { |
|
578 debug.log(Level.DEBUG, "Can't release stream %d", streamid); |
|
579 } |
|
580 } |
|
581 |
|
582 @Override |
|
583 void completed() { |
|
584 // There should be nothing to do here: the stream should have |
|
585 // been already closed (or will be closed shortly after). |
568 } |
586 } |
569 |
587 |
570 void registerStream(int id) { |
588 void registerStream(int id) { |
571 this.streamid = id; |
589 this.streamid = id; |
572 connection.putStream(this, streamid); |
590 connection.putStream(this, streamid); |
573 } |
591 debug.log(Level.DEBUG, "Registered stream %d", id); |
574 |
592 } |
|
593 |
|
594 void signalWindowUpdate() { |
|
595 RequestSubscriber subscriber = requestSubscriber; |
|
596 assert subscriber != null; |
|
597 debug.log(Level.DEBUG, "Signalling window update"); |
|
598 subscriber.sendScheduler.runOrSchedule(); |
|
599 } |
|
600 |
|
601 static final ByteBuffer COMPLETED = ByteBuffer.allocate(0); |
575 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { |
602 class RequestSubscriber implements Flow.Subscriber<ByteBuffer> { |
576 // can be < 0 if the actual length is not known. |
603 // can be < 0 if the actual length is not known. |
|
604 private final long contentLength; |
577 private volatile long remainingContentLength; |
605 private volatile long remainingContentLength; |
578 private volatile Subscription subscription; |
606 private volatile Subscription subscription; |
|
607 private volatile ByteBuffer current; |
|
608 private final AtomicReference<Throwable> errorRef = new AtomicReference<>(); |
|
609 // A scheduler used to honor window updates. Writing must be paused |
|
610 // when the window is exhausted, and resumed when the window acquires |
|
611 // some space. The sendScheduler makes it possible to implement this |
|
612 // behaviour in an asynchronous non-blocking way. |
|
613 // See RequestSubscriber::trySend below. |
|
614 final SequentialScheduler sendScheduler; |
579 |
615 |
580 RequestSubscriber(long contentLen) { |
616 RequestSubscriber(long contentLen) { |
|
617 this.contentLength = contentLen; |
581 this.remainingContentLength = contentLen; |
618 this.remainingContentLength = contentLen; |
|
619 this.sendScheduler = new SequentialScheduler( |
|
620 new SynchronizedRestartableTask(this::trySend)); |
582 } |
621 } |
583 |
622 |
584 @Override |
623 @Override |
585 public void onSubscribe(Flow.Subscription subscription) { |
624 public void onSubscribe(Flow.Subscription subscription) { |
586 if (this.subscription != null) { |
625 if (this.subscription != null) { |
587 throw new IllegalStateException(); |
626 throw new IllegalStateException("already subscribed"); |
588 } |
627 } |
589 this.subscription = subscription; |
628 this.subscription = subscription; |
|
629 debug.log(Level.DEBUG, "RequestSubscriber: onSubscribe, request 1"); |
590 subscription.request(1); |
630 subscription.request(1); |
591 } |
631 } |
592 |
632 |
593 @Override |
633 @Override |
594 public void onNext(ByteBuffer item) { |
634 public void onNext(ByteBuffer item) { |
|
635 debug.log(Level.DEBUG, |
|
636 "RequestSubscriber: onNext(%d)", item.remaining()); |
|
637 // Got some more request body bytes to send. |
595 if (requestBodyCF.isDone()) { |
638 if (requestBodyCF.isDone()) { |
596 throw new IllegalStateException(); |
639 // stream already cancelled, probably in timeout |
597 } |
640 sendScheduler.stop(); |
598 |
641 subscription.cancel(); |
|
642 return; |
|
643 } |
|
644 ByteBuffer prev = current; |
|
645 assert prev == null; |
|
646 current = item; |
|
647 sendScheduler.runOrSchedule(); |
|
648 } |
|
649 |
|
650 @Override |
|
651 public void onError(Throwable throwable) { |
|
652 debug.log(Level.DEBUG, |
|
653 () -> "RequestSubscriber: onError: " + throwable); |
|
654 // ensure that errors are handled within the flow. |
|
655 if (errorRef.compareAndSet(null, throwable)) { |
|
656 sendScheduler.runOrSchedule(); |
|
657 } |
|
658 } |
|
659 |
|
660 @Override |
|
661 public void onComplete() { |
|
662 debug.log(Level.DEBUG, "RequestSubscriber: onComplete"); |
|
663 // last byte of request body has been obtained. |
|
664 // ensure that everything is completed within the flow. |
|
665 onNext(COMPLETED); |
|
666 } |
|
667 |
|
668 // Attempts to send the data, if any. |
|
669 // Handles errors and completion state. |
|
670 // Pause writing if the send window is exhausted, resume it if the |
|
671 // send window has some bytes that can be acquired. |
|
672 void trySend() { |
599 try { |
673 try { |
|
674 // handle errors raised by onError; |
|
675 Throwable t = errorRef.get(); |
|
676 if (t != null) { |
|
677 sendScheduler.stop(); |
|
678 if (requestBodyCF.isDone()) return; |
|
679 subscription.cancel(); |
|
680 requestBodyCF.completeExceptionally(t); |
|
681 return; |
|
682 } |
|
683 |
|
684 // handle COMPLETED; |
|
685 ByteBuffer item = current; |
|
686 if (item == null) return; |
|
687 else if (item == COMPLETED) { |
|
688 sendScheduler.stop(); |
|
689 complete(); |
|
690 return; |
|
691 } |
|
692 |
|
693 // handle bytes to send downstream |
600 while (item.hasRemaining()) { |
694 while (item.hasRemaining()) { |
|
695 debug.log(Level.DEBUG, "trySend: %d", item.remaining()); |
601 assert !endStreamSent : "internal error, send data after END_STREAM flag"; |
696 assert !endStreamSent : "internal error, send data after END_STREAM flag"; |
602 DataFrame df = getDataFrame(item); |
697 DataFrame df = getDataFrame(item); |
603 if (remainingContentLength > 0) { |
698 if (df == null) { |
|
699 debug.log(Level.DEBUG, "trySend: can't send yet: %d", |
|
700 item.remaining()); |
|
701 return; // the send window is exhausted: come back later |
|
702 } |
|
703 |
|
704 if (contentLength > 0) { |
604 remainingContentLength -= df.getDataLength(); |
705 remainingContentLength -= df.getDataLength(); |
605 assert remainingContentLength >= 0; |
706 if (remainingContentLength < 0) { |
606 if (remainingContentLength == 0) { |
707 String msg = connection().getConnectionFlow() |
|
708 + " stream=" + streamid + " " |
|
709 + "[" + Thread.currentThread().getName() +"] " |
|
710 + "Too many bytes in request body. Expected: " |
|
711 + contentLength + ", got: " |
|
712 + (contentLength - remainingContentLength); |
|
713 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
|
714 throw new IOException(msg); |
|
715 } else if (remainingContentLength == 0) { |
607 df.setFlag(DataFrame.END_STREAM); |
716 df.setFlag(DataFrame.END_STREAM); |
608 endStreamSent = true; |
717 endStreamSent = true; |
609 } |
718 } |
610 } |
719 } |
|
720 debug.log(Level.DEBUG, "trySend: sending: %d", df.getDataLength()); |
611 connection.sendDataFrame(df); |
721 connection.sendDataFrame(df); |
612 } |
722 } |
|
723 assert !item.hasRemaining(); |
|
724 current = null; |
|
725 debug.log(Level.DEBUG, "trySend: request 1"); |
613 subscription.request(1); |
726 subscription.request(1); |
614 } catch (InterruptedException ex) { |
727 } catch (Throwable ex) { |
|
728 debug.log(Level.DEBUG, "trySend: ", ex); |
|
729 sendScheduler.stop(); |
615 subscription.cancel(); |
730 subscription.cancel(); |
616 requestBodyCF.completeExceptionally(ex); |
731 requestBodyCF.completeExceptionally(ex); |
617 } |
732 } |
618 } |
733 } |
619 |
734 |
620 @Override |
735 private void complete() throws IOException { |
621 public void onError(Throwable throwable) { |
736 long remaining = remainingContentLength; |
622 if (requestBodyCF.isDone()) { |
737 long written = contentLength - remaining; |
623 return; |
738 if (remaining > 0) { |
624 } |
739 connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR); |
625 subscription.cancel(); |
740 // let trySend() handle the exception |
626 requestBodyCF.completeExceptionally(throwable); |
741 throw new IOException(connection().getConnectionFlow() |
627 } |
742 + " stream=" + streamid + " " |
628 |
743 + "[" + Thread.currentThread().getName() +"] " |
629 @Override |
744 + "Too few bytes returned by the publisher (" |
630 public void onComplete() { |
745 + written + "/" |
631 assert endStreamSent || remainingContentLength < 0; |
746 + contentLength + ")"); |
632 try { |
747 } |
633 if (!endStreamSent) { |
748 if (!endStreamSent) { |
634 endStreamSent = true; |
749 endStreamSent = true; |
635 connection.sendDataFrame(getEmptyEndStreamDataFrame()); |
750 connection.sendDataFrame(getEmptyEndStreamDataFrame()); |
636 } |
751 } |
637 requestBodyCF.complete(null); |
752 requestBodyCF.complete(null); |
638 } catch (InterruptedException ex) { |
753 } |
639 requestBodyCF.completeExceptionally(ex); |
754 } |
640 } |
755 |
641 } |
756 DataFrame getDataFrame(ByteBuffer buffer) { |
642 } |
|
643 |
|
644 DataFrame getDataFrame(ByteBuffer buffer) throws InterruptedException { |
|
645 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); |
757 int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); |
646 // blocks waiting for stream send window, if exhausted |
758 // tries to acquire stream send window; returns null if none is available |
647 int actualAmount = windowController.tryAcquire(requestAmount, streamid); |
759 int actualAmount = windowController.tryAcquire(requestAmount, streamid, this); |
|
760 if (actualAmount <= 0) return null; |
648 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); |
761 ByteBuffer outBuf = Utils.slice(buffer, actualAmount); |
649 DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); |
762 DataFrame df = new DataFrame(streamid, 0 , ByteBufferReference.of(outBuf)); |
650 return df; |
763 return df; |
651 } |
764 } |
652 |
765 |
653 private DataFrame getEmptyEndStreamDataFrame() throws InterruptedException { |
766 private DataFrame getEmptyEndStreamDataFrame() { |
654 return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); |
767 return new DataFrame(streamid, DataFrame.END_STREAM, new ByteBufferReference[0]); |
655 } |
768 } |
656 |
769 |
657 /** |
770 /** |
658 * A List of responses relating to this stream. Normally there is only |
771 * A List of responses relating to this stream. Normally there is only |