24 */ |
24 */ |
25 |
25 |
26 package jdk.internal.net.http.common; |
26 package jdk.internal.net.http.common; |
27 |
27 |
28 import java.io.Closeable; |
28 import java.io.Closeable; |
29 import java.lang.System.Logger.Level; |
|
30 import java.nio.ByteBuffer; |
29 import java.nio.ByteBuffer; |
31 import java.util.ArrayList; |
|
32 import java.util.List; |
30 import java.util.List; |
33 import java.util.Objects; |
31 import java.util.Objects; |
34 import java.util.concurrent.CompletableFuture; |
32 import java.util.concurrent.CompletableFuture; |
35 import java.util.concurrent.ConcurrentLinkedQueue; |
33 import java.util.concurrent.ConcurrentLinkedQueue; |
36 import java.util.concurrent.Flow; |
34 import java.util.concurrent.Flow; |
316 debug.log("DownstreamPusher: Pushing %d bytes downstream", |
314 debug.log("DownstreamPusher: Pushing %d bytes downstream", |
317 Utils.remaining(b)); |
315 Utils.remaining(b)); |
318 downstreamSubscriber.onNext(b); |
316 downstreamSubscriber.onNext(b); |
319 datasent = true; |
317 datasent = true; |
320 } |
318 } |
321 if (datasent) upstreamWindowUpdate(); |
319 |
|
320 // If we have sent some decrypted data downstream, |
|
321 // or if: |
|
322 // - there's nothing more available to send downstream |
|
323 // - and we still have some demand from downstream |
|
324 // - and upstream is not completed yet |
|
325 // - and our demand from upstream has reached 0, |
|
326 // then check whether we should request more data from |
|
327 // upstream |
|
328 if (datasent || outputQ.isEmpty() |
|
329 && !downstreamSubscription.demand.isFulfilled() |
|
330 && !upstreamCompleted |
|
331 && upstreamWindow.get() == 0) { |
|
332 upstreamWindowUpdate(); |
|
333 } |
322 checkCompletion(); |
334 checkCompletion(); |
323 } |
335 } |
|
336 } |
|
337 |
|
338 final int outputQueueSize() { |
|
339 return outputQ.size(); |
|
340 } |
|
341 |
|
342 final boolean hasNoOutputData() { |
|
343 return outputQ.isEmpty(); |
324 } |
344 } |
325 |
345 |
326 void upstreamWindowUpdate() { |
346 void upstreamWindowUpdate() { |
327 long downstreamQueueSize = outputQ.size(); |
347 long downstreamQueueSize = outputQ.size(); |
328 long upstreamWindowSize = upstreamWindow.get(); |
348 long upstreamWindowSize = upstreamWindow.get(); |
339 public void onSubscribe(Flow.Subscription subscription) { |
359 public void onSubscribe(Flow.Subscription subscription) { |
340 if (upstreamSubscription != null) { |
360 if (upstreamSubscription != null) { |
341 throw new IllegalStateException("Single shot publisher"); |
361 throw new IllegalStateException("Single shot publisher"); |
342 } |
362 } |
343 this.upstreamSubscription = subscription; |
363 this.upstreamSubscription = subscription; |
344 upstreamRequest(upstreamWindowUpdate(0, 0)); |
364 upstreamRequest(initialUpstreamDemand()); |
345 if (debug.on()) |
365 if (debug.on()) |
346 debug.log("calling downstreamSubscriber::onSubscribe on %s", |
366 debug.log("calling downstreamSubscriber::onSubscribe on %s", |
347 downstreamSubscriber); |
367 downstreamSubscriber); |
348 downstreamSubscriber.onSubscribe(downstreamSubscription); |
368 downstreamSubscriber.onSubscribe(downstreamSubscription); |
349 onSubscribe(); |
369 onSubscribe(); |
354 if (debug.on()) debug.log("onNext"); |
374 if (debug.on()) debug.log("onNext"); |
355 long prev = upstreamWindow.getAndDecrement(); |
375 long prev = upstreamWindow.getAndDecrement(); |
356 if (prev <= 0) |
376 if (prev <= 0) |
357 throw new IllegalStateException("invalid onNext call"); |
377 throw new IllegalStateException("invalid onNext call"); |
358 incomingCaller(item, false); |
378 incomingCaller(item, false); |
359 upstreamWindowUpdate(); |
|
360 } |
379 } |
361 |
380 |
362 private void upstreamRequest(long n) { |
381 private void upstreamRequest(long n) { |
363 if (debug.on()) debug.log("requesting %d", n); |
382 if (debug.on()) debug.log("requesting %d", n); |
364 upstreamWindow.getAndAdd(n); |
383 upstreamWindow.getAndAdd(n); |
365 upstreamSubscription.request(n); |
384 upstreamSubscription.request(n); |
|
385 } |
|
386 |
|
387 /** |
|
388 * Initial demand that should be requested |
|
389 * from upstream when we get the upstream subscription |
|
390 * from {@link #onSubscribe(Flow.Subscription)}. |
|
391 * @return The initial demand to request from upstream. |
|
392 */ |
|
393 protected long initialUpstreamDemand() { |
|
394 return 1; |
366 } |
395 } |
367 |
396 |
368 protected void requestMore() { |
397 protected void requestMore() { |
369 if (upstreamWindow.get() == 0) { |
398 if (upstreamWindow.get() == 0) { |
370 upstreamRequest(1); |
399 upstreamRequest(1); |