src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
branchhttp-client-branch
changeset 55942 8d4770c22b63
parent 55858 cd5eeec735fb
child 55950 5e1707e5a254
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Dec 01 19:25:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Sat Dec 02 17:40:57 2017 +0000
@@ -320,6 +320,8 @@
                             DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
                             s.request(1);
                         }
+                        assert currentListItr != null;
+                        if (lb.isEmpty()) continue;
                     }
                     assert currentListItr != null;
                     assert currentListItr.hasNext();
@@ -359,27 +361,38 @@
 
         @Override
         public void onSubscribe(Flow.Subscription s) {
-            if (!subscribed.compareAndSet(false, true)) {
-                s.cancel();
-            } else {
-                // check whether the stream is already closed.
-                // if so, we should cancel the subscription
-                // immediately.
-                boolean closed;
-                synchronized(this) {
-                    closed = this.closed;
-                    if (!closed) {
-                        this.subscription = s;
+            try {
+                if (!subscribed.compareAndSet(false, true)) {
+                    s.cancel();
+                } else {
+                    // check whether the stream is already closed.
+                    // if so, we should cancel the subscription
+                    // immediately.
+                    boolean closed;
+                    synchronized (this) {
+                        closed = this.closed;
+                        if (!closed) {
+                            this.subscription = s;
+                        }
                     }
-                }
-                if (closed) {
-                    s.cancel();
-                    return;
+                    if (closed) {
+                        s.cancel();
+                        return;
+                    }
+                    assert buffers.remainingCapacity() > 1; // should contain at least 2
+                    DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+                            + Math.max(1, buffers.remainingCapacity() - 1));
+                    s.request(Math.max(1, buffers.remainingCapacity() - 1));
                 }
-                assert buffers.remainingCapacity() > 1; // should contain at least 2
-                DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
-                        + Math.max(1, buffers.remainingCapacity() - 1));
-                s.request(Math.max(1, buffers.remainingCapacity() - 1));
+            } catch (Throwable t) {
+                failed = t;
+                try {
+                    close();
+                } catch (IOException x) {
+                    // OK
+                } finally {
+                    onError(t);
+                }
             }
         }
 
@@ -392,12 +405,14 @@
                     throw new IllegalStateException("queue is full");
                 }
                 DEBUG_LOGGER.log(Level.DEBUG, "item offered");
-            } catch (Exception ex) {
+            } catch (Throwable ex) {
                 failed = ex;
                 try {
                     close();
                 } catch (IOException ex1) {
                     // OK
+                } finally {
+                    onError(ex);
                 }
             }
         }