src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java
branch http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56451 9585061fdb04
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.util.Iterator;
       
    29 import java.util.concurrent.Flow;
       
    30 import jdk.internal.net.http.common.Demand;
       
    31 import jdk.internal.net.http.common.SequentialScheduler;
       
    32 
       
    33 /**
       
    34  * A Publisher that publishes items obtained from the given Iterable. Each new
       
    35  * subscription gets a new Iterator.
       
    36  */
       
    37 class PullPublisher<T> implements Flow.Publisher<T> {
       
    38 
       
    39     // Only one of `iterable` and `throwable` can be non-null. throwable is
       
    40     // non-null when an error has been encountered, by the creator of
       
    41     // PullPublisher, while subscribing the subscriber, but before subscribe has
       
    42     // completed.
       
    43     private final Iterable<T> iterable;
       
    44     private final Throwable throwable;
       
    45 
       
    46     PullPublisher(Iterable<T> iterable, Throwable throwable) {
       
    47         this.iterable = iterable;
       
    48         this.throwable = throwable;
       
    49     }
       
    50 
       
    51     PullPublisher(Iterable<T> iterable) {
       
    52         this(iterable, null);
       
    53     }
       
    54 
       
    55     @Override
       
    56     public void subscribe(Flow.Subscriber<? super T> subscriber) {
       
    57         Subscription sub;
       
    58         if (throwable != null) {
       
    59             assert iterable == null : "non-null iterable: " + iterable;
       
    60             sub = new Subscription(subscriber, null, throwable);
       
    61         } else {
       
    62             assert throwable == null : "non-null exception: " + throwable;
       
    63             sub = new Subscription(subscriber, iterable.iterator(), null);
       
    64         }
       
    65         subscriber.onSubscribe(sub);
       
    66 
       
    67         if (throwable != null) {
       
    68             sub.pullScheduler.runOrSchedule();
       
    69         }
       
    70     }
       
    71 
       
    72     private class Subscription implements Flow.Subscription {
       
    73 
       
    74         private final Flow.Subscriber<? super T> subscriber;
       
    75         private final Iterator<T> iter;
       
    76         private volatile boolean completed;
       
    77         private volatile boolean cancelled;
       
    78         private volatile Throwable error;
       
    79         final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
       
    80         private final Demand demand = new Demand();
       
    81 
       
    82         Subscription(Flow.Subscriber<? super T> subscriber,
       
    83                      Iterator<T> iter,
       
    84                      Throwable throwable) {
       
    85             this.subscriber = subscriber;
       
    86             this.iter = iter;
       
    87             this.error = throwable;
       
    88         }
       
    89 
       
    90         final class PullTask extends SequentialScheduler.CompleteRestartableTask {
       
    91             @Override
       
    92             protected void run() {
       
    93                 if (completed || cancelled) {
       
    94                     return;
       
    95                 }
       
    96 
       
    97                 Throwable t = error;
       
    98                 if (t != null) {
       
    99                     completed = true;
       
   100                     pullScheduler.stop();
       
   101                     subscriber.onError(t);
       
   102                     return;
       
   103                 }
       
   104 
       
   105                 while (demand.tryDecrement() && !cancelled) {
       
   106                     if (!iter.hasNext()) {
       
   107                         break;
       
   108                     } else {
       
   109                         subscriber.onNext(iter.next());
       
   110                     }
       
   111                 }
       
   112                 if (!iter.hasNext() && !cancelled) {
       
   113                     completed = true;
       
   114                     pullScheduler.stop();
       
   115                     subscriber.onComplete();
       
   116                 }
       
   117             }
       
   118         }
       
   119 
       
   120         @Override
       
   121         public void request(long n) {
       
   122             if (cancelled)
       
   123                 return;  // no-op
       
   124 
       
   125             if (n <= 0) {
       
   126                 error = new IllegalArgumentException("illegal non-positive request:" + n);
       
   127             } else {
       
   128                 demand.increase(n);
       
   129             }
       
   130             pullScheduler.runOrSchedule();
       
   131         }
       
   132 
       
   133         @Override
       
   134         public void cancel() {
       
   135             cancelled = true;
       
   136         }
       
   137     }
       
   138 }