http-client-branch: Fixed SSLFlowDelegate to not call Subscriber.onError directly
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Feb 27 16:44:39 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Tue Feb 27 18:43:06 2018 +0000
@@ -96,7 +96,8 @@
final CompletableFuture<String> alpnCF; // completes on initial handshake
final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
volatile boolean close_notify_received;
- volatile Flow.Subscriber<?> downReader;
+ final CompletableFuture<Void> readerCF;
+ final CompletableFuture<Void> writerCF;
static AtomicInteger scount = new AtomicInteger(1);
final int id;
@@ -117,13 +118,14 @@
this.engine = engine;
this.exec = exec;
this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
- CompletableFuture.anyOf(reader.completion(), writer.completion())
- .exceptionally(this::stopOnError);
+ this.readerCF = reader.completion();
+ this.writerCF = reader.completion();
+ readerCF.exceptionally(this::stopOnError);
+ readerCF.exceptionally(this::stopOnError);
CompletableFuture.allOf(reader.completion(), writer.completion())
.thenRun(this::normalStop);
this.alpnCF = new MinimalFuture<>();
- this.downReader = downReader;
// connect the Reader to the downReader and the
// Writer to the downWriter.
@@ -153,7 +155,6 @@
*/
void connect(Subscriber<? super List<ByteBuffer>> downReader,
Subscriber<? super List<ByteBuffer>> downWriter) {
- this.downReader = downReader;
this.reader.subscribe(downReader);
this.writer.subscribe(downWriter);
}
@@ -354,7 +355,7 @@
}
}
} catch (IOException ex) {
- errorCommon(ex);
+ errorCommon(ex, true);
handleError(ex);
}
if (handshaking && !complete)
@@ -374,7 +375,7 @@
outgoing(Utils.EMPTY_BB_LIST, true);
}
} catch (Throwable ex) {
- errorCommon(ex);
+ errorCommon(ex, true);
handleError(ex);
}
}
@@ -580,7 +581,7 @@
writer.addData(HS_TRIGGER);
}
} catch (Throwable ex) {
- errorCommon(ex);
+ errorCommon(ex, true);
handleError(ex);
}
}
@@ -607,7 +608,8 @@
private void handleError(Throwable t) {
debug.log(Level.DEBUG, "handleError", t);
- downReader.onError(t);
+ readerCF.completeExceptionally(t);
+ writerCF.completeExceptionally(t);
// no-op if already completed
alpnCF.completeExceptionally(t);
reader.stop();
@@ -618,7 +620,12 @@
stopOnError(null);
}
- private Void stopOnError(Throwable t) {
+ boolean stopped = false;
+
+ synchronized private Void stopOnError(Throwable t) {
+ if (stopped)
+ return null;
+ stopped = true;
reader.stop();
writer.stop();
return null;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Tue Feb 27 16:44:39 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Tue Feb 27 18:43:06 2018 +0000
@@ -92,7 +92,11 @@
public SubscriberWrapper()
{
this.outputQ = new ConcurrentLinkedQueue<>();
- this.cf = new MinimalFuture<>();
+ this.cf = new MinimalFuture<Void>().whenComplete((v,t) ->
+ {
+ if (t != null)
+ errorCommon(t, false);
+ });
this.pushScheduler =
SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
this.downstreamSubscription = new SubscriptionBase(pushScheduler,
@@ -255,7 +259,7 @@
try {
run1();
} catch (Throwable t) {
- errorCommon(t);
+ errorCommon(t, true);
}
}
@@ -363,17 +367,18 @@
@Override
public void onError(Throwable throwable) {
logger.log(Level.DEBUG, () -> "onError: " + throwable);
- errorCommon(Objects.requireNonNull(throwable));
+ errorCommon(Objects.requireNonNull(throwable), true);
}
- protected boolean errorCommon(Throwable throwable) {
+ protected boolean errorCommon(Throwable throwable, boolean completecf) {
assert throwable != null ||
(throwable = new AssertionError("null throwable")) != null;
if (errorRef.compareAndSet(null, throwable)) {
logger.log(Level.DEBUG, "error", throwable);
pushScheduler.runOrSchedule();
upstreamCompleted = true;
- cf.completeExceptionally(throwable);
+ if (completecf)
+ cf.completeExceptionally(throwable);
return true;
}
return false;
@@ -381,14 +386,18 @@
@Override
public void close() {
- errorCommon(new RuntimeException("wrapper closed"));
+ errorCommon(new RuntimeException("wrapper closed"), true);
+ }
+
+ public void close(Throwable t) {
+ errorCommon(t, true);
}
private void incomingCaller(List<ByteBuffer> l, boolean complete) {
try {
incoming(l, complete);
} catch(Throwable t) {
- errorCommon(t);
+ errorCommon(t, true);
}
}