src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Sat Dec 02 17:40:57 2017 +0000
@@ -320,6 +320,8 @@
DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
s.request(1);
}
+ assert currentListItr != null;
+ if (lb.isEmpty()) continue;
}
assert currentListItr != null;
assert currentListItr.hasNext();
@@ -359,27 +361,38 @@
@Override
public void onSubscribe(Flow.Subscription s) {
- if (!subscribed.compareAndSet(false, true)) {
- s.cancel();
- } else {
- // check whether the stream is already closed.
- // if so, we should cancel the subscription
- // immediately.
- boolean closed;
- synchronized(this) {
- closed = this.closed;
- if (!closed) {
- this.subscription = s;
+ try {
+ if (!subscribed.compareAndSet(false, true)) {
+ s.cancel();
+ } else {
+ // check whether the stream is already closed.
+ // if so, we should cancel the subscription
+ // immediately.
+ boolean closed;
+ synchronized (this) {
+ closed = this.closed;
+ if (!closed) {
+ this.subscription = s;
+ }
}
- }
- if (closed) {
- s.cancel();
- return;
+ if (closed) {
+ s.cancel();
+ return;
+ }
+ assert buffers.remainingCapacity() > 1; // should contain at least 2
+ DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+ + Math.max(1, buffers.remainingCapacity() - 1));
+ s.request(Math.max(1, buffers.remainingCapacity() - 1));
}
- assert buffers.remainingCapacity() > 1; // should contain at least 2
- DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
- + Math.max(1, buffers.remainingCapacity() - 1));
- s.request(Math.max(1, buffers.remainingCapacity() - 1));
+ } catch (Throwable t) {
+ failed = t;
+ try {
+ close();
+ } catch (IOException x) {
+ // OK
+ } finally {
+ onError(t);
+ }
}
}
@@ -392,12 +405,14 @@
throw new IllegalStateException("queue is full");
}
DEBUG_LOGGER.log(Level.DEBUG, "item offered");
- } catch (Exception ex) {
+ } catch (Throwable ex) {
failed = ex;
try {
close();
} catch (IOException ex1) {
// OK
+ } finally {
+ onError(ex);
}
}
}