49 volatile Throwable error; // any exception that occurred during pushes |
49 volatile Throwable error; // any exception that occurred during pushes |
50 |
50 |
51 // user's subscriber object |
51 // user's subscriber object |
52 final PushPromiseHandler<T> pushPromiseHandler; |
52 final PushPromiseHandler<T> pushPromiseHandler; |
53 |
53 |
54 private final AccessControlContext acc; |
54 private final Executor executor; |
55 |
55 |
56 int numberOfPushes; |
56 int numberOfPushes; |
57 int remainingPushes; |
57 int remainingPushes; |
58 boolean noMorePushes = false; |
58 boolean noMorePushes = false; |
59 |
59 |
60 PushGroup(PushPromiseHandler<T> pushPromiseHandler, |
60 PushGroup(PushPromiseHandler<T> pushPromiseHandler, |
61 HttpRequestImpl initiatingRequest, |
61 HttpRequestImpl initiatingRequest, |
62 AccessControlContext acc) { |
62 Executor executor) { |
63 this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), acc); |
63 this(pushPromiseHandler, initiatingRequest, new MinimalFuture<>(), executor); |
64 } |
64 } |
65 |
65 |
66 // Check mainBodyHandler before calling nested constructor. |
66 // Check mainBodyHandler before calling nested constructor. |
67 private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler, |
67 private PushGroup(HttpResponse.PushPromiseHandler<T> pushPromiseHandler, |
68 HttpRequestImpl initiatingRequest, |
68 HttpRequestImpl initiatingRequest, |
69 CompletableFuture<HttpResponse<T>> mainResponse, |
69 CompletableFuture<HttpResponse<T>> mainResponse, |
70 AccessControlContext acc) { |
70 Executor executor) { |
71 this.noMorePushesCF = new MinimalFuture<>(); |
71 this.noMorePushesCF = new MinimalFuture<>(); |
72 this.pushPromiseHandler = pushPromiseHandler; |
72 this.pushPromiseHandler = pushPromiseHandler; |
73 this.initiatingRequest = initiatingRequest; |
73 this.initiatingRequest = initiatingRequest; |
74 this.acc = acc; |
74 this.executor = executor; |
75 } |
75 } |
76 |
76 |
77 interface Acceptor<T> { |
77 interface Acceptor<T> { |
78 BodyHandler<T> bodyHandler(); |
78 BodyHandler<T> bodyHandler(); |
79 CompletableFuture<HttpResponse<T>> cf(); |
79 CompletableFuture<HttpResponse<T>> cf(); |
80 boolean accepted(); |
80 boolean accepted(); |
81 } |
81 } |
82 |
82 |
83 private static class AcceptorImpl<T> implements Acceptor<T> { |
83 private static class AcceptorImpl<T> implements Acceptor<T> { |
|
84 private final Executor executor; |
84 private volatile HttpResponse.BodyHandler<T> bodyHandler; |
85 private volatile HttpResponse.BodyHandler<T> bodyHandler; |
85 private volatile CompletableFuture<HttpResponse<T>> cf; |
86 private volatile CompletableFuture<HttpResponse<T>> cf; |
|
87 |
|
88 AcceptorImpl(Executor executor) { |
|
89 this.executor = executor; |
|
90 } |
86 |
91 |
87 CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) { |
92 CompletableFuture<HttpResponse<T>> accept(BodyHandler<T> bodyHandler) { |
88 Objects.requireNonNull(bodyHandler); |
93 Objects.requireNonNull(bodyHandler); |
89 if (this.bodyHandler != null) |
94 if (this.bodyHandler != null) |
90 throw new IllegalStateException("non-null bodyHandler"); |
95 throw new IllegalStateException("non-null bodyHandler"); |
91 this.bodyHandler = bodyHandler; |
96 this.bodyHandler = bodyHandler; |
92 cf = new MinimalFuture<>(); |
97 cf = new MinimalFuture<>(); |
93 return cf; |
98 return cf.whenCompleteAsync((r,t) -> {}, executor); |
94 } |
99 } |
95 |
100 |
96 @Override public BodyHandler<T> bodyHandler() { return bodyHandler; } |
101 @Override public BodyHandler<T> bodyHandler() { return bodyHandler; } |
97 |
102 |
98 @Override public CompletableFuture<HttpResponse<T>> cf() { return cf; } |
103 @Override public CompletableFuture<HttpResponse<T>> cf() { return cf; } |
99 |
104 |
100 @Override public boolean accepted() { return cf != null; } |
105 @Override public boolean accepted() { return cf != null; } |
101 } |
106 } |
102 |
107 |
103 Acceptor<T> acceptPushRequest(HttpRequest pushRequest, Executor e) { |
108 Acceptor<T> acceptPushRequest(HttpRequest pushRequest) { |
104 AcceptorImpl<T> acceptor = new AcceptorImpl<>(); |
109 AcceptorImpl<T> acceptor = new AcceptorImpl<>(executor); |
105 try { |
110 try { |
106 pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept); |
111 pushPromiseHandler.applyPushPromise(initiatingRequest, pushRequest, acceptor::accept); |
107 } catch (Throwable t) { |
112 } catch (Throwable t) { |
108 if (acceptor.accepted()) { |
113 if (acceptor.accepted()) { |
109 CompletableFuture<?> cf = acceptor.cf(); |
114 CompletableFuture<?> cf = acceptor.cf(); |
110 e.execute(() -> cf.completeExceptionally(t)); |
115 cf.completeExceptionally(t); |
111 } |
116 } |
112 throw t; |
117 throw t; |
113 } |
118 } |
114 |
119 |
115 synchronized (this) { |
120 synchronized (this) { |