# HG changeset patch # User michaelm # Date 1519756986 0 # Node ID 03c89ed2070fab0f244b432dfd928e320b9b9f73 # Parent a0cf7477d1392e6a08bf9088493f2de97ff9e057 http-client-branch: Fixed SSLFlowDelegate to not call Subscriber.onError directly diff -r a0cf7477d139 -r 03c89ed2070f src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Feb 27 16:44:39 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Feb 27 18:43:06 2018 +0000 @@ -96,7 +96,8 @@ final CompletableFuture alpnCF; // completes on initial handshake final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; volatile boolean close_notify_received; - volatile Flow.Subscriber downReader; + final CompletableFuture readerCF; + final CompletableFuture writerCF; static AtomicInteger scount = new AtomicInteger(1); final int id; @@ -117,13 +118,14 @@ this.engine = engine; this.exec = exec; this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); - CompletableFuture.anyOf(reader.completion(), writer.completion()) - .exceptionally(this::stopOnError); + this.readerCF = reader.completion(); + this.writerCF = reader.completion(); + readerCF.exceptionally(this::stopOnError); + readerCF.exceptionally(this::stopOnError); CompletableFuture.allOf(reader.completion(), writer.completion()) .thenRun(this::normalStop); this.alpnCF = new MinimalFuture<>(); - this.downReader = downReader; // connect the Reader to the downReader and the // Writer to the downWriter. @@ -153,7 +155,6 @@ */ void connect(Subscriber> downReader, Subscriber> downWriter) { - this.downReader = downReader; this.reader.subscribe(downReader); this.writer.subscribe(downWriter); } @@ -354,7 +355,7 @@ } } } catch (IOException ex) { - errorCommon(ex); + errorCommon(ex, true); handleError(ex); } if (handshaking && !complete) @@ -374,7 +375,7 @@ outgoing(Utils.EMPTY_BB_LIST, true); } } catch (Throwable ex) { - errorCommon(ex); + errorCommon(ex, true); handleError(ex); } } @@ -580,7 +581,7 @@ writer.addData(HS_TRIGGER); } } catch (Throwable ex) { - errorCommon(ex); + errorCommon(ex, true); handleError(ex); } } @@ -607,7 +608,8 @@ private void handleError(Throwable t) { debug.log(Level.DEBUG, "handleError", t); - downReader.onError(t); + readerCF.completeExceptionally(t); + writerCF.completeExceptionally(t); // no-op if already completed alpnCF.completeExceptionally(t); reader.stop(); @@ -618,7 +620,12 @@ stopOnError(null); } - private Void stopOnError(Throwable t) { + boolean stopped = false; + + synchronized private Void stopOnError(Throwable t) { + if (stopped) + return null; + stopped = true; reader.stop(); writer.stop(); return null; diff -r a0cf7477d139 -r 03c89ed2070f src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Tue Feb 27 16:44:39 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Tue Feb 27 18:43:06 2018 +0000 @@ -92,7 +92,11 @@ public SubscriberWrapper() { this.outputQ = new ConcurrentLinkedQueue<>(); - this.cf = new MinimalFuture<>(); + this.cf = new MinimalFuture().whenComplete((v,t) -> + { + if (t != null) + errorCommon(t, false); + }); this.pushScheduler = SequentialScheduler.synchronizedScheduler(new DownstreamPusher()); this.downstreamSubscription = new SubscriptionBase(pushScheduler, @@ -255,7 +259,7 @@ try { run1(); } catch (Throwable t) { - errorCommon(t); + errorCommon(t, true); } } @@ -363,17 +367,18 @@ @Override public void onError(Throwable throwable) { logger.log(Level.DEBUG, () -> "onError: " + throwable); - errorCommon(Objects.requireNonNull(throwable)); + errorCommon(Objects.requireNonNull(throwable), true); } - protected boolean errorCommon(Throwable throwable) { + protected boolean errorCommon(Throwable throwable, boolean completecf) { assert throwable != null || (throwable = new AssertionError("null throwable")) != null; if (errorRef.compareAndSet(null, throwable)) { logger.log(Level.DEBUG, "error", throwable); pushScheduler.runOrSchedule(); upstreamCompleted = true; - cf.completeExceptionally(throwable); + if (completecf) + cf.completeExceptionally(throwable); return true; } return false; @@ -381,14 +386,18 @@ @Override public void close() { - errorCommon(new RuntimeException("wrapper closed")); + errorCommon(new RuntimeException("wrapper closed"), true); + } + + public void close(Throwable t) { + errorCommon(t, true); } private void incomingCaller(List l, boolean complete) { try { incoming(l, complete); } catch(Throwable t) { - errorCommon(t); + errorCommon(t, true); } }