--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Tue Feb 27 19:26:25 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Wed Feb 28 15:48:46 2018 +0000
@@ -53,9 +53,6 @@
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
- private static final System.Logger DEBUG_LOGGER =
- Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);
-
final HttpRequestImpl request; // main request
final Http1Request requestAction;
private volatile Http1Response<T> response;
@@ -121,12 +118,17 @@
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
private volatile Flow.Subscription subscription;
volatile boolean complete;
+ private final System.Logger debug;
+ Http1BodySubscriber(System.Logger debug) {
+ assert debug != null;
+ this.debug = debug;
+ }
/** Final sentinel in the stream of request body. */
static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
final void request(long n) {
- DEBUG_LOGGER.log(Level.DEBUG, () ->
+ debug.log(Level.DEBUG, () ->
"Http1BodySubscriber requesting " + n + ", from " + subscription);
subscription.request(n);
}
@@ -141,11 +143,17 @@
}
final void cancelSubscription() {
- subscription.cancel();
+ try {
+ subscription.cancel();
+ } catch(Throwable t) {
+ String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
+ debug.log(Level.DEBUG, "%s: %s", msg, t);
+ Log.logError("{0}: {1}", msg, (Object)t);
+ }
}
- static Http1BodySubscriber completeSubscriber() {
- return new Http1BodySubscriber() {
+ static Http1BodySubscriber completeSubscriber(System.Logger debug) {
+ return new Http1BodySubscriber(debug) {
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
@Override public void onNext(ByteBuffer item) { error(); }
@Override public void onError(Throwable throwable) { error(); }
@@ -285,15 +293,15 @@
try {
bodySubscriber = requestAction.continueRequest();
if (bodySubscriber == null) {
- bodySubscriber = Http1BodySubscriber.completeSubscriber();
+ bodySubscriber = Http1BodySubscriber.completeSubscriber(debug);
appendToOutgoing(Http1BodySubscriber.COMPLETED);
} else {
// start
- bodySubscriber.whenSubscribed.thenAccept(
- (s) -> bodySubscriber.request(1));
+ bodySubscriber.whenSubscribed
+ .thenAccept((s) -> requestMoreBody());
}
} catch (Throwable t) {
- connection.close();
+ cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
return bodySentCF;
@@ -385,9 +393,11 @@
private void cancelImpl(Throwable cause) {
LinkedList<CompletableFuture<?>> toComplete = null;
int count = 0;
+ Throwable error;
synchronized (lock) {
- if (failed == null)
- failed = cause;
+ if ((error = failed) == null) {
+ failed = error = cause;
+ }
if (requestAction != null && requestAction.finished()
&& response != null && response.finished()) {
return;
@@ -426,12 +436,13 @@
Executor exec = client.isSelectorThread()
? executor
: this::runInline;
+ Throwable x = error;
while (!toComplete.isEmpty()) {
CompletableFuture<?> cf = toComplete.poll();
exec.execute(() -> {
- if (cf.completeExceptionally(cause)) {
+ if (cf.completeExceptionally(x)) {
debug.log(Level.DEBUG, "completed cf with %s",
- (Object) cause);
+ (Object) x);
}
});
}
@@ -479,6 +490,17 @@
return !outgoing.isEmpty();
}
+ private void requestMoreBody() {
+ try {
+ debug.log(Level.DEBUG, "requesting more body from the subscriber");
+ bodySubscriber.request(1);
+ } catch (Throwable t) {
+ debug.log(Level.DEBUG, "Subscription::request failed", t);
+ cancelImpl(t);
+ bodySentCF.completeExceptionally(t);
+ }
+ }
+
// Invoked only by the publisher
// ALL tasks should execute off the Selector-Manager thread
/** Returns the next portion of the HTTP request, or the error. */
@@ -513,8 +535,7 @@
debug.log(Level.DEBUG, "initiating completion of bodySentCF");
bodySentCF.completeAsync(() -> this, exec);
} else {
- debug.log(Level.DEBUG, "requesting more body from the subscriber");
- exec.execute(() -> bodySubscriber.request(1));
+ exec.execute(this::requestMoreBody);
}
break;
case INITIAL: