src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branch: http-client-branch
changeset 55847 3bac3bca4adb
parent 55803 259cf67b22ec
child 55858 cd5eeec735fb
Legend: equal | deleted | inserted | replaced
55846:2a7e2724a422 55847:3bac3bca4adb
    37 import java.security.PrivilegedActionException;
    37 import java.security.PrivilegedActionException;
    38 import java.security.PrivilegedExceptionAction;
    38 import java.security.PrivilegedExceptionAction;
    39 import java.util.ArrayList;
    39 import java.util.ArrayList;
    40 import java.util.Iterator;
    40 import java.util.Iterator;
    41 import java.util.List;
    41 import java.util.List;
       
    42 import java.util.Objects;
    42 import java.util.Optional;
    43 import java.util.Optional;
    43 import java.util.concurrent.ArrayBlockingQueue;
    44 import java.util.concurrent.ArrayBlockingQueue;
    44 import java.util.concurrent.BlockingQueue;
    45 import java.util.concurrent.BlockingQueue;
    45 import java.util.concurrent.CompletableFuture;
    46 import java.util.concurrent.CompletableFuture;
    46 import java.util.concurrent.CompletionStage;
    47 import java.util.concurrent.CompletionStage;
    47 import java.util.concurrent.ConcurrentHashMap;
    48 import java.util.concurrent.ConcurrentHashMap;
    48 import java.util.concurrent.Flow;
    49 import java.util.concurrent.Flow;
       
    50 import java.util.concurrent.atomic.AtomicBoolean;
    49 import java.util.function.Consumer;
    51 import java.util.function.Consumer;
    50 import java.util.function.Function;
    52 import java.util.function.Function;
    51 import jdk.incubator.http.internal.common.MinimalFuture;
    53 import jdk.incubator.http.internal.common.MinimalFuture;
    52 import jdk.incubator.http.internal.common.Utils;
    54 import jdk.incubator.http.internal.common.Utils;
    53 
    55 
    55 
    57 
    56     static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> {
    58     static class ConsumerSubscriber implements HttpResponse.BodySubscriber<Void> {
    57         private final Consumer<Optional<byte[]>> consumer;
    59         private final Consumer<Optional<byte[]>> consumer;
    58         private Flow.Subscription subscription;
    60         private Flow.Subscription subscription;
    59         private final CompletableFuture<Void> result = new MinimalFuture<>();
    61         private final CompletableFuture<Void> result = new MinimalFuture<>();
       
    62         private final AtomicBoolean subscribed = new AtomicBoolean();
    60 
    63 
    61         ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
    64         ConsumerSubscriber(Consumer<Optional<byte[]>> consumer) {
    62             this.consumer = consumer;
    65             this.consumer = consumer;
    63         }
    66         }
    64 
    67 
    67             return result;
    70             return result;
    68         }
    71         }
    69 
    72 
    70         @Override
    73         @Override
    71         public void onSubscribe(Flow.Subscription subscription) {
    74         public void onSubscribe(Flow.Subscription subscription) {
    72             this.subscription = subscription;
    75             if (!subscribed.compareAndSet(false, true)) {
    73             subscription.request(1);
    76                 subscription.cancel();
       
    77             } else {
       
    78                 this.subscription = subscription;
       
    79                 subscription.request(1);
       
    80             }
    74         }
    81         }
    75 
    82 
    76         @Override
    83         @Override
    77         public void onNext(List<ByteBuffer> items) {
    84         public void onNext(List<ByteBuffer> items) {
    78             for (ByteBuffer item : items) {
    85             for (ByteBuffer item : items) {
   250         private volatile Flow.Subscription subscription;
   257         private volatile Flow.Subscription subscription;
   251         private volatile boolean closed;
   258         private volatile boolean closed;
   252         private volatile Throwable failed;
   259         private volatile Throwable failed;
   253         private volatile Iterator<ByteBuffer> currentListItr;
   260         private volatile Iterator<ByteBuffer> currentListItr;
   254         private volatile ByteBuffer currentBuffer;
   261         private volatile ByteBuffer currentBuffer;
       
   262         private final AtomicBoolean subscribed = new AtomicBoolean();
   255 
   263 
   256         HttpResponseInputStream() {
   264         HttpResponseInputStream() {
   257             this(MAX_BUFFERS_IN_QUEUE);
   265             this(MAX_BUFFERS_IN_QUEUE);
   258         }
   266         }
   259 
   267 
   349             return buffer.get() & 0xFF;
   357             return buffer.get() & 0xFF;
   350         }
   358         }
   351 
   359 
   352         @Override
   360         @Override
   353         public void onSubscribe(Flow.Subscription s) {
   361         public void onSubscribe(Flow.Subscription s) {
   354             if (this.subscription != null) {
   362             if (!subscribed.compareAndSet(false, true)) {
   355                 s.cancel();
   363                 s.cancel();
   356                 return;
   364             } else {
   357             }
   365                 this.subscription = s;
   358             this.subscription = s;
   366                 assert buffers.remainingCapacity() > 1; // should contain at least 2
   359             assert buffers.remainingCapacity() > 1; // should contain at least 2
   367                 DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
   360             DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
   368                         + Math.max(1, buffers.remainingCapacity() - 1));
   361                                 +  Math.max(1, buffers.remainingCapacity() - 1));
   369                 s.request(Math.max(1, buffers.remainingCapacity() - 1));
   362             s.request(Math.max(1, buffers.remainingCapacity() - 1));
   370             }
   363         }
   371         }
   364 
   372 
   365         @Override
   373         @Override
   366         public void onNext(List<ByteBuffer> t) {
   374         public void onNext(List<ByteBuffer> t) {
       
   375             Objects.requireNonNull(t);
   367             try {
   376             try {
   368                 DEBUG_LOGGER.log(Level.DEBUG, "next item received");
   377                 DEBUG_LOGGER.log(Level.DEBUG, "next item received");
   369                 if (!buffers.offer(t)) {
   378                 if (!buffers.offer(t)) {
   370                     throw new IllegalStateException("queue is full");
   379                     throw new IllegalStateException("queue is full");
   371                 }
   380                 }
   381         }
   390         }
   382 
   391 
   383         @Override
   392         @Override
   384         public void onError(Throwable thrwbl) {
   393         public void onError(Throwable thrwbl) {
   385             subscription = null;
   394             subscription = null;
   386             failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl;
   395             failed = Objects.requireNonNull(thrwbl);
   387             // The client process that reads the input stream might
   396             // The client process that reads the input stream might
   388             // be blocked in queue.take().
   397             // be blocked in queue.take().
   389             // Tries to offer LAST_LIST to the queue. If the queue is
   398             // Tries to offer LAST_LIST to the queue. If the queue is
   390             // full we don't care if we can't insert this buffer, as
   399             // full we don't care if we can't insert this buffer, as
   391             // the client can't be blocked in queue.take() in that case.
   400             // the client can't be blocked in queue.take() in that case.
   472     /**
   481     /**
   473      * Currently this consumes all of the data and ignores it
   482      * Currently this consumes all of the data and ignores it
   474      */
   483      */
   475     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
   484     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
   476 
   485 
   477         final CompletableFuture<T> cf = new MinimalFuture<>();
   486         private final CompletableFuture<T> cf = new MinimalFuture<>();
   478         final Optional<T> result;
   487         private final Optional<T> result;
       
   488         private final AtomicBoolean subscribed = new AtomicBoolean();
   479 
   489 
   480         NullSubscriber(Optional<T> result) {
   490         NullSubscriber(Optional<T> result) {
   481             this.result = result;
   491             this.result = result;
   482         }
   492         }
   483 
   493 
   484         @Override
   494         @Override
   485         public void onSubscribe(Flow.Subscription subscription) {
   495         public void onSubscribe(Flow.Subscription subscription) {
   486             subscription.request(Long.MAX_VALUE);
   496             if (!subscribed.compareAndSet(false, true)) {
       
   497                 subscription.cancel();
       
   498             } else {
       
   499                 subscription.request(Long.MAX_VALUE);
       
   500             }
   487         }
   501         }
   488 
   502 
   489         @Override
   503         @Override
   490         public void onNext(List<ByteBuffer> items) {
   504         public void onNext(List<ByteBuffer> items) {
   491             // NO-OP
   505             Objects.requireNonNull(items);
   492         }
   506         }
   493 
   507 
   494         @Override
   508         @Override
   495         public void onError(Throwable throwable) {
   509         public void onError(Throwable throwable) {
   496             cf.completeExceptionally(throwable);
   510             cf.completeExceptionally(throwable);