src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
branchhttp-client-branch
changeset 56209 43d5ad612710
parent 56204 e5d0c20217a3
child 56410 1b37529eaf3a
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Wed Feb 28 15:48:46 2018 +0000
@@ -53,9 +53,6 @@
 
     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
-    private static final System.Logger DEBUG_LOGGER =
-            Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);
-
     final HttpRequestImpl request; // main request
     final Http1Request requestAction;
     private volatile Http1Response<T> response;
@@ -121,12 +118,17 @@
         final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
         private volatile Flow.Subscription subscription;
         volatile boolean complete;
+        private final System.Logger debug;
+        Http1BodySubscriber(System.Logger debug) {
+            assert debug != null;
+            this.debug = debug;
+        }
 
         /** Final sentinel in the stream of request body. */
         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
 
         final void request(long n) {
-            DEBUG_LOGGER.log(Level.DEBUG, () ->
+            debug.log(Level.DEBUG, () ->
                 "Http1BodySubscriber requesting " + n + ", from " + subscription);
             subscription.request(n);
         }
@@ -141,11 +143,17 @@
         }
 
         final void cancelSubscription() {
-            subscription.cancel();
+            try {
+                subscription.cancel();
+            } catch(Throwable t) {
+                String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
+                debug.log(Level.DEBUG, "%s: %s", msg, t);
+                Log.logError("{0}: {1}", msg, (Object)t);
+            }
         }
 
-        static Http1BodySubscriber completeSubscriber() {
-            return new Http1BodySubscriber() {
+        static Http1BodySubscriber completeSubscriber(System.Logger debug) {
+            return new Http1BodySubscriber(debug) {
                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
                 @Override public void onNext(ByteBuffer item) { error(); }
                 @Override public void onError(Throwable throwable) { error(); }
@@ -285,15 +293,15 @@
         try {
             bodySubscriber = requestAction.continueRequest();
             if (bodySubscriber == null) {
-                bodySubscriber = Http1BodySubscriber.completeSubscriber();
+                bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
             } else {
                 // start
-                bodySubscriber.whenSubscribed.thenAccept(
-                        (s) -> bodySubscriber.request(1));
+                bodySubscriber.whenSubscribed
+                        .thenAccept((s) -> requestMoreBody());
             }
         } catch (Throwable t) {
-            connection.close();
+            cancelImpl(t);
             bodySentCF.completeExceptionally(t);
         }
         return bodySentCF;
@@ -385,9 +393,11 @@
     private void cancelImpl(Throwable cause) {
         LinkedList<CompletableFuture<?>> toComplete = null;
         int count = 0;
+        Throwable error;
         synchronized (lock) {
-            if (failed == null)
-                failed = cause;
+            if ((error = failed) == null) {
+                failed = error = cause;
+            }
             if (requestAction != null && requestAction.finished()
                     && response != null && response.finished()) {
                 return;
@@ -426,12 +436,13 @@
             Executor exec = client.isSelectorThread()
                             ? executor
                             : this::runInline;
+            Throwable x = error;
             while (!toComplete.isEmpty()) {
                 CompletableFuture<?> cf = toComplete.poll();
                 exec.execute(() -> {
-                    if (cf.completeExceptionally(cause)) {
+                    if (cf.completeExceptionally(x)) {
                         debug.log(Level.DEBUG, "completed cf with %s",
-                                 (Object) cause);
+                                 (Object) x);
                     }
                 });
             }
@@ -479,6 +490,17 @@
         return !outgoing.isEmpty();
     }
 
+    private void requestMoreBody() {
+        try {
+            debug.log(Level.DEBUG, "requesting more body from the subscriber");
+            bodySubscriber.request(1);
+        } catch (Throwable t) {
+            debug.log(Level.DEBUG, "Subscription::request failed", t);
+            cancelImpl(t);
+            bodySentCF.completeExceptionally(t);
+        }
+    }
+
     // Invoked only by the publisher
     // ALL tasks should execute off the Selector-Manager thread
     /** Returns the next portion of the HTTP request, or the error. */
@@ -513,8 +535,7 @@
                         debug.log(Level.DEBUG, "initiating completion of bodySentCF");
                         bodySentCF.completeAsync(() -> this, exec);
                     } else {
-                        debug.log(Level.DEBUG, "requesting more body from the subscriber");
-                        exec.execute(() -> bodySubscriber.request(1));
+                        exec.execute(this::requestMoreBody);
                     }
                     break;
                 case INITIAL: