--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 05 15:51:09 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 05 17:18:26 2018 +0000
@@ -150,44 +150,49 @@
// can't process anything yet
return;
- while (!inputQ.isEmpty()) {
- Http2Frame frame = inputQ.peek();
- if (frame instanceof ResetFrame) {
- inputQ.remove();
- handleReset((ResetFrame)frame);
- return;
- }
- DataFrame df = (DataFrame)frame;
- boolean finished = df.getFlag(DataFrame.END_STREAM);
+ try {
+ while (!inputQ.isEmpty()) {
+ Http2Frame frame = inputQ.peek();
+ if (frame instanceof ResetFrame) {
+ inputQ.remove();
+ handleReset((ResetFrame)frame);
+ return;
+ }
+ DataFrame df = (DataFrame)frame;
+ boolean finished = df.getFlag(DataFrame.END_STREAM);
- List<ByteBuffer> buffers = df.getData();
- List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
- int size = Utils.remaining(dsts, Integer.MAX_VALUE);
- if (size == 0 && finished) {
- inputQ.remove();
- Log.logTrace("responseSubscriber.onComplete");
- debug.log(Level.DEBUG, "incoming: onComplete");
- sched.stop();
- responseSubscriber.onComplete();
- setEndStreamReceived();
- return;
- } else if (userSubscription.tryDecrement()) {
- inputQ.remove();
- Log.logTrace("responseSubscriber.onNext {0}", size);
- debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
- responseSubscriber.onNext(dsts);
- if (consumed(df)) {
+ List<ByteBuffer> buffers = df.getData();
+ List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
+ int size = Utils.remaining(dsts, Integer.MAX_VALUE);
+ if (size == 0 && finished) {
+ inputQ.remove();
Log.logTrace("responseSubscriber.onComplete");
debug.log(Level.DEBUG, "incoming: onComplete");
sched.stop();
responseSubscriber.onComplete();
setEndStreamReceived();
return;
+ } else if (userSubscription.tryDecrement()) {
+ inputQ.remove();
+ Log.logTrace("responseSubscriber.onNext {0}", size);
+ debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
+ responseSubscriber.onNext(dsts);
+ if (consumed(df)) {
+ Log.logTrace("responseSubscriber.onComplete");
+ debug.log(Level.DEBUG, "incoming: onComplete");
+ sched.stop();
+ responseSubscriber.onComplete();
+ setEndStreamReceived();
+ return;
+ }
+ } else {
+ return;
}
- } else {
- return;
}
+ } catch (Throwable throwable) {
+ failed = throwable;
}
+
Throwable t = failed;
if (t != null) {
sched.stop();