37 import java.security.PrivilegedActionException; |
37 import java.security.PrivilegedActionException; |
38 import java.security.PrivilegedExceptionAction; |
38 import java.security.PrivilegedExceptionAction; |
39 import java.util.ArrayList; |
39 import java.util.ArrayList; |
40 import java.util.Iterator; |
40 import java.util.Iterator; |
41 import java.util.List; |
41 import java.util.List; |
|
42 import java.util.Objects; |
42 import java.util.Optional; |
43 import java.util.Optional; |
43 import java.util.concurrent.ArrayBlockingQueue; |
44 import java.util.concurrent.ArrayBlockingQueue; |
44 import java.util.concurrent.BlockingQueue; |
45 import java.util.concurrent.BlockingQueue; |
45 import java.util.concurrent.CompletableFuture; |
46 import java.util.concurrent.CompletableFuture; |
46 import java.util.concurrent.CompletionStage; |
47 import java.util.concurrent.CompletionStage; |
47 import java.util.concurrent.ConcurrentHashMap; |
48 import java.util.concurrent.ConcurrentHashMap; |
48 import java.util.concurrent.Flow; |
49 import java.util.concurrent.Flow; |
|
50 import java.util.concurrent.atomic.AtomicBoolean; |
49 import java.util.function.Consumer; |
51 import java.util.function.Consumer; |
50 import java.util.function.Function; |
52 import java.util.function.Function; |
51 import jdk.incubator.http.internal.common.MinimalFuture; |
53 import jdk.incubator.http.internal.common.MinimalFuture; |
52 import jdk.incubator.http.internal.common.Utils; |
54 import jdk.incubator.http.internal.common.Utils; |
53 |
55 |
55 |
57 |
56 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { |
58 static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> { |
57 private final Consumer<Optional<byte[]>> consumer; |
59 private final Consumer<Optional<byte[]>> consumer; |
58 private Flow.Subscription subscription; |
60 private Flow.Subscription subscription; |
59 private final CompletableFuture<Void> result = new MinimalFuture<>(); |
61 private final CompletableFuture<Void> result = new MinimalFuture<>(); |
|
62 private final AtomicBoolean subscribed = new AtomicBoolean(); |
60 |
63 |
61 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { |
64 ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) { |
62 this.consumer = consumer; |
65 this.consumer = consumer; |
63 } |
66 } |
64 |
67 |
67 return result; |
70 return result; |
68 } |
71 } |
69 |
72 |
70 @Override |
73 @Override |
71 public void onSubscribe(Flow.Subscription subscription) { |
74 public void onSubscribe(Flow.Subscription subscription) { |
72 this.subscription = subscription; |
75 if (!subscribed.compareAndSet(false, true)) { |
73 subscription.request(1); |
76 subscription.cancel(); |
|
77 } else { |
|
78 this.subscription = subscription; |
|
79 subscription.request(1); |
|
80 } |
74 } |
81 } |
75 |
82 |
76 @Override |
83 @Override |
77 public void onNext(List<ByteBuffer> items) { |
84 public void onNext(List<ByteBuffer> items) { |
78 for (ByteBuffer item : items) { |
85 for (ByteBuffer item : items) { |
250 private volatile Flow.Subscription subscription; |
257 private volatile Flow.Subscription subscription; |
251 private volatile boolean closed; |
258 private volatile boolean closed; |
252 private volatile Throwable failed; |
259 private volatile Throwable failed; |
253 private volatile Iterator<ByteBuffer> currentListItr; |
260 private volatile Iterator<ByteBuffer> currentListItr; |
254 private volatile ByteBuffer currentBuffer; |
261 private volatile ByteBuffer currentBuffer; |
|
262 private final AtomicBoolean subscribed = new AtomicBoolean(); |
255 |
263 |
256 HttpResponseInputStream() { |
264 HttpResponseInputStream() { |
257 this(MAX_BUFFERS_IN_QUEUE); |
265 this(MAX_BUFFERS_IN_QUEUE); |
258 } |
266 } |
259 |
267 |
349 return buffer.get() & 0xFF; |
357 return buffer.get() & 0xFF; |
350 } |
358 } |
351 |
359 |
352 @Override |
360 @Override |
353 public void onSubscribe(Flow.Subscription s) { |
361 public void onSubscribe(Flow.Subscription s) { |
354 if (this.subscription != null) { |
362 if (!subscribed.compareAndSet(false, true)) { |
355 s.cancel(); |
363 s.cancel(); |
356 return; |
364 } else { |
357 } |
365 this.subscription = s; |
358 this.subscription = s; |
366 assert buffers.remainingCapacity() > 1; // should contain at least 2 |
359 assert buffers.remainingCapacity() > 1; // should contain at least 2 |
367 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " |
360 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " |
368 + Math.max(1, buffers.remainingCapacity() - 1)); |
361 + Math.max(1, buffers.remainingCapacity() - 1)); |
369 s.request(Math.max(1, buffers.remainingCapacity() - 1)); |
362 s.request(Math.max(1, buffers.remainingCapacity() - 1)); |
370 } |
363 } |
371 } |
364 |
372 |
365 @Override |
373 @Override |
366 public void onNext(List<ByteBuffer> t) { |
374 public void onNext(List<ByteBuffer> t) { |
|
375 Objects.requireNonNull(t); |
367 try { |
376 try { |
368 DEBUG_LOGGER.log(Level.DEBUG, "next item received"); |
377 DEBUG_LOGGER.log(Level.DEBUG, "next item received"); |
369 if (!buffers.offer(t)) { |
378 if (!buffers.offer(t)) { |
370 throw new IllegalStateException("queue is full"); |
379 throw new IllegalStateException("queue is full"); |
371 } |
380 } |
381 } |
390 } |
382 |
391 |
383 @Override |
392 @Override |
384 public void onError(Throwable thrwbl) { |
393 public void onError(Throwable thrwbl) { |
385 subscription = null; |
394 subscription = null; |
386 failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl; |
395 failed = Objects.requireNonNull(thrwbl); |
387 // The client process that reads the input stream might |
396 // The client process that reads the input stream might |
388 // be blocked in queue.take(). |
397 // be blocked in queue.take(). |
389 // Tries to offer LAST_LIST to the queue. If the queue is |
398 // Tries to offer LAST_LIST to the queue. If the queue is |
390 // full we don't care if we can't insert this buffer, as |
399 // full we don't care if we can't insert this buffer, as |
391 // the client can't be blocked in queue.take() in that case. |
400 // the client can't be blocked in queue.take() in that case. |
472 /** |
481 /** |
473 * Currently this consumes all of the data and ignores it |
482 * Currently this consumes all of the data and ignores it |
474 */ |
483 */ |
475 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { |
484 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { |
476 |
485 |
477 final CompletableFuture<T> cf = new MinimalFuture<>(); |
486 private final CompletableFuture<T> cf = new MinimalFuture<>(); |
478 final Optional<T> result; |
487 private final Optional<T> result; |
|
488 private final AtomicBoolean subscribed = new AtomicBoolean(); |
479 |
489 |
480 NullSubscriber(Optional<T> result) { |
490 NullSubscriber(Optional<T> result) { |
481 this.result = result; |
491 this.result = result; |
482 } |
492 } |
483 |
493 |
484 @Override |
494 @Override |
485 public void onSubscribe(Flow.Subscription subscription) { |
495 public void onSubscribe(Flow.Subscription subscription) { |
486 subscription.request(Long.MAX_VALUE); |
496 if (!subscribed.compareAndSet(false, true)) { |
|
497 subscription.cancel(); |
|
498 } else { |
|
499 subscription.request(Long.MAX_VALUE); |
|
500 } |
487 } |
501 } |
488 |
502 |
489 @Override |
503 @Override |
490 public void onNext(List<ByteBuffer> items) { |
504 public void onNext(List<ByteBuffer> items) { |
491 // NO-OP |
505 Objects.requireNonNull(items); |
492 } |
506 } |
493 |
507 |
494 @Override |
508 @Override |
495 public void onError(Throwable throwable) { |
509 public void onError(Throwable throwable) { |
496 cf.completeExceptionally(throwable); |
510 cf.completeExceptionally(throwable); |