src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
branchhttp-client-branch
changeset 56071 3353cb42b1b4
parent 56054 352e845ae744
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Feb 05 15:51:09 2018 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Mon Feb 05 17:18:26 2018 +0000
@@ -150,44 +150,49 @@
             // can't process anything yet
             return;
 
-        while (!inputQ.isEmpty()) {
-            Http2Frame frame  = inputQ.peek();
-            if (frame instanceof ResetFrame) {
-                inputQ.remove();
-                handleReset((ResetFrame)frame);
-                return;
-            }
-            DataFrame df = (DataFrame)frame;
-            boolean finished = df.getFlag(DataFrame.END_STREAM);
+        try {
+            while (!inputQ.isEmpty()) {
+                Http2Frame frame = inputQ.peek();
+                if (frame instanceof ResetFrame) {
+                    inputQ.remove();
+                    handleReset((ResetFrame)frame);
+                    return;
+                }
+                DataFrame df = (DataFrame)frame;
+                boolean finished = df.getFlag(DataFrame.END_STREAM);
 
-            List<ByteBuffer> buffers = df.getData();
-            List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
-            int size = Utils.remaining(dsts, Integer.MAX_VALUE);
-            if (size == 0 && finished) {
-                inputQ.remove();
-                Log.logTrace("responseSubscriber.onComplete");
-                debug.log(Level.DEBUG, "incoming: onComplete");
-                sched.stop();
-                responseSubscriber.onComplete();
-                setEndStreamReceived();
-                return;
-            } else if (userSubscription.tryDecrement()) {
-                inputQ.remove();
-                Log.logTrace("responseSubscriber.onNext {0}", size);
-                debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
-                responseSubscriber.onNext(dsts);
-                if (consumed(df)) {
+                List<ByteBuffer> buffers = df.getData();
+                List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
+                int size = Utils.remaining(dsts, Integer.MAX_VALUE);
+                if (size == 0 && finished) {
+                    inputQ.remove();
                     Log.logTrace("responseSubscriber.onComplete");
                     debug.log(Level.DEBUG, "incoming: onComplete");
                     sched.stop();
                     responseSubscriber.onComplete();
                     setEndStreamReceived();
                     return;
+                } else if (userSubscription.tryDecrement()) {
+                    inputQ.remove();
+                    Log.logTrace("responseSubscriber.onNext {0}", size);
+                    debug.log(Level.DEBUG, "incoming: onNext(%d)", size);
+                    responseSubscriber.onNext(dsts);
+                    if (consumed(df)) {
+                        Log.logTrace("responseSubscriber.onComplete");
+                        debug.log(Level.DEBUG, "incoming: onComplete");
+                        sched.stop();
+                        responseSubscriber.onComplete();
+                        setEndStreamReceived();
+                        return;
+                    }
+                } else {
+                    return;
                 }
-            } else {
-                return;
             }
+        } catch (Throwable throwable) {
+            failed = throwable;
         }
+
         Throwable t = failed;
         if (t != null) {
             sched.stop();