202 |
202 |
203 /** |
203 /** |
204 * Sometime it might be necessary to complete the downstream subscriber |
204 * Sometime it might be necessary to complete the downstream subscriber |
205 * before the upstream completes. For instance, when an SSL server |
205 * before the upstream completes. For instance, when an SSL server |
206 * sends a notify_close. In that case we should let the outgoing |
206 * sends a notify_close. In that case we should let the outgoing |
207 * complete before upstream us completed. |
207 * complete before upstream is completed. |
208 * @return true, may be overridden by subclasses. |
208 * @return true, may be overridden by subclasses. |
209 */ |
209 */ |
210 public boolean closing() { |
210 public boolean closing() { |
211 return false; |
211 return false; |
212 } |
212 } |
215 Objects.requireNonNull(buffers); |
215 Objects.requireNonNull(buffers); |
216 if (complete) { |
216 if (complete) { |
217 assert Utils.remaining(buffers) == 0; |
217 assert Utils.remaining(buffers) == 0; |
218 boolean closing = closing(); |
218 boolean closing = closing(); |
219 logger.log(Level.DEBUG, |
219 logger.log(Level.DEBUG, |
220 "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s", |
220 "completionAcknowledged upstreamCompleted:%s," |
221 upstreamCompleted, downstreamCompleted, closing); |
221 + " downstreamCompleted:%s, closing:%s", |
222 if (!upstreamCompleted && !closing) |
222 upstreamCompleted, downstreamCompleted, closing); |
|
223 if (!upstreamCompleted && !closing) { |
223 throw new IllegalStateException("upstream not completed"); |
224 throw new IllegalStateException("upstream not completed"); |
|
225 } |
224 completionAcknowledged = true; |
226 completionAcknowledged = true; |
225 } else { |
227 } else { |
226 logger.log(Level.DEBUG, () -> "Adding " |
228 logger.log(Level.DEBUG, () -> "Adding " |
227 + Utils.remaining(buffers) |
229 + Utils.remaining(buffers) + " to outputQ queue"); |
228 + " to outputQ queue"); |
|
229 outputQ.add(buffers); |
230 outputQ.add(buffers); |
230 } |
231 } |
231 logger.log(Level.DEBUG, () -> "pushScheduler " |
232 logger.log(Level.DEBUG, () -> "pushScheduler" |
232 + (pushScheduler.isStopped() ? " is stopped!" : " is alive")); |
233 + (pushScheduler.isStopped() ? " is stopped!" : " is alive")); |
233 pushScheduler.runOrSchedule(); |
234 pushScheduler.runOrSchedule(); |
234 } |
235 } |
235 |
236 |
236 /** |
237 /** |
279 } |
280 } |
280 // If there was an error, send it downstream. |
281 // If there was an error, send it downstream. |
281 Throwable error = errorRef.get(); |
282 Throwable error = errorRef.get(); |
282 if (error != null) { |
283 if (error != null) { |
283 synchronized(this) { |
284 synchronized(this) { |
284 if (downstreamCompleted) return; |
285 if (downstreamCompleted) |
|
286 return; |
285 downstreamCompleted = true; |
287 downstreamCompleted = true; |
286 } |
288 } |
287 logger.log(Level.DEBUG, |
289 logger.log(Level.DEBUG, |
288 () -> "DownstreamPusher: forwarding error downstream: " + error); |
290 () -> "DownstreamPusher: forwarding error downstream: " + error); |
289 pushScheduler.stop(); |
291 pushScheduler.stop(); |
317 } |
319 } |
318 } |
320 } |
319 |
321 |
320 void upstreamWindowUpdate() { |
322 void upstreamWindowUpdate() { |
321 long downstreamQueueSize = outputQ.size(); |
323 long downstreamQueueSize = outputQ.size(); |
322 long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize); |
324 long upstreamWindowSize = upstreamWindow.get(); |
323 logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d", |
325 long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize); |
324 downstreamQueueSize, upstreamWindow.get()); |
326 logger.log(Level.DEBUG, "upstreamWindowUpdate, " |
|
327 + "downstreamQueueSize:%d, upstreamWindow:%d", |
|
328 downstreamQueueSize, upstreamWindowSize); |
325 if (n > 0) |
329 if (n > 0) |
326 upstreamRequest(n); |
330 upstreamRequest(n); |
327 } |
331 } |
328 |
332 |
329 @Override |
333 @Override |