equal
deleted
inserted
replaced
240 requestPublisher.subscribe(subscriber); |
240 requestPublisher.subscribe(subscriber); |
241 } |
241 } |
242 return subscriber; |
242 return subscriber; |
243 } |
243 } |
244 |
244 |
245 class StreamSubscriber extends Http1BodySubscriber { |
245 final class StreamSubscriber extends Http1BodySubscriber { |
246 |
246 |
247 @Override |
247 @Override |
248 public void onSubscribe(Flow.Subscription subscription) { |
248 public void onSubscribe(Flow.Subscription subscription) { |
249 if (this.subscription != null) { |
249 if (isSubscribed()) { |
250 Throwable t = new IllegalStateException("already subscribed"); |
250 Throwable t = new IllegalStateException("already subscribed"); |
251 http1Exchange.appendToOutgoing(t); |
251 http1Exchange.appendToOutgoing(t); |
252 } else { |
252 } else { |
253 this.subscription = subscription; |
253 setSubscription(subscription); |
254 } |
254 } |
255 } |
255 } |
256 |
256 |
257 @Override |
257 @Override |
258 public void onNext(ByteBuffer item) { |
258 public void onNext(ByteBuffer item) { |
273 @Override |
273 @Override |
274 public void onError(Throwable throwable) { |
274 public void onError(Throwable throwable) { |
275 if (complete) |
275 if (complete) |
276 return; |
276 return; |
277 |
277 |
278 subscription.cancel(); |
278 cancelSubscription(); |
279 http1Exchange.appendToOutgoing(throwable); |
279 http1Exchange.appendToOutgoing(throwable); |
280 } |
280 } |
281 |
281 |
282 @Override |
282 @Override |
283 public void onComplete() { |
283 public void onComplete() { |
296 |
296 |
297 } |
297 } |
298 } |
298 } |
299 } |
299 } |
300 |
300 |
301 class FixedContentSubscriber extends Http1BodySubscriber { |
301 final class FixedContentSubscriber extends Http1BodySubscriber { |
302 |
302 |
303 private volatile long contentWritten; |
303 private volatile long contentWritten; |
304 |
304 |
305 @Override |
305 @Override |
306 public void onSubscribe(Flow.Subscription subscription) { |
306 public void onSubscribe(Flow.Subscription subscription) { |
307 if (this.subscription != null) { |
307 if (isSubscribed()) { |
308 Throwable t = new IllegalStateException("already subscribed"); |
308 Throwable t = new IllegalStateException("already subscribed"); |
309 http1Exchange.appendToOutgoing(t); |
309 http1Exchange.appendToOutgoing(t); |
310 } else { |
310 } else { |
311 this.subscription = subscription; |
311 setSubscription(subscription); |
312 } |
312 } |
313 } |
313 } |
314 |
314 |
315 @Override |
315 @Override |
316 public void onNext(ByteBuffer item) { |
316 public void onNext(ByteBuffer item) { |
322 } else { |
322 } else { |
323 long writing = item.remaining(); |
323 long writing = item.remaining(); |
324 long written = (contentWritten += writing); |
324 long written = (contentWritten += writing); |
325 |
325 |
326 if (written > contentLength) { |
326 if (written > contentLength) { |
327 subscription.cancel(); |
327 cancelSubscription(); |
328 String msg = connection.getConnectionFlow() |
328 String msg = connection.getConnectionFlow() |
329 + " [" + Thread.currentThread().getName() +"] " |
329 + " [" + Thread.currentThread().getName() +"] " |
330 + "Too many bytes in request body. Expected: " |
330 + "Too many bytes in request body. Expected: " |
331 + contentLength + ", got: " + written; |
331 + contentLength + ", got: " + written; |
332 http1Exchange.appendToOutgoing(new IOException(msg)); |
332 http1Exchange.appendToOutgoing(new IOException(msg)); |
340 public void onError(Throwable throwable) { |
340 public void onError(Throwable throwable) { |
341 debug.log(Level.DEBUG, "onError"); |
341 debug.log(Level.DEBUG, "onError"); |
342 if (complete) // TODO: error? |
342 if (complete) // TODO: error? |
343 return; |
343 return; |
344 |
344 |
345 subscription.cancel(); |
345 cancelSubscription(); |
346 http1Exchange.appendToOutgoing(throwable); |
346 http1Exchange.appendToOutgoing(throwable); |
347 } |
347 } |
348 |
348 |
349 @Override |
349 @Override |
350 public void onComplete() { |
350 public void onComplete() { |
354 http1Exchange.appendToOutgoing(t); |
354 http1Exchange.appendToOutgoing(t); |
355 } else { |
355 } else { |
356 complete = true; |
356 complete = true; |
357 long written = contentWritten; |
357 long written = contentWritten; |
358 if (contentLength > written) { |
358 if (contentLength > written) { |
359 subscription.cancel(); |
359 cancelSubscription(); |
360 Throwable t = new IOException(connection.getConnectionFlow() |
360 Throwable t = new IOException(connection.getConnectionFlow() |
361 + " [" + Thread.currentThread().getName() +"] " |
361 + " [" + Thread.currentThread().getName() +"] " |
362 + "Too few bytes returned by the publisher (" |
362 + "Too few bytes returned by the publisher (" |
363 + written + "/" |
363 + written + "/" |
364 + contentLength + ")"); |
364 + contentLength + ")"); |