src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PullPublisher.java
branch http-client-branch
changeset 55763 634d8e14c172
parent 47216 71c04702a3d5
child 55777 e62cbcc08cae
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
     1 /*
     1 /*
     2  * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    25 
    25 
    26 package jdk.incubator.http;
    26 package jdk.incubator.http;
    27 
    27 
    28 import java.util.Iterator;
    28 import java.util.Iterator;
    29 import java.util.concurrent.Flow;
    29 import java.util.concurrent.Flow;
       
    30 import jdk.incubator.http.internal.common.Demand;
       
    31 import jdk.incubator.http.internal.common.SequentialScheduler;
    30 
    32 
    31 /**
    33 /**
    32  * A Publisher that is expected to run in same thread as subscriber.
    34  * A Publisher that publishes items obtained from the given Iterable. Each new
    33  * Items are obtained from Iterable. Each new subscription gets a new Iterator.
    35  * subscription gets a new Iterator.
    34  */
    36  */
    35 class PullPublisher<T> implements Flow.Publisher<T> {
    37 class PullPublisher<T> implements Flow.Publisher<T> {
    36 
    38 
    37     private final Iterable<T> iterable;
    39     private final Iterable<T> iterable;
    38 
    40 
    47 
    49 
    48     private class Subscription implements Flow.Subscription {
    50     private class Subscription implements Flow.Subscription {
    49 
    51 
    50         private final Flow.Subscriber<? super T> subscriber;
    52         private final Flow.Subscriber<? super T> subscriber;
    51         private final Iterator<T> iter;
    53         private final Iterator<T> iter;
    52         private boolean done = false;
    54         private volatile boolean completed;
    53         private long demand = 0;
    55         private volatile boolean cancelled;
    54         private int recursion = 0;
    56         private volatile Throwable error;
       
    57         final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask());
       
    58         private final Demand demand = new Demand();
    55 
    59 
    56         Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
    60         Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) {
    57             this.subscriber = subscriber;
    61             this.subscriber = subscriber;
    58             this.iter = iter;
    62             this.iter = iter;
    59         }
    63         }
    60 
    64 
    61         @Override
    65         final class PullTask extends SequentialScheduler.CompleteRestartableTask {
    62         public void request(long n) {
    66             @Override
    63             if (done) {
    67             protected void run() {
    64                 subscriber.onError(new IllegalArgumentException("request(" + n + ")"));
    68                 if (completed) {
    65             }
    69                     pullScheduler.stop();
    66             demand += n;
       
    67             recursion ++;
       
    68             if (recursion > 1) {
       
    69                 return;
       
    70             }
       
    71             while (demand > 0) {
       
    72                 done = !iter.hasNext();
       
    73                 if (done) {
       
    74                     subscriber.onComplete();
       
    75                     recursion --;
       
    76                     return;
    70                     return;
    77                 }
    71                 }
    78                 subscriber.onNext(iter.next());
    72                 Throwable t = error;
    79                 demand --;
    73                 if (t != null) {
       
    74                     completed = true;
       
    75                     pullScheduler.stop();
       
    76                     subscriber.onError(t);
       
    77                 }
       
    78                 if (demand.tryDecrement()) {
       
    79                     boolean done = completed = !iter.hasNext();
       
    80                     if (done)
       
    81                         subscriber.onComplete();
       
    82                     else
       
    83                         subscriber.onNext(iter.next());
       
    84                 }
    80             }
    85             }
    81         }
    86         }
    82 
    87 
    83         @Override
    88         @Override
    84         public void cancel() {
    89         public void request(long n) {
    85             done = true;
    90             if (cancelled) {
       
    91                 error = new IllegalArgumentException("request("
       
    92                                                       + n + "): cancelled");
       
    93             } else {
       
    94                 demand.increase(n);
       
    95             }
       
    96             pullScheduler.runOrSchedule();
    86         }
    97         }
    87 
    98 
       
    99         @Override
       
   100         public void cancel() {
       
   101             cancelled = true;
       
   102         }
    88     }
   103     }
    89 }
   104 }