148 private void schedule() { |
148 private void schedule() { |
149 if (responseSubscriber == null) |
149 if (responseSubscriber == null) |
150 // can't process anything yet |
150 // can't process anything yet |
151 return; |
151 return; |
152 |
152 |
153 while (!inputQ.isEmpty()) { |
153 try { |
154 Http2Frame frame = inputQ.peek(); |
154 while (!inputQ.isEmpty()) { |
155 if (frame instanceof ResetFrame) { |
155 Http2Frame frame = inputQ.peek(); |
156 inputQ.remove(); |
156 if (frame instanceof ResetFrame) { |
157 handleReset((ResetFrame)frame); |
157 inputQ.remove(); |
158 return; |
158 handleReset((ResetFrame)frame); |
159 } |
159 return; |
160 DataFrame df = (DataFrame)frame; |
160 } |
161 boolean finished = df.getFlag(DataFrame.END_STREAM); |
161 DataFrame df = (DataFrame)frame; |
162 |
162 boolean finished = df.getFlag(DataFrame.END_STREAM); |
163 List<ByteBuffer> buffers = df.getData(); |
163 |
164 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); |
164 List<ByteBuffer> buffers = df.getData(); |
165 int size = Utils.remaining(dsts, Integer.MAX_VALUE); |
165 List<ByteBuffer> dsts = Collections.unmodifiableList(buffers); |
166 if (size == 0 && finished) { |
166 int size = Utils.remaining(dsts, Integer.MAX_VALUE); |
167 inputQ.remove(); |
167 if (size == 0 && finished) { |
168 Log.logTrace("responseSubscriber.onComplete"); |
168 inputQ.remove(); |
169 debug.log(Level.DEBUG, "incoming: onComplete"); |
|
170 sched.stop(); |
|
171 responseSubscriber.onComplete(); |
|
172 setEndStreamReceived(); |
|
173 return; |
|
174 } else if (userSubscription.tryDecrement()) { |
|
175 inputQ.remove(); |
|
176 Log.logTrace("responseSubscriber.onNext {0}", size); |
|
177 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); |
|
178 responseSubscriber.onNext(dsts); |
|
179 if (consumed(df)) { |
|
180 Log.logTrace("responseSubscriber.onComplete"); |
169 Log.logTrace("responseSubscriber.onComplete"); |
181 debug.log(Level.DEBUG, "incoming: onComplete"); |
170 debug.log(Level.DEBUG, "incoming: onComplete"); |
182 sched.stop(); |
171 sched.stop(); |
183 responseSubscriber.onComplete(); |
172 responseSubscriber.onComplete(); |
184 setEndStreamReceived(); |
173 setEndStreamReceived(); |
185 return; |
174 return; |
|
175 } else if (userSubscription.tryDecrement()) { |
|
176 inputQ.remove(); |
|
177 Log.logTrace("responseSubscriber.onNext {0}", size); |
|
178 debug.log(Level.DEBUG, "incoming: onNext(%d)", size); |
|
179 responseSubscriber.onNext(dsts); |
|
180 if (consumed(df)) { |
|
181 Log.logTrace("responseSubscriber.onComplete"); |
|
182 debug.log(Level.DEBUG, "incoming: onComplete"); |
|
183 sched.stop(); |
|
184 responseSubscriber.onComplete(); |
|
185 setEndStreamReceived(); |
|
186 return; |
|
187 } |
|
188 } else { |
|
189 return; |
186 } |
190 } |
187 } else { |
191 } |
188 return; |
192 } catch (Throwable throwable) { |
189 } |
193 failed = throwable; |
190 } |
194 } |
|
195 |
191 Throwable t = failed; |
196 Throwable t = failed; |
192 if (t != null) { |
197 if (t != null) { |
193 sched.stop(); |
198 sched.stop(); |
194 responseSubscriber.onError(t); |
199 responseSubscriber.onError(t); |
195 close(); |
200 close(); |