equal
deleted
inserted
replaced
311 if (t == null) { |
311 if (t == null) { |
312 assert subscribed; |
312 assert subscribed; |
313 try { |
313 try { |
314 userSubscriber.onComplete(); |
314 userSubscriber.onComplete(); |
315 } catch (Throwable x) { |
315 } catch (Throwable x) { |
|
316 // Simply propagate the error by calling |
|
317 // onError on the user subscriber, and let the |
|
318 // connection be reused since we should have received |
|
319 // and parsed all the bytes when we reach here. |
|
320 // If onError throws in turn, then we will simply |
|
321 // let that new exception flow up to the caller |
|
322 // and let it deal with it. |
|
323 // (i.e: log and close the connection) |
|
324 // Note that rethrowing here could introduce a |
|
325 // race that might cause the next send() operation to |
|
326 // fail as the connection has already been put back |
|
327 // into the cache when we reach here. |
316 propagateError(t = withError = Utils.getCompletionCause(x)); |
328 propagateError(t = withError = Utils.getCompletionCause(x)); |
317 // rethrow and let the caller deal with it. |
|
318 // (i.e: log and close the connection) |
|
319 // arguably we could decide to not throw and let the |
|
320 // connection be reused since we should have received and |
|
321 // parsed all the bytes when we reach here. |
|
322 throw x; |
|
323 } |
329 } |
324 } else { |
330 } else { |
325 propagateError(t); |
331 propagateError(t); |
326 } |
332 } |
327 } |
333 } |