http-client-branch: Fixed SSLFlowDelegate to not call Subscriber.onError directly http-client-branch
authormichaelm
Tue, 27 Feb 2018 18:43:06 +0000
branchhttp-client-branch
changeset 56207 03c89ed2070f
parent 56206 a0cf7477d139
child 56208 d37c08ce784a
http-client-branch: Fixed SSLFlowDelegate to not call Subscriber.onError directly
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Feb 27 16:44:39 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Feb 27 18:43:06 2018 +0000
@@ -96,7 +96,8 @@
     final CompletableFuture<String> alpnCF; // completes on initial handshake
     final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
     volatile boolean close_notify_received;
-    volatile Flow.Subscriber<?> downReader;
+    final CompletableFuture<Void> readerCF;
+    final CompletableFuture<Void> writerCF;
     static AtomicInteger scount = new AtomicInteger(1);
     final int id;
 
@@ -117,13 +118,14 @@
         this.engine = engine;
         this.exec = exec;
         this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
-        CompletableFuture.anyOf(reader.completion(), writer.completion())
-            .exceptionally(this::stopOnError);
+        this.readerCF = reader.completion();
+        this.writerCF = reader.completion();
+        readerCF.exceptionally(this::stopOnError);
+        readerCF.exceptionally(this::stopOnError);
 
         CompletableFuture.allOf(reader.completion(), writer.completion())
             .thenRun(this::normalStop);
         this.alpnCF = new MinimalFuture<>();
-        this.downReader = downReader;
 
         // connect the Reader to the downReader and the
         // Writer to the downWriter.
@@ -153,7 +155,6 @@
      */
     void connect(Subscriber<? super List<ByteBuffer>> downReader,
                  Subscriber<? super List<ByteBuffer>> downWriter) {
-        this.downReader = downReader;
         this.reader.subscribe(downReader);
         this.writer.subscribe(downWriter);
     }
@@ -354,7 +355,7 @@
                             }
                         }
                     } catch (IOException ex) {
-                        errorCommon(ex);
+                        errorCommon(ex, true);
                         handleError(ex);
                     }
                     if (handshaking && !complete)
@@ -374,7 +375,7 @@
                     outgoing(Utils.EMPTY_BB_LIST, true);
                 }
             } catch (Throwable ex) {
-                errorCommon(ex);
+                errorCommon(ex, true);
                 handleError(ex);
             }
         }
@@ -580,7 +581,7 @@
                     writer.addData(HS_TRIGGER);
                 }
             } catch (Throwable ex) {
-                errorCommon(ex);
+                errorCommon(ex, true);
                 handleError(ex);
             }
         }
@@ -607,7 +608,8 @@
 
     private void handleError(Throwable t) {
         debug.log(Level.DEBUG, "handleError", t);
-        downReader.onError(t);
+        readerCF.completeExceptionally(t);
+        writerCF.completeExceptionally(t);
         // no-op if already completed
         alpnCF.completeExceptionally(t);
         reader.stop();
@@ -618,7 +620,12 @@
         stopOnError(null);
     }
 
-    private Void stopOnError(Throwable t) {
+    boolean stopped = false;
+
+    synchronized private Void stopOnError(Throwable t) {
+        if (stopped)
+            return null;
+        stopped = true;
         reader.stop();
         writer.stop();
         return null;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Tue Feb 27 16:44:39 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Tue Feb 27 18:43:06 2018 +0000
@@ -92,7 +92,11 @@
     public SubscriberWrapper()
     {
         this.outputQ = new ConcurrentLinkedQueue<>();
-        this.cf = new MinimalFuture<>();
+        this.cf = new MinimalFuture<Void>().whenComplete((v,t) ->
+        {
+            if (t != null)
+                errorCommon(t, false);
+        });
         this.pushScheduler =
                 SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
         this.downstreamSubscription = new SubscriptionBase(pushScheduler,
@@ -255,7 +259,7 @@
             try {
                 run1();
             } catch (Throwable t) {
-                errorCommon(t);
+                errorCommon(t, true);
             }
         }
 
@@ -363,17 +367,18 @@
     @Override
     public void onError(Throwable throwable) {
         logger.log(Level.DEBUG, () -> "onError: " + throwable);
-        errorCommon(Objects.requireNonNull(throwable));
+        errorCommon(Objects.requireNonNull(throwable), true);
     }
 
-    protected boolean errorCommon(Throwable throwable) {
+    protected boolean errorCommon(Throwable throwable, boolean completecf) {
         assert throwable != null ||
                 (throwable = new AssertionError("null throwable")) != null;
         if (errorRef.compareAndSet(null, throwable)) {
             logger.log(Level.DEBUG, "error", throwable);
             pushScheduler.runOrSchedule();
             upstreamCompleted = true;
-            cf.completeExceptionally(throwable);
+            if (completecf)
+                cf.completeExceptionally(throwable);
             return true;
         }
         return false;
@@ -381,14 +386,18 @@
 
     @Override
     public void close() {
-        errorCommon(new RuntimeException("wrapper closed"));
+        errorCommon(new RuntimeException("wrapper closed"), true);
+    }
+
+    public void close(Throwable t) {
+        errorCommon(t, true);
     }
 
     private void incomingCaller(List<ByteBuffer> l, boolean complete) {
         try {
             incoming(l, complete);
         } catch(Throwable t) {
-            errorCommon(t);
+            errorCommon(t, true);
         }
     }