--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 20 17:15:16 2018 +0200
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Jun 20 09:05:57 2018 -0700
@@ -27,7 +27,6 @@
import java.io.EOFException;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
@@ -42,7 +41,9 @@
import java.util.function.Consumer;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
+import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
+import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.ConnectionExpiredException;
import jdk.internal.net.http.common.Utils;
@@ -166,6 +167,7 @@
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::flush);
+ final MinimalFuture<Void> whenFinished;
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
@@ -184,6 +186,7 @@
public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
this.pendingDelegateRef = new AtomicReference<>();
this.executor = executor;
+ this.whenFinished = new MinimalFuture<>();
this.owner = owner;
this.client = owner.client;
}
@@ -261,6 +264,14 @@
}
}
+ private String describe() {
+ Http1Exchange<?> exchange = owner;
+ if (exchange != null) {
+ return String.valueOf(exchange.request());
+ }
+ return "<uri unavailable>";
+ }
+
/**
* Must be called from within the scheduler main loop.
* Handles any pending errors by calling delegate.onReadError().
@@ -284,6 +295,10 @@
+ "\t\t queue.isEmpty: " + queue.isEmpty());
scheduler.stop();
delegate.onReadError(x);
+ whenFinished.completeExceptionally(x);
+ if (Log.channel()) {
+ Log.logChannel("HTTP/1 read subscriber stopped for: {0}", describe());
+ }
if (stopRequested) {
// This is the special case where the subscriber
// has requested an illegal number of items.
@@ -464,27 +479,35 @@
// throw ConnectionExpiredException
// to try & force a retry of the request.
retry = false;
- ex = new ConnectionExpiredException(
- "subscription is finished", ex);
+ ex = new ConnectionExpiredException(ex);
}
}
error = ex;
}
+ }
final Throwable t = (recorded == null ? ex : recorded);
if (debug.on())
debug.log("recorded " + t + "\n\t delegate: " + delegate
+ "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
+ if (Log.errors()) {
+ Log.logError("HTTP/1 read subscriber recorded error: {0} - {1}", describe(), t);
}
if (queue.isEmpty() || pendingDelegateRef.get() != null || stopRequested) {
// This callback is called from within the selector thread.
// Use an executor here to avoid doing the heavy lifting in the
// selector.
+ if (Log.errors()) {
+ Log.logError("HTTP/1 propagating recorded error: {0} - {1}", describe(), t);
+ }
scheduler.runOrSchedule(executor);
}
}
void stop() {
if (debug.on()) debug.log("stopping");
+ if (Log.channel() && !scheduler.isStopped()) {
+ Log.logChannel("HTTP/1 read subscriber stopped for {0}", describe());
+ }
scheduler.stop();
// make sure ref count is handled properly by
// closing the delegate.
@@ -492,6 +515,7 @@
if (previous != null) previous.close(error);
delegate = null;
owner = null;
+ whenFinished.complete(null);
}
/**
@@ -514,12 +538,18 @@
// supports being called multiple time.
// doesn't cancel the previous subscription, since that is
// most probably the same as the new subscription.
+ if (debug.on()) debug.log("Received onSubscribed from upstream");
+ if (Log.channel()) {
+ Log.logChannel("HTTP/1 read subscriber got subscription from {0}", describe());
+ }
assert this.subscription == null || dropped == false;
this.subscription = subscription;
dropped = false;
canRequestMore.set(true);
if (delegate != null) {
scheduler.runOrSchedule(executor);
+ } else {
+ if (debug.on()) debug.log("onSubscribe: read delegate not present yet");
}
}