src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java
branchhttp-client-branch
changeset 55763 634d8e14c172
child 55859 4ca3e578b9c4
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http;
       
    27 
       
    28 import java.nio.ByteBuffer;
       
    29 import java.util.ArrayList;
       
    30 import java.util.Collections;
       
    31 import java.util.List;
       
    32 import java.util.ListIterator;
       
    33 import java.util.Objects;
       
    34 import java.util.concurrent.CompletionStage;
       
    35 import java.util.concurrent.Flow;
       
    36 import java.util.concurrent.atomic.AtomicBoolean;
       
    37 import jdk.incubator.http.internal.common.Demand;
       
    38 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    39 import jdk.incubator.http.internal.common.Utils;
       
    40 
       
    41 /**
       
    42  * A buffering BodySubscriber. When subscribed, accumulates ( buffers ) a given
       
    43  * amount ( in bytes ) of a publisher's data before pushing it to a downstream
       
    44  * subscriber.
       
    45  */
       
    46 class BufferingSubscriber<T> implements HttpResponse.BodySubscriber<T>
       
    47 {
       
    48     /** The downstream consumer of the data. */
       
    49     private final HttpResponse.BodySubscriber<T> downstreamSubscriber;
       
    50     /** The amount of data to be accumulate before pushing downstream. */
       
    51     private final int bufferSize;
       
    52 
       
    53     /** The subscription, created lazily. */
       
    54     private volatile Flow.Subscription subscription;
       
    55     /** The downstream subscription, created lazily. */
       
    56     private volatile DownstreamSubscription downstreamSubscription;
       
    57 
       
    58     /** Must be held when accessing the internal buffers. */
       
    59     private final Object buffersLock = new Object();
       
    60     /** The internal buffers holding the buffered data. */
       
    61     private ArrayList<ByteBuffer> internalBuffers;
       
    62     /** The actual accumulated remaining bytes in internalBuffers. */
       
    63     private int accumulatedBytes;
       
    64 
       
    65     /** State of the buffering subscriber:
       
    66      *  1) [UNSUBSCRIBED] when initially created
       
    67      *  2) [ACTIVE] when subscribed and can receive data
       
    68      *  3) [ERROR | CANCELLED | COMPLETE] (terminal state)
       
    69      */
       
    70     static final int UNSUBSCRIBED = 0x01;
       
    71     static final int ACTIVE       = 0x02;
       
    72     static final int ERROR        = 0x04;
       
    73     static final int CANCELLED    = 0x08;
       
    74     static final int COMPLETE     = 0x10;
       
    75 
       
    76     private volatile int state;
       
    77 
       
    78     BufferingSubscriber(HttpResponse.BodySubscriber<T> downstreamSubscriber,
       
    79                         int bufferSize) {
       
    80         this.downstreamSubscriber = downstreamSubscriber;
       
    81         this.bufferSize = bufferSize;
       
    82         synchronized (buffersLock) {
       
    83             internalBuffers = new ArrayList<>();
       
    84         }
       
    85         state = UNSUBSCRIBED;
       
    86     }
       
    87 
       
    88     /** Returns the number of bytes remaining in the given buffers. */
       
    89     private static final int remaining(List<ByteBuffer> buffers) {
       
    90         return buffers.stream().mapToInt(ByteBuffer::remaining).sum();
       
    91     }
       
    92 
       
    93     /**
       
    94      * Tells whether, or not, there is at least a sufficient number of bytes
       
    95      * accumulated in the internal buffers. If the subscriber is COMPLETE, and
       
    96      * has some buffered data, then there is always enough ( to pass downstream ).
       
    97      */
       
    98     private final boolean hasEnoughAccumulatedBytes() {
       
    99         assert Thread.holdsLock(buffersLock);
       
   100         return accumulatedBytes >= bufferSize
       
   101                 || (state == COMPLETE && accumulatedBytes > 0);
       
   102     }
       
   103 
       
   104     /**
       
   105      * Returns a new, unmodifiable, List<ByteBuffer> containing exactly the
       
   106      * amount of data as required before pushing downstream. The amount of data
       
   107      * may be less than required ( bufferSize ), in the case where the subscriber
       
   108      * is COMPLETE.
       
   109      */
       
   110     private List<ByteBuffer> fromInternalBuffers() {
       
   111         assert Thread.holdsLock(buffersLock);
       
   112         int leftToFill = bufferSize;
       
   113         int state = this.state;
       
   114         assert (state == ACTIVE || state == CANCELLED)
       
   115                 ? accumulatedBytes >= leftToFill : true;
       
   116         List<ByteBuffer> dsts = new ArrayList<>();
       
   117 
       
   118         ListIterator<ByteBuffer> itr = internalBuffers.listIterator();
       
   119         while (itr.hasNext()) {
       
   120             ByteBuffer b = itr.next();
       
   121             if (b.remaining() <= leftToFill) {
       
   122                 itr.remove();
       
   123                 if (b.position() != 0)
       
   124                     b = b.slice();  // ensure position = 0 when propagated
       
   125                 dsts.add(b);
       
   126                 leftToFill -= b.remaining();
       
   127                 accumulatedBytes -= b.remaining();
       
   128                 if (leftToFill == 0)
       
   129                     break;
       
   130             } else {
       
   131                 int prevLimit = b.limit();
       
   132                 b.limit(b.position() + leftToFill);
       
   133                 ByteBuffer slice = b.slice();
       
   134                 dsts.add(slice);
       
   135                 b.limit(prevLimit);
       
   136                 b.position(b.position() + leftToFill);
       
   137                 accumulatedBytes -= leftToFill;
       
   138                 leftToFill = 0;
       
   139                 break;
       
   140             }
       
   141         }
       
   142         assert (state == ACTIVE || state == CANCELLED)
       
   143                 ? leftToFill == 0 : state == COMPLETE;
       
   144         assert (state == ACTIVE || state == CANCELLED)
       
   145                 ? remaining(dsts) == bufferSize : state == COMPLETE;
       
   146         assert accumulatedBytes >= 0;
       
   147         assert dsts.stream().noneMatch(b -> b.position() != 0);
       
   148         return Collections.unmodifiableList(dsts);
       
   149     }
       
   150 
       
   151     /** Subscription that is passed to the downstream subscriber. */
       
   152     private class DownstreamSubscription implements Flow.Subscription {
       
   153         private final AtomicBoolean cancelled = new AtomicBoolean(); // false
       
   154         private final Demand demand = new Demand();
       
   155 
       
   156         @Override
       
   157         public void request(long n) {
       
   158             if (n <= 0L) {
       
   159                 onError(new IllegalArgumentException(
       
   160                         "non-positive subscription request"));
       
   161                 return;
       
   162             }
       
   163             if (cancelled.get())
       
   164                 return;
       
   165 
       
   166             demand.increase(n);
       
   167 
       
   168             pushDemanded();
       
   169         }
       
   170 
       
   171         private final SequentialScheduler pushDemandedScheduler =
       
   172                 new SequentialScheduler(new PushDemandedTask());
       
   173 
       
   174         void pushDemanded() {
       
   175             if (cancelled.get())
       
   176                 return;
       
   177             pushDemandedScheduler.runOrSchedule();
       
   178         }
       
   179 
       
   180         class PushDemandedTask extends SequentialScheduler.CompleteRestartableTask {
       
   181             @Override
       
   182             public void run() {
       
   183                 try {
       
   184                     while (true) {
       
   185                         List<ByteBuffer> item;
       
   186                         synchronized (buffersLock) {
       
   187                             if (cancelled.get())
       
   188                                 return;
       
   189                             if (!hasEnoughAccumulatedBytes())
       
   190                                 break;
       
   191                             if (!demand.tryDecrement())
       
   192                                 break;
       
   193                             item = fromInternalBuffers();
       
   194                         }
       
   195                         assert item != null;
       
   196 
       
   197                         downstreamSubscriber.onNext(item);
       
   198                     }
       
   199                     if (cancelled.get())
       
   200                         return;
       
   201 
       
   202                     // complete only if all data consumed
       
   203                     boolean complete;
       
   204                     synchronized (buffersLock) {
       
   205                         complete = state == COMPLETE && internalBuffers.isEmpty();
       
   206                     }
       
   207                     if (complete) {
       
   208                         downstreamSubscriber.onComplete();
       
   209                         return;
       
   210                     }
       
   211                 } catch (Throwable t) {
       
   212                     cancel();  // cancel if there is any find of error
       
   213                     throw t;
       
   214                 }
       
   215 
       
   216                 boolean requestMore = false;
       
   217                 synchronized (buffersLock) {
       
   218                     if (!hasEnoughAccumulatedBytes() && !demand.isFulfilled()) {
       
   219                         // request more upstream data
       
   220                         requestMore = true;
       
   221                     }
       
   222                 }
       
   223                 if (requestMore)
       
   224                     subscription.request(1);
       
   225             }
       
   226         }
       
   227 
       
   228         @Override
       
   229         public void cancel() {
       
   230             if (cancelled.compareAndExchange(false, true))
       
   231                 return;  // already cancelled
       
   232 
       
   233             state = CANCELLED;  // set CANCELLED state of upstream subscriber
       
   234             subscription.cancel();  // cancel upstream subscription
       
   235             pushDemandedScheduler.stop(); // stop the demand scheduler
       
   236         }
       
   237     }
       
   238 
       
   239     @Override
       
   240     public void onSubscribe(Flow.Subscription subscription) {
       
   241         Objects.requireNonNull(subscription);
       
   242         if (this.subscription != null) {
       
   243             subscription.cancel();
       
   244             return;
       
   245         }
       
   246 
       
   247         int s = this.state;
       
   248         assert s == UNSUBSCRIBED;
       
   249         state = ACTIVE;
       
   250         this.subscription = subscription;
       
   251         downstreamSubscription = new DownstreamSubscription();
       
   252         downstreamSubscriber.onSubscribe(downstreamSubscription);
       
   253     }
       
   254 
       
   255     @Override
       
   256     public void onNext(List<ByteBuffer> item) {
       
   257         Objects.requireNonNull(item);
       
   258 
       
   259         int s = state;
       
   260         if (s == CANCELLED)
       
   261             return;
       
   262 
       
   263         if (s != ACTIVE)
       
   264             throw new InternalError("onNext on inactive subscriber");
       
   265 
       
   266         synchronized (buffersLock) {
       
   267             accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item);
       
   268         }
       
   269 
       
   270         downstreamSubscription.pushDemanded();
       
   271     }
       
   272 
       
   273     @Override
       
   274     public void onError(Throwable throwable) {
       
   275         Objects.requireNonNull(throwable);
       
   276         int s = state;
       
   277         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
       
   278         state = ERROR;
       
   279         downstreamSubscriber.onError(throwable);
       
   280     }
       
   281 
       
   282     @Override
       
   283     public void onComplete() {
       
   284         int s = state;
       
   285         assert s == ACTIVE : "Expected ACTIVE, got:" + s;
       
   286         state = COMPLETE;
       
   287         downstreamSubscription.pushDemanded();
       
   288     }
       
   289 
       
   290     @Override
       
   291     public CompletionStage<T> getBody() {
       
   292         return downstreamSubscriber.getBody();
       
   293     }
       
   294 }