src/java.net.http/share/classes/java/net/http/internal/BufferingSubscriber.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal;
       
    27 
       
    28 import java.nio.ByteBuffer;
       
    29 import java.util.ArrayList;
       
    30 import java.util.Collections;
       
    31 import java.util.List;
       
    32 import java.util.ListIterator;
       
    33 import java.util.Objects;
       
    34 import java.util.concurrent.CompletionStage;
       
    35 import java.util.concurrent.Flow;
       
    36 import java.util.concurrent.atomic.AtomicBoolean;
       
    37 import java.net.http.HttpResponse.BodySubscriber;
       
    38 import java.net.http.internal.common.Demand;
       
    39 import java.net.http.internal.common.SequentialScheduler;
       
    40 import java.net.http.internal.common.Utils;
       
    41 
       
    42 /**
       
    43  * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
       
    44  * amount ( in bytes ) of a publisher's data before pushing it to a downstream
       
    45  * subscriber.
       
    46  */
       
    47 public class BufferingSubscriber<T> implements BodySubscriber<T>
       
    48 {
       
    49     /** The downstream consumer of the data. */
       
    50     private final BodySubscriber<T> downstreamSubscriber;
       
    51     /** The amount of data to be accumulate before pushing downstream. */
       
    52     private final int bufferSize;
       
    53 
       
    54     /** The subscription, created lazily. */
       
    55     private volatile Flow.Subscription subscription;
       
    56     /** The downstream subscription, created lazily. */
       
    57     private volatile DownstreamSubscription downstreamSubscription;
       
    58 
       
    59     /** Must be held when accessing the internal buffers. */
       
    60     private final Object buffersLock = new Object();
       
    61     /** The internal buffers holding the buffered data. */
       
    62     private ArrayList<ByteBuffer> internalBuffers;
       
    63     /** The actual accumulated remaining bytes in internalBuffers. */
       
    64     private int accumulatedBytes;
       
    65 
       
    66     /** Holds the Throwable from upstream's onError. */
       
    67     private volatile Throwable throwable;
       
    68 
       
    69     /** State of the buffering subscriber:
       
    70      *  1) [UNSUBSCRIBED] when initially created
       
    71      *  2) [ACTIVE] when subscribed and can receive data
       
    72      *  3) [ERROR | CANCELLED | COMPLETE] (terminal state)
       
    73      */
       
    74     static final int UNSUBSCRIBED = 0x01;
       
    75     static final int ACTIVE       = 0x02;
       
    76     static final int ERROR        = 0x04;
       
    77     static final int CANCELLED    = 0x08;
       
    78     static final int COMPLETE     = 0x10;
       
    79 
       
    80     private volatile int state;
       
    81 
       
    82     public BufferingSubscriber(BodySubscriber<T> downstreamSubscriber,
       
    83                                int bufferSize) {
       
    84         this.downstreamSubscriber = Objects.requireNonNull(downstreamSubscriber);
       
    85         this.bufferSize = bufferSize;
       
    86         synchronized (buffersLock) {
       
    87             internalBuffers = new ArrayList<>();
       
    88         }
       
    89         state = UNSUBSCRIBED;
       
    90     }
       
    91 
       
    92     /** Returns the number of bytes remaining in the given buffers. */
       
    93     private static final long remaining(List<ByteBuffer> buffers) {
       
    94         return buffers.stream().mapToLong(ByteBuffer::remaining).sum();
       
    95     }
       
    96 
       
    97     /**
       
    98      * Tells whether, or not, there is at least a sufficient number of bytes
       
    99      * accumulated in the internal buffers. If the subscriber is COMPLETE, and
       
   100      * has some buffered data, then there is always enough ( to pass downstream ).
       
   101      */
       
   102     private final boolean hasEnoughAccumulatedBytes() {
       
   103         assert Thread.holdsLock(buffersLock);
       
   104         return accumulatedBytes >= bufferSize
       
   105                 || (state == COMPLETE && accumulatedBytes > 0);
       
   106     }
       
   107 
       
   108     /**
       
   109      * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the
       
   110      * amount of data as required before pushing downstream. The amount of data
       
   111      * may be less than required ( bufferSize ), in the case where the subscriber
       
   112      * is COMPLETE.
       
   113      */
       
   114     private List<ByteBuffer> fromInternalBuffers() {
       
   115         assert Thread.holdsLock(buffersLock);
       
   116         int leftToFill = bufferSize;
       
   117         int state = this.state;
       
   118         assert (state == ACTIVE || state == CANCELLED)
       
   119                 ? accumulatedBytes >= leftToFill : true;
       
   120         List<ByteBuffer> dsts = new ArrayList<>();
       
   121 
       
   122         ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
       
   123         while (itr.hasNext()) {
       
   124             ByteBuffer b = itr.next();
       
   125             if (b.remaining() <= leftToFill) {
       
   126                 itr.remove();
       
   127                 if (b.position() != 0)
       
   128                     b = b.slice();  // ensure position = 0 when propagated
       
   129                 dsts.add(b);
       
   130                 leftToFill -= b.remaining();
       
   131                 accumulatedBytes -= b.remaining();
       
   132                 if (leftToFill == 0)
       
   133                     break;
       
   134             } else {
       
   135                 int prevLimit = b.limit();
       
   136                 b.limit(b.position() + leftToFill);
       
   137                 ByteBuffer slice = b.slice();
       
   138                 dsts.add(slice);
       
   139                 b.limit(prevLimit);
       
   140                 b.position(b.position() + leftToFill);
       
   141                 accumulatedBytes -= leftToFill;
       
   142                 leftToFill = 0;
       
   143                 break;
       
   144             }
       
   145         }
       
   146         assert (state == ACTIVE || state == CANCELLED)
       
   147                 ? leftToFill == 0 : state == COMPLETE;
       
   148         assert (state == ACTIVE || state == CANCELLED)
       
   149                 ? remaining(dsts) == bufferSize : state == COMPLETE;
       
   150         assert accumulatedBytes >= 0;
       
   151         assert dsts.stream().noneMatch(b -> b.position() != 0);
       
   152         return Collections.unmodifiableList(dsts);
       
   153     }
       
   154 
       
   155     /** Subscription that is passed to the downstream subscriber. */
       
   156     private class DownstreamSubscription implements Flow.Subscription {
       
   157         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
       
   158         private final Demand demand = new Demand();
       
   159         private volatile boolean illegalArg;
       
   160 
       
   161         @Override
       
   162         public void request(long n) {
       
   163             if (cancelled.get() || illegalArg) {
       
   164                 return;
       
   165             }
       
   166             if (n <= 0L) {
       
   167                 // pass the "bad" value upstream so the Publisher can deal with
       
   168                 // it appropriately, i.e. invoke onError
       
   169                 illegalArg = true;
       
   170                 subscription.request(n);
       
   171                 return;
       
   172             }
       
   173 
       
   174             demand.increase(n);
       
   175 
       
   176             pushDemanded();
       
   177         }
       
   178 
       
   179         private final SequentialScheduler pushDemandedScheduler =
       
   180                 new SequentialScheduler(new PushDemandedTask());
       
   181 
       
   182         void pushDemanded() {
       
   183             if (cancelled.get())
       
   184                 return;
       
   185             pushDemandedScheduler.runOrSchedule();
       
   186         }
       
   187 
       
   188         class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
       
   189             @Override
       
   190             public void run() {
       
   191                 try {
       
   192                     Throwable t = throwable;
       
   193                     if (t != null) {
       
   194                         pushDemandedScheduler.stop(); // stop the demand scheduler
       
   195                         downstreamSubscriber.onError(t);
       
   196                         return;
       
   197                     }
       
   198 
       
   199                     while (true) {
       
   200                         List<ByteBuffer> item;
       
   201                         synchronized (buffersLock) {
       
   202                             if (cancelled.get())
       
   203                                 return;
       
   204                             if (!hasEnoughAccumulatedBytes())
       
   205                                 break;
       
   206                             if (!demand.tryDecrement())
       
   207                                 break;
       
   208                             item = fromInternalBuffers();
       
   209                         }
       
   210                         assert item != null;
       
   211 
       
   212                         downstreamSubscriber.onNext(item);
       
   213                     }
       
   214                     if (cancelled.get())
       
   215                         return;
       
   216 
       
   217                     // complete only if all data consumed
       
   218                     boolean complete;
       
   219                     synchronized (buffersLock) {
       
   220                         complete = state == COMPLETE && internalBuffers.isEmpty();
       
   221                     }
       
   222                     if (complete) {
       
   223                         assert internalBuffers.isEmpty();
       
   224                         pushDemandedScheduler.stop(); // stop the demand scheduler
       
   225                         downstreamSubscriber.onComplete();
       
   226                         return;
       
   227                     }
       
   228                 } catch (Throwable t) {
       
   229                     cancel();  // cancel if there is any error
       
   230                     throw t;
       
   231                 }
       
   232 
       
   233                 boolean requestMore = false;
       
   234                 synchronized (buffersLock) {
       
   235                     if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
       
   236                         // request more upstream data
       
   237                         requestMore = true;
       
   238                     }
       
   239                 }
       
   240                 if (requestMore)
       
   241                     subscription.request(1);
       
   242             }
       
   243         }
       
   244 
       
   245         @Override
       
   246         public void cancel() {
       
   247             if (cancelled.compareAndExchange(false, true))
       
   248                 return;  // already cancelled
       
   249 
       
   250             state = CANCELLED;  // set CANCELLED state of upstream subscriber
       
   251             subscription.cancel();  // cancel upstream subscription
       
   252             pushDemandedScheduler.stop(); // stop the demand scheduler
       
   253         }
       
   254     }
       
   255 
       
   256     @Override
       
   257     public void onSubscribe(Flow.Subscription subscription) {
       
   258         Objects.requireNonNull(subscription);
       
   259         if (this.subscription != null) {
       
   260             subscription.cancel();
       
   261             return;
       
   262         }
       
   263 
       
   264         int s = this.state;
       
   265         assert s == UNSUBSCRIBED;
       
   266         state = ACTIVE;
       
   267         this.subscription = subscription;
       
   268         downstreamSubscription = new DownstreamSubscription();
       
   269         downstreamSubscriber.onSubscribe(downstreamSubscription);
       
   270     }
       
   271 
       
   272     @Override
       
   273     public void onNext(List<ByteBuffer> item) {
       
   274         Objects.requireNonNull(item);
       
   275 
       
   276         int s = state;
       
   277         if (s == CANCELLED)
       
   278             return;
       
   279 
       
   280         if (s != ACTIVE)
       
   281             throw new InternalError("onNext on inactive subscriber");
       
   282 
       
   283         synchronized (buffersLock) {
       
   284             internalBuffers.addAll(item);
       
   285             accumulatedBytes += remaining(item);
       
   286         }
       
   287 
       
   288         downstreamSubscription.pushDemanded();
       
   289     }
       
   290 
       
   291     @Override
       
   292     public void onError(Throwable incomingThrowable) {
       
   293         Objects.requireNonNull(incomingThrowable);
       
   294         int s = state;
       
   295         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
       
   296         state = ERROR;
       
   297         Throwable t = this.throwable;
       
   298         assert t == null : "Expected null, got:" + t;
       
   299         this.throwable = incomingThrowable;
       
   300         downstreamSubscription.pushDemanded();
       
   301     }
       
   302 
       
   303     @Override
       
   304     public void onComplete() {
       
   305         int s = state;
       
   306         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
       
   307         state = COMPLETE;
       
   308         downstreamSubscription.pushDemanded();
       
   309     }
       
   310 
       
   311     @Override
       
   312     public CompletionStage<T> getBody() {
       
   313         return downstreamSubscriber.getBody();
       
   314     }
       
   315 }