src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branchhttp-client-branch
changeset 56010 782b2f2d1e76
parent 56009 cf8792f51dee
child 56035 2f3f5da13c4c
equal deleted inserted replaced
56009:cf8792f51dee 56010:782b2f2d1e76
   516         static HttpLineStream create(Charset charset) {
   516         static HttpLineStream create(Charset charset) {
   517             return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
   517             return new HttpLineStream(Optional.ofNullable(charset).orElse(StandardCharsets.UTF_8));
   518         }
   518         }
   519     }
   519     }
   520 
   520 
   521     static class MultiSubscriberImpl<V>
       
   522         implements HttpResponse.MultiSubscriber<MultiMapResult<V>,V>
       
   523     {
       
   524         private final MultiMapResult<V> results;
       
   525         private final Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler;
       
   526         private final Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler;
       
   527         private final boolean completion; // aggregate completes on last PP received or overall completion
       
   528 
       
   529         MultiSubscriberImpl(
       
   530                 Function<HttpRequest,HttpResponse.BodyHandler<V>> requestHandler,
       
   531                 Function<HttpRequest,Optional<HttpResponse.BodyHandler<V>>> pushHandler, boolean completion) {
       
   532             this.results = new MultiMapResult<>(new ConcurrentHashMap<>());
       
   533             this.requestHandler = requestHandler;
       
   534             this.pushHandler = pushHandler;
       
   535             this.completion = completion;
       
   536         }
       
   537 
       
   538         @Override
       
   539         public HttpResponse.BodyHandler<V> onRequest(HttpRequest request) {
       
   540             CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
       
   541             results.put(request, cf);
       
   542             return requestHandler.apply(request);
       
   543         }
       
   544 
       
   545         @Override
       
   546         public Optional<HttpResponse.BodyHandler<V>> onPushPromise(HttpRequest push) {
       
   547             CompletableFuture<HttpResponse<V>> cf = MinimalFuture.newMinimalFuture();
       
   548             results.put(push, cf);
       
   549             return pushHandler.apply(push);
       
   550         }
       
   551 
       
   552         @Override
       
   553         public void onResponse(HttpResponse<V> response) {
       
   554             CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
       
   555             cf.complete(response);
       
   556         }
       
   557 
       
   558         @Override
       
   559         public void onError(HttpRequest request, Throwable t) {
       
   560             CompletableFuture<HttpResponse<V>> cf = results.get(request);
       
   561             cf.completeExceptionally(t);
       
   562         }
       
   563 
       
   564         @Override
       
   565         public CompletableFuture<MultiMapResult<V>> completion(
       
   566                 CompletableFuture<Void> onComplete, CompletableFuture<Void> onFinalPushPromise) {
       
   567             if (completion)
       
   568                 return onComplete.thenApply((ignored)-> results);
       
   569             else
       
   570                 return onFinalPushPromise.thenApply((ignored) -> results);
       
   571         }
       
   572     }
       
   573 
       
   574     /**
   521     /**
   575      * Currently this consumes all of the data and ignores it
   522      * Currently this consumes all of the data and ignores it
   576      */
   523      */
   577     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
   524     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
   578 
   525