1 /* |
1 /* |
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
25 |
25 |
26 package jdk.incubator.http; |
26 package jdk.incubator.http; |
27 |
27 |
28 import java.util.Iterator; |
28 import java.util.Iterator; |
29 import java.util.concurrent.Flow; |
29 import java.util.concurrent.Flow; |
|
30 import jdk.incubator.http.internal.common.Demand; |
|
31 import jdk.incubator.http.internal.common.SequentialScheduler; |
30 |
32 |
31 /** |
33 /** |
32 * A Publisher that is expected to run in same thread as subscriber. |
34 * A Publisher that publishes items obtained from the given Iterable. Each new |
33 * Items are obtained from Iterable. Each new subscription gets a new Iterator. |
35 * subscription gets a new Iterator. |
34 */ |
36 */ |
35 class PullPublisher<T> implements Flow.Publisher<T> { |
37 class PullPublisher<T> implements Flow.Publisher<T> { |
36 |
38 |
37 private final Iterable<T> iterable; |
39 private final Iterable<T> iterable; |
38 |
40 |
47 |
49 |
48 private class Subscription implements Flow.Subscription { |
50 private class Subscription implements Flow.Subscription { |
49 |
51 |
50 private final Flow.Subscriber<? super T> subscriber; |
52 private final Flow.Subscriber<? super T> subscriber; |
51 private final Iterator<T> iter; |
53 private final Iterator<T> iter; |
52 private boolean done = false; |
54 private volatile boolean completed; |
53 private long demand = 0; |
55 private volatile boolean cancelled; |
54 private int recursion = 0; |
56 private volatile Throwable error; |
|
57 final SequentialScheduler pullScheduler = new SequentialScheduler(new PullTask()); |
|
58 private final Demand demand = new Demand(); |
55 |
59 |
56 Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) { |
60 Subscription(Flow.Subscriber<? super T> subscriber, Iterator<T> iter) { |
57 this.subscriber = subscriber; |
61 this.subscriber = subscriber; |
58 this.iter = iter; |
62 this.iter = iter; |
59 } |
63 } |
60 |
64 |
61 @Override |
65 final class PullTask extends SequentialScheduler.CompleteRestartableTask { |
62 public void request(long n) { |
66 @Override |
63 if (done) { |
67 protected void run() { |
64 subscriber.onError(new IllegalArgumentException("request(" + n + ")")); |
68 if (completed) { |
65 } |
69 pullScheduler.stop(); |
66 demand += n; |
|
67 recursion ++; |
|
68 if (recursion > 1) { |
|
69 return; |
|
70 } |
|
71 while (demand > 0) { |
|
72 done = !iter.hasNext(); |
|
73 if (done) { |
|
74 subscriber.onComplete(); |
|
75 recursion --; |
|
76 return; |
70 return; |
77 } |
71 } |
78 subscriber.onNext(iter.next()); |
72 Throwable t = error; |
79 demand --; |
73 if (t != null) { |
|
74 completed = true; |
|
75 pullScheduler.stop(); |
|
76 subscriber.onError(t); |
|
77 } |
|
78 if (demand.tryDecrement()) { |
|
79 boolean done = completed = !iter.hasNext(); |
|
80 if (done) |
|
81 subscriber.onComplete(); |
|
82 else |
|
83 subscriber.onNext(iter.next()); |
|
84 } |
80 } |
85 } |
81 } |
86 } |
82 |
87 |
83 @Override |
88 @Override |
84 public void cancel() { |
89 public void request(long n) { |
85 done = true; |
90 if (cancelled) { |
|
91 error = new IllegalArgumentException("request(" |
|
92 + n + "): cancelled"); |
|
93 } else { |
|
94 demand.increase(n); |
|
95 } |
|
96 pullScheduler.runOrSchedule(); |
86 } |
97 } |
87 |
98 |
|
99 @Override |
|
100 public void cancel() { |
|
101 cancelled = true; |
|
102 } |
88 } |
103 } |
89 } |
104 } |