--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Mon Apr 16 13:57:06 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java Mon Apr 16 16:44:12 2018 +0100
@@ -41,6 +41,7 @@
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.FlowTube;
+import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.Utils;
@@ -51,8 +52,7 @@
*/
class Http1Exchange<T> extends ExchangeImpl<T> {
- static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
- final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final HttpRequestImpl request; // main request
final Http1Request requestAction;
private volatile Http1Response<T> response;
@@ -118,8 +118,8 @@
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
private volatile Flow.Subscription subscription;
volatile boolean complete;
- private final System.Logger debug;
- Http1BodySubscriber(System.Logger debug) {
+ private final Logger debug;
+ Http1BodySubscriber(Logger debug) {
assert debug != null;
this.debug = debug;
}
@@ -128,8 +128,9 @@
static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
final void request(long n) {
- debug.log(Level.DEBUG, () ->
- "Http1BodySubscriber requesting " + n + ", from " + subscription);
+ if (debug.on())
+ debug.log("Http1BodySubscriber requesting %d, from %s",
+ n, subscription);
subscription.request(n);
}
@@ -147,12 +148,12 @@
subscription.cancel();
} catch(Throwable t) {
String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
- debug.log(Level.DEBUG, "%s: %s", msg, t);
+ if (debug.on()) debug.log("%s: %s", msg, t);
Log.logError("{0}: {1}", msg, (Object)t);
}
}
- static Http1BodySubscriber completeSubscriber(System.Logger debug) {
+ static Http1BodySubscriber completeSubscriber(Logger debug) {
return new Http1BodySubscriber(debug) {
@Override public void onSubscribe(Flow.Subscription subscription) { error(); }
@Override public void onNext(ByteBuffer item) { error(); }
@@ -230,7 +231,7 @@
private void connectFlows(HttpConnection connection) {
FlowTube tube = connection.getConnectionFlow();
- debug.log(Level.DEBUG, "%s connecting flows", tube);
+ if (debug.on()) debug.log("%s connecting flows", tube);
// Connect the flow to our Http1TubeSubscriber:
// asyncReceiver.subscriber().
@@ -242,12 +243,12 @@
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
// create the response before sending the request headers, so that
// the response can set the appropriate receivers.
- debug.log(Level.DEBUG, "Sending headers only");
+ if (debug.on()) debug.log("Sending headers only");
if (response == null) {
response = new Http1Response<>(connection, this, asyncReceiver);
}
- debug.log(Level.DEBUG, "response created in advance");
+ if (debug.on()) debug.log("response created in advance");
// If the first attempt to read something triggers EOF, or
// IOException("channel reset by peer"), we're going to retry.
// Instruct the asyncReceiver to throw ConnectionExpiredException
@@ -256,7 +257,7 @@
CompletableFuture<Void> connectCF;
if (!connection.connected()) {
- debug.log(Level.DEBUG, "initiating connect async");
+ if (debug.on()) debug.log("initiating connect async");
connectCF = connection.connectAsync();
synchronized (lock) {
operations.add(connectCF);
@@ -272,18 +273,18 @@
try {
connectFlows(connection);
- debug.log(Level.DEBUG, "requestAction.headers");
+ if (debug.on()) debug.log("requestAction.headers");
List<ByteBuffer> data = requestAction.headers();
synchronized (lock) {
state = State.HEADERS;
}
- debug.log(Level.DEBUG, "setting outgoing with headers");
+ if (debug.on()) debug.log("setting outgoing with headers");
assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
appendToOutgoing(data);
cf.complete(null);
return cf;
} catch (Throwable t) {
- debug.log(Level.DEBUG, "Failed to send headers: %s", t);
+ if (debug.on()) debug.log("Failed to send headers: %s", t);
connection.close();
cf.completeExceptionally(t);
return cf;
@@ -332,10 +333,9 @@
+ request.timeout().get().getNano() / 1000000) : -1,
cause);
boolean acknowledged = cf.completeExceptionally(cause);
- debug.log(Level.DEBUG,
- () -> acknowledged
- ? ("completed response with " + cause)
- : ("response already completed, ignoring " + cause));
+ if (debug.on())
+ debug.log(acknowledged ? ("completed response with " + cause)
+ : ("response already completed, ignoring " + cause));
}
return cf;
}
@@ -446,8 +446,8 @@
CompletableFuture<?> cf = toComplete.poll();
exec.execute(() -> {
if (cf.completeExceptionally(x)) {
- debug.log(Level.DEBUG, "completed cf with %s",
- (Object) x);
+ if (debug.on())
+ debug.log("completed cf with %s", (Object) x);
}
});
}
@@ -484,7 +484,7 @@
}
private void appendToOutgoing(DataPair dp) {
- debug.log(Level.DEBUG, "appending to outgoing " + dp);
+ if (debug.on()) debug.log("appending to outgoing " + dp);
outgoing.add(dp);
writePublisher.writeScheduler.runOrSchedule();
}
@@ -497,10 +497,10 @@
private void requestMoreBody() {
try {
- debug.log(Level.DEBUG, "requesting more body from the subscriber");
+ if (debug.on()) debug.log("requesting more body from the subscriber");
bodySubscriber.request(1);
} catch (Throwable t) {
- debug.log(Level.DEBUG, "Subscription::request failed", t);
+ if (debug.on()) debug.log("Subscription::request failed", t);
cancelImpl(t);
bodySentCF.completeExceptionally(t);
}
@@ -531,13 +531,13 @@
case HEADERS:
state = State.BODY;
// completeAsync, since dependent tasks should run in another thread
- debug.log(Level.DEBUG, "initiating completion of headersSentCF");
+ if (debug.on()) debug.log("initiating completion of headersSentCF");
headersSentCF.completeAsync(() -> this, exec);
break;
case BODY:
if (dp.data == Http1BodySubscriber.COMPLETED) {
state = State.COMPLETING;
- debug.log(Level.DEBUG, "initiating completion of bodySentCF");
+ if (debug.on()) debug.log("initiating completion of bodySentCF");
bodySentCF.completeAsync(() -> this, exec);
} else {
exec.execute(this::requestMoreBody);
@@ -558,7 +558,7 @@
/** A Publisher of HTTP/1.1 headers and request body. */
final class Http1Publisher implements FlowTube.TubePublisher {
- final System.Logger debug = Utils.getDebugLogger(this::dbgString);
+ final Logger debug = Utils.getDebugLogger(this::dbgString);
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
volatile boolean cancelled;
final Http1WriteSubscription subscription = new Http1WriteSubscription();
@@ -573,7 +573,7 @@
assert subscriber == null;
subscriber = s;
- debug.log(Level.DEBUG, "got subscriber: %s", s);
+ if (debug.on()) debug.log("got subscriber: %s", s);
s.onSubscribe(subscription);
}
@@ -593,17 +593,17 @@
@Override
public void run() {
assert state != State.COMPLETED : "Unexpected state:" + state;
- debug.log(Level.DEBUG, "WriteTask");
+ if (debug.on()) debug.log("WriteTask");
if (subscriber == null) {
- debug.log(Level.DEBUG, "no subscriber yet");
+ if (debug.on()) debug.log("no subscriber yet");
return;
}
- debug.log(Level.DEBUG, () -> "hasOutgoing = " + hasOutgoing());
+ if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
while (hasOutgoing() && demand.tryDecrement()) {
DataPair dp = getOutgoing();
if (dp.throwable != null) {
- debug.log(Level.DEBUG, "onError");
+ if (debug.on()) debug.log("onError");
// Do not call the subscriber's onError, it is not required.
writeScheduler.stop();
} else {
@@ -613,15 +613,15 @@
assert state == State.COMPLETING : "Unexpected state:" + state;
state = State.COMPLETED;
}
- debug.log(Level.DEBUG,
- "completed, stopping %s", writeScheduler);
+ if (debug.on())
+ debug.log("completed, stopping %s", writeScheduler);
writeScheduler.stop();
// Do nothing more. Just do not publish anything further.
// The next Subscriber will eventually take over.
} else {
- debug.log(Level.DEBUG, () ->
- "onNext with " + Utils.remaining(data) + " bytes");
+ if (debug.on())
+ debug.log("onNext with " + Utils.remaining(data) + " bytes");
subscriber.onNext(data);
}
}
@@ -636,14 +636,14 @@
if (cancelled)
return; //no-op
demand.increase(n);
- debug.log(Level.DEBUG,
- "subscription request(%d), demand=%s", n, demand);
+ if (debug.on())
+ debug.log("subscription request(%d), demand=%s", n, demand);
writeScheduler.runOrSchedule(client.theExecutor());
}
@Override
public void cancel() {
- debug.log(Level.DEBUG, "subscription cancelled");
+ if (debug.on()) debug.log("subscription cancelled");
if (cancelled)
return; //no-op
cancelled = true;