316 readBuf = newb; |
316 readBuf = newb; |
317 } |
317 } |
318 |
318 |
319 @Override |
319 @Override |
320 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { |
320 protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) { |
321 if (readBuf.remaining() > TARGET_BUFSIZE) { |
321 if (needsMoreData()) { |
322 if (debugr.on()) |
322 // run the scheduler to see if more data should be requested |
323 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", |
323 if (debugr.on()) { |
324 readBuf.remaining()); |
324 int remaining = readBuf.remaining(); |
325 return 0; |
325 if (remaining > TARGET_BUFSIZE) { |
326 } else { |
326 // just some logging to check how much we have in the read buffer |
327 return super.upstreamWindowUpdate(currentWindow, downstreamQsize); |
327 debugr.log("readBuf has more than TARGET_BUFSIZE: %d", |
328 } |
328 remaining); |
|
329 } |
|
330 } |
|
331 scheduler.runOrSchedule(); |
|
332 } |
|
333 return 0; // we will request more from the scheduler loop (processData). |
329 } |
334 } |
330 |
335 |
331 // readBuf is kept ready for reading outside of this method |
336 // readBuf is kept ready for reading outside of this method |
332 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { |
337 private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) { |
333 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); |
338 assert Utils.remaining(buffers) > 0 || buffers.isEmpty(); |
365 // minimum number of bytes required to call unwrap. |
370 // minimum number of bytes required to call unwrap. |
366 // Usually this is 0, unless there was a buffer underflow. |
371 // Usually this is 0, unless there was a buffer underflow. |
367 // In this case we need to wait for more bytes than what |
372 // In this case we need to wait for more bytes than what |
368 // we had before calling unwrap() again. |
373 // we had before calling unwrap() again. |
369 volatile int minBytesRequired; |
374 volatile int minBytesRequired; |
|
375 |
|
376 // We might need to request more data if: |
|
377 // - we have a subscription from upstream |
|
378 // - and we don't have enough data to decrypt in the read buffer |
|
379 // - *and* - either we're handshaking, and more data is required (NEED_UNWRAP), |
|
380 // - or we have demand from downstream, but we have nothing decrypted |
|
381 // to forward downstream. |
|
382 boolean needsMoreData() { |
|
383 if (upstreamSubscription != null && readBuf.remaining() <= minBytesRequired && |
|
384 (engine.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP |
|
385 || !downstreamSubscription.demand.isFulfilled() && hasNoOutputData())) { |
|
386 return true; |
|
387 } |
|
388 return false; |
|
389 } |
|
390 |
|
391 // If the readBuf has not enough data, and we either need to |
|
392 // unwrap (handshaking) or we have demand from downstream, |
|
393 // then request more data |
|
394 void requestMoreDataIfNeeded() { |
|
395 if (needsMoreData()) { |
|
396 // request more will only request more if our |
|
397 // demand from upstream is fulfilled |
|
398 requestMore(); |
|
399 } |
|
400 } |
370 |
401 |
371 // work function where it all happens |
402 // work function where it all happens |
372 final void processData() { |
403 final void processData() { |
373 try { |
404 try { |
374 if (debugr.on()) |
405 if (debugr.on()) |
432 if (complete && result.status() == Status.CLOSED) { |
463 if (complete && result.status() == Status.CLOSED) { |
433 if (debugr.on()) debugr.log("Closed: completing"); |
464 if (debugr.on()) debugr.log("Closed: completing"); |
434 outgoing(Utils.EMPTY_BB_LIST, true); |
465 outgoing(Utils.EMPTY_BB_LIST, true); |
435 // complete ALPN if not yet completed |
466 // complete ALPN if not yet completed |
436 setALPN(); |
467 setALPN(); |
|
468 requestMoreDataIfNeeded(); |
437 return; |
469 return; |
438 } |
470 } |
439 if (result.handshaking()) { |
471 if (result.handshaking()) { |
440 handshaking = true; |
472 handshaking = true; |
441 if (debugr.on()) debugr.log("handshaking"); |
473 if (debugr.on()) debugr.log("handshaking"); |
464 // Complete the alpnCF, if not already complete, regardless of |
498 // Complete the alpnCF, if not already complete, regardless of |
465 // whether or not the ALPN is available, there will be no more |
499 // whether or not the ALPN is available, there will be no more |
466 // activity. |
500 // activity. |
467 setALPN(); |
501 setALPN(); |
468 outgoing(Utils.EMPTY_BB_LIST, true); |
502 outgoing(Utils.EMPTY_BB_LIST, true); |
|
503 } else { |
|
504 requestMoreDataIfNeeded(); |
469 } |
505 } |
470 } catch (Throwable ex) { |
506 } catch (Throwable ex) { |
471 errorCommon(ex); |
507 errorCommon(ex); |
472 handleError(ex); |
508 handleError(ex); |
473 } |
509 } |