516 static HttpLineStream create(Charset charset) { |
516 static HttpLineStream create(Charset charset) { |
517 return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8)); |
517 return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8)); |
518 } |
518 } |
519 } |
519 } |
520 |
520 |
521 static class MultiSubscriberImpl<V> |
|
522 implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V> |
|
523 { |
|
524 private final MultiMapResult<V> results; |
|
525 private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler; |
|
526 private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler; |
|
527 private final boolean completion; // aggregate completes on last PP received or overall completion |
|
528 |
|
529 MultiSubscriberImpl( |
|
530 Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler, |
|
531 Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) { |
|
532 this.results = new MultiMapResult<>(new ConcurrentHashMap<>()); |
|
533 this.requestHandler = requestHandler; |
|
534 this.pushHandler = pushHandler; |
|
535 this.completion = completion; |
|
536 } |
|
537 |
|
538 @Override |
|
539 public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) { |
|
540 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); |
|
541 results.put(request, cf); |
|
542 return requestHandler.apply(request); |
|
543 } |
|
544 |
|
545 @Override |
|
546 public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) { |
|
547 CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture(); |
|
548 results.put(push, cf); |
|
549 return pushHandler.apply(push); |
|
550 } |
|
551 |
|
552 @Override |
|
553 public void onResponse(HttpResponse<V> response) { |
|
554 CompletableFuture<HttpResponse<V>> cf = results.get(response.request()); |
|
555 cf.complete(response); |
|
556 } |
|
557 |
|
558 @Override |
|
559 public void onError(HttpRequest request, Throwable t) { |
|
560 CompletableFuture<HttpResponse<V>> cf = results.get(request); |
|
561 cf.completeExceptionally(t); |
|
562 } |
|
563 |
|
564 @Override |
|
565 public CompletableFuture<MultiMapResult<V>> completion( |
|
566 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) { |
|
567 if (completion) |
|
568 return onComplete.thenApply((ignored)-> results); |
|
569 else |
|
570 return onFinalPushPromise.thenApply((ignored) -> results); |
|
571 } |
|
572 } |
|
573 |
|
574 /** |
521 /** |
575 * Currently this consumes all of the data and ignores it |
522 * Currently this consumes all of the data and ignores it |
576 */ |
523 */ |
577 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { |
524 static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> { |
578 |
525 |