--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Sun Nov 05 17:05:57 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Sun Nov 05 17:32:13 2017 +0000
@@ -25,26 +25,32 @@
package jdk.incubator.http;
+import java.io.EOFException;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.URI;
-import jdk.incubator.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Formatter;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.net.ssl.SSLEngine;
import jdk.incubator.http.internal.common.*;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
+import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
@@ -83,11 +89,21 @@
* stream are provided by calling Stream.incoming().
*/
class Http2Connection {
+
+ static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
+ static final boolean DEBUG_HPACK = Utils.DEBUG_HPACK; // Revisit: temporary dev flag.
+ final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
+ final static System.Logger DEBUG_LOGGER =
+ Utils.getDebugLogger("Http2Connection"::toString, DEBUG);
+ private final System.Logger debugHpack =
+ Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+ static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
+
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
* In general there are 4 points where ByteBuffers are used:
- * - incoming/outgoing frames from/to ByteBufers plus incoming/outgoing encrypted data
+ * - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing encrypted data
* in case of SSL connection.
*
* 1. Outgoing frames encoded to ByteBuffers.
@@ -123,10 +139,15 @@
{
// if preface is not sent, buffers data in the pending list
if (!prefaceSent) {
+ debug.log(Level.DEBUG, "Preface is not sent: buffering %d",
+ buf.get().remaining());
synchronized (this) {
if (!prefaceSent) {
if (pending == null) pending = new ArrayList<>();
pending.add(buf);
+ debug.log(Level.DEBUG, () -> "there are now "
+ + Utils.remaining(pending.toArray(new ByteBufferReference[0]))
+ + " bytes buffered waiting for preface to be sent");
return false;
}
}
@@ -143,13 +164,18 @@
this.pending = null;
if (pending != null) {
// flush pending data
+ debug.log(Level.DEBUG, () -> "Processing buffered data: "
+ + Utils.remaining(pending.toArray(new ByteBufferReference[0])));
for (ByteBufferReference b : pending) {
decoder.decode(b);
}
}
-
+ ByteBuffer b = buf.get();
// push the received buffer to the frames decoder.
- decoder.decode(buf);
+ if (b != EMPTY_TRIGGER) {
+ debug.log(Level.DEBUG, "Processing %d", buf.get().remaining());
+ decoder.decode(buf);
+ }
return true;
}
@@ -167,7 +193,9 @@
//-------------------------------------
final HttpConnection connection;
- private final HttpClientImpl client;
+ // only keep a strong reference to Http2ClientImpl, which only has
+ // a weak reference on HttpClientImpl, to avoid strong references
+ // from the selector thread to HttpClientImpl (via attachments).
private final Http2ClientImpl client2;
private final Map<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
private int nextstreamid;
@@ -186,7 +214,10 @@
*/
private final WindowController windowController = new WindowController();
private final FramesController framesController = new FramesController();
+ private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
final WindowUpdateSender windowUpdater;
+ private volatile Throwable cause;
+ private volatile Supplier<ByteBuffer> initial;
static final int DEFAULT_FRAME_SIZE = 16 * 1024;
@@ -199,7 +230,6 @@
int nextstreamid,
String key) {
this.connection = connection;
- this.client = client2.client();
this.client2 = client2;
this.nextstreamid = nextstreamid;
this.key = key;
@@ -209,102 +239,151 @@
this.serverSettings = SettingsFrame.getDefaultSettings();
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
- this.windowUpdater = new ConnectionWindowUpdateSender(this, client.getReceiveBufferSize());
+ debugHpack.log(Level.DEBUG, () -> "For the record:" + super.toString());
+ debugHpack.log(Level.DEBUG, "Decoder created: %s", hpackIn);
+ debugHpack.log(Level.DEBUG, "Encoder created: %s", hpackOut);
+ this.windowUpdater = new ConnectionWindowUpdateSender(this, client().getReceiveBufferSize());
}
/**
* Case 1) Create from upgraded HTTP/1.1 connection.
- * Is ready to use. Will not be SSL. exchange is the Exchange
+ * Is ready to use. Can be SSL. exchange is the Exchange
* that initiated the connection, whose response will be delivered
* on a Stream.
*/
- Http2Connection(HttpConnection connection,
+ private Http2Connection(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
- ByteBuffer initial)
+ Supplier<ByteBuffer> initial)
throws IOException, InterruptedException
{
this(connection,
client2,
3, // stream 1 is registered during the upgrade
keyFor(connection));
- assert !(connection instanceof SSLConnection);
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
+ // Upgrading:
+ // set callbacks before sending preface - makes sure anything that
+ // might be sent by the server will come our way.
+ this.initial = initial;
+ connectFlows(connection);
sendConnectionPreface();
- // start reading and writing
- // start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
- connection.configureMode(Mode.ASYNC); // set mode only AFTER setAsyncCallbacks to provide visibility.
- asyncReceive(ByteBufferReference.of(initial));
- asyncConn.startReading();
}
// async style but completes immediately
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
- ByteBuffer initial) {
+ Supplier<ByteBuffer> initial)
+ {
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
}
+ // Requires TLS handshake. So, is really async
+ static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
+ Http2ClientImpl h2client) {
+ assert request.secure();
+ AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
+ HttpConnection.getConnection(request.getAddress(h2client.client()),
+ h2client.client(),
+ request,
+ HttpClient.Version.HTTP_2);
+
+ return connection.connectAsync()
+ .thenCompose(unused -> checkSSLConfig(connection))
+ .thenCompose(notused-> {
+ CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+ try {
+ Http2Connection hc = new Http2Connection(request, h2client, connection);
+ cf.complete(hc);
+ } catch (IOException e) {
+ cf.completeExceptionally(e);
+ }
+ return cf; } );
+ }
+
/**
* Cases 2) 3)
*
* request is request to be sent.
*/
- Http2Connection(HttpRequestImpl request, Http2ClientImpl h2client)
- throws IOException, InterruptedException
+ private Http2Connection(HttpRequestImpl request,
+ Http2ClientImpl h2client,
+ HttpConnection connection)
+ throws IOException
{
- this(HttpConnection.getConnection(request.getAddress(h2client.client()), h2client.client(), request, true),
- h2client,
- 1,
- keyFor(request.uri(), request.proxy(h2client.client())));
+ this(connection,
+ h2client,
+ 1,
+ keyFor(request.uri(), request.proxy(h2client.client())));
+
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
- // start reading
- AsyncConnection asyncConn = (AsyncConnection)connection;
- asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown, this::getReadBuffer);
- connection.connect();
- checkSSLConfig();
// safe to resume async reading now.
- asyncConn.enableCallback();
+ connectFlows(connection);
sendConnectionPreface();
}
+ private void connectFlows(HttpConnection connection) {
+ FlowTube tube = connection.getConnectionFlow();
+ // Connect the flow to our Http2TubeSubscriber:
+ // Using connection.publisher() here is a hack that
+ // allows us to continue calling connection.writeAsync()
+ // and connection.flushAsync() transparently.
+ // We will eventually need to implement our own publisher
+ // to write to the flow instead.
+ tube.connectFlows(connection.publisher(), // hack
+ subscriber);
+ }
+
+ final HttpClientImpl client() {
+ return client2.client();
+ }
+
/**
* Throws an IOException if h2 was not negotiated
*/
- private void checkSSLConfig() throws IOException {
- AbstractAsyncSSLConnection aconn = (AbstractAsyncSSLConnection)connection;
- SSLEngine engine = aconn.getEngine();
- String alpn = engine.getApplicationProtocol();
- if (alpn == null || !alpn.equals("h2")) {
- String msg;
- if (alpn == null) {
- Log.logSSL("ALPN not supported");
- msg = "ALPN not supported";
- } else switch (alpn) {
- case "":
- Log.logSSL("No ALPN returned");
- msg = "No ALPN negotiated";
- break;
- case "http/1.1":
- Log.logSSL("HTTP/1.1 ALPN returned");
- msg = "HTTP/1.1 ALPN returned";
- break;
- default:
- Log.logSSL("unknown ALPN returned");
- msg = "Unexpected ALPN: " + alpn;
- throw new IOException(msg);
+ private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
+ assert aconn.isSecure();
+
+ Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
+ CompletableFuture<Void> cf = new MinimalFuture<>();
+ SSLEngine engine = aconn.getEngine();
+ assert Objects.equals(alpn, engine.getApplicationProtocol());
+
+ DEBUG_LOGGER.log(Level.DEBUG, "checkSSLConfig: alpn: %s", alpn );
+
+ if (alpn == null || !alpn.equals("h2")) {
+ String msg;
+ if (alpn == null) {
+ Log.logSSL("ALPN not supported");
+ msg = "ALPN not supported";
+ } else {
+ switch (alpn) {
+ case "":
+ Log.logSSL(msg = "No ALPN negotiated");
+ break;
+ case "http/1.1":
+ Log.logSSL( msg = "HTTP/1.1 ALPN returned");
+ break;
+ default:
+ Log.logSSL(msg = "Unexpected ALPN: " + alpn);
+ cf.completeExceptionally(new IOException(msg));
+ }
+ }
+ cf.completeExceptionally(new ALPNException(msg, aconn));
+ return cf;
}
- throw new ALPNException(msg, aconn);
- }
+ cf.complete(null);
+ return cf;
+ };
+
+ return aconn.getALPN().thenCompose(checkAlpnCF);
}
static String keyFor(HttpConnection connection) {
@@ -322,7 +401,7 @@
String host;
int port;
- if (isProxy) {
+ if (proxy != null) {
host = proxy.getHostString();
port = proxy.getPort();
} else {
@@ -381,7 +460,11 @@
return words.stream().collect(Collectors.joining(" "));
}
- private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
+ private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
+ throws IOException
+ {
+ debugHpack.log(Level.DEBUG, "decodeHeaders(%s)", decoder);
+
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
ByteBufferReference[] buffers = frame.getHeaderBlock();
@@ -390,7 +473,7 @@
}
}
- int getInitialSendWindowSize() {
+ final int getInitialSendWindowSize() {
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
}
@@ -400,7 +483,7 @@
sendFrame(f);
}
- private ByteBufferPool readBufferPool = new ByteBufferPool();
+ private final ByteBufferPool readBufferPool = new ByteBufferPool();
// provides buffer to read data (default size)
public ByteBufferReference getReadBuffer() {
@@ -409,7 +492,8 @@
private final Object readlock = new Object();
- public void asyncReceive(ByteBufferReference buffer) {
+ long count;
+ public final void asyncReceive(ByteBufferReference buffer) {
// We don't need to read anything and
// we don't want to send anything back to the server
// until the connection preface has been sent.
@@ -419,11 +503,45 @@
// SettingsFrame sent by the server) before the connection
// preface is fully sent might result in the server
// sending a GOAWAY frame with 'invalid_preface'.
+ //
+ // Note: asyncReceive is only called from the Http2TubeSubscriber
+ // sequential scheduler. Only asyncReceive uses the readLock.
+ // Therefore synchronizing on the readlock here should be
+ // safe.
+ //
synchronized (readlock) {
try {
+ Supplier<ByteBuffer> bs = initial;
+ // ensure that we always handle the initial buffer first,
+ // if any.
+ if (bs != null) {
+ initial = null;
+ ByteBuffer b = bs.get();
+ if (b.hasRemaining()) {
+ long c = ++count;
+ debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
+ + c +"): " + b.remaining());
+ framesController.processReceivedData(framesDecoder,
+ ByteBufferReference.of(b));
+ }
+ }
+ ByteBuffer b = buffer.get();
// the readlock ensures that the order of incoming buffers
// is preserved.
- framesController.processReceivedData(framesDecoder, buffer);
+ if (b == EMPTY_TRIGGER) {
+ debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
+ boolean prefaceSent = framesController.prefaceSent;
+ assert prefaceSent;
+ // call framesController.processReceivedData to potentially
+ // trigger the processing of all the data buffered there.
+ framesController.processReceivedData(framesDecoder, buffer);
+ debug.log(Level.DEBUG, "H2 processed buffered data");
+ } else {
+ long c = ++count;
+ debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
+ framesController.processReceivedData(framesDecoder, buffer);
+ debug.log(Level.DEBUG, "H2 processed(%d)", c);
+ }
} catch (Throwable e) {
String msg = Utils.stackTrace(e);
Log.logTrace(msg);
@@ -432,10 +550,20 @@
}
}
+ Throwable getRecordedCause() {
+ return cause;
+ }
void shutdown(Throwable t) {
+ debug.log(Level.DEBUG, () -> "Shutting down h2c: " + t);
+ if (closed == true) return;
+ synchronized (this) {
+ if (closed == true) return;
+ closed = true;
+ }
Log.logError(t);
- closed = true;
+ Throwable initialCause = this.cause;
+ if (initialCause == null) this.cause = t;
client2.deleteConnection(this);
List<Stream<?>> c = new LinkedList<>(streams.values());
for (Stream<?> s : c) {
@@ -457,8 +585,11 @@
if (frame instanceof MalformedFrame) {
Log.logError(((MalformedFrame) frame).getMessage());
if (streamid == 0) {
- protocolError(((MalformedFrame) frame).getErrorCode());
+ protocolError(((MalformedFrame) frame).getErrorCode(),
+ ((MalformedFrame) frame).getMessage());
} else {
+ debug.log(Level.DEBUG, () -> "Reset stream: "
+ + ((MalformedFrame) frame).getMessage());
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
}
return;
@@ -476,6 +607,13 @@
if (stream == null) {
// Should never receive a frame with unknown stream id
+ if (frame instanceof HeaderFrame) {
+ // always decode the headers as they may affect
+ // connection-level HPACK decoding state
+ HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
+ decodeHeaders((HeaderFrame) frame, decoder);
+ }
+
// To avoid looping, an endpoint MUST NOT send a RST_STREAM in
// response to a RST_STREAM frame.
if (!(frame instanceof ResetFrame)) {
@@ -499,6 +637,11 @@
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
throws IOException
{
+ // always decode the headers as they may affect connection-level HPACK
+ // decoding state
+ HeaderDecoder decoder = new LoggingHeaderDecoder(new HeaderDecoder());
+ decodeHeaders(pp, decoder);
+
HttpRequestImpl parentReq = parent.request;
int promisedStreamid = pp.getPromisedStream();
if (promisedStreamid != nextPushStream) {
@@ -507,8 +650,7 @@
} else {
nextPushStream += 2;
}
- HeaderDecoder decoder = new HeaderDecoder();
- decodeHeaders(pp, decoder);
+
HttpHeadersImpl headers = decoder.headers();
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
@@ -549,7 +691,15 @@
}
void closeStream(int streamid) {
+ debug.log(Level.DEBUG, "Closed stream %d", streamid);
Stream<?> s = streams.remove(streamid);
+ if (s != null) {
+ // decrement the reference count on the HttpClientImpl
+ // to allow the SelectorManager thread to exit if no
+ // other operation is pending and the facade is no
+ // longer referenced.
+ client().unreference();
+ }
// ## Remove s != null. It is a hack for delayed cancellation,reset
if (s != null && !(s instanceof Stream.PushedStream)) {
// Since PushStreams have no request body, then they have no
@@ -579,9 +729,15 @@
private void protocolError(int errorCode)
throws IOException
{
+ protocolError(errorCode, null);
+ }
+
+ private void protocolError(int errorCode, String msg)
+ throws IOException
+ {
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
sendFrame(frame);
- shutdown(new IOException("protocol error"));
+ shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
}
private void handleSettings(SettingsFrame frame)
@@ -655,7 +811,8 @@
ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
- connection.write(ref.get());
+ connection.writeAsync(new ByteBufferReference[] {ref});
+ connection.flushAsync();
// mark preface sent.
framesController.markPrefaceSent();
Log.logTrace("PREFACE_BYTES sent");
@@ -669,6 +826,9 @@
// cause any pending data stored before the preface was sent to be
// flushed (see PrefaceController).
Log.logTrace("finished sending connection preface");
+ debug.log(Level.DEBUG, "Triggering processing of buffered data"
+ + " after sending connection preface");
+ subscriber.onNext(List.of(EMPTY_TRIGGER));
}
/**
@@ -682,22 +842,32 @@
/**
* Creates Stream with given id.
*/
- <T> Stream<T> createStream(Exchange<T> exchange) {
- Stream<T> stream = new Stream<>(client, this, exchange, windowController);
+ final <T> Stream<T> createStream(Exchange<T> exchange) {
+ Stream<T> stream = new Stream<>(client(), this, exchange, windowController);
return stream;
}
<T> Stream.PushedStream<?,T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
PushGroup<?,T> pg = parent.exchange.getPushGroup();
- return new Stream.PushedStream<>(pg, client, this, parent, pushEx);
+ return new Stream.PushedStream<>(pg, client(), this, parent, pushEx);
}
<T> void putStream(Stream<T> stream, int streamid) {
+ // increment the reference count on the HttpClientImpl
+ // to prevent the SelectorManager thread from exiting until
+ // the stream is closed.
+ client().reference();
streams.put(streamid, stream);
}
void deleteStream(int streamid) {
- streams.remove(streamid);
+ if (streams.remove(streamid) != null) {
+ // decrement the reference count on the HttpClientImpl
+ // to allow the SelectorManager thread to exit if no
+ // other operation is pending and the facade is no
+ // longer referenced.
+ client().unreference();
+ }
windowController.removeStream(streamid);
}
@@ -728,7 +898,7 @@
// There can be no concurrent access to this buffer as all access to this buffer
// and its content happen within a single critical code block section protected
// by the sendLock. / (see sendFrame())
- private ByteBufferPool headerEncodingPool = new ByteBufferPool();
+ private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
private ByteBufferReference getHeaderBuffer(int maxFrameSize) {
ByteBufferReference ref = headerEncodingPool.get(maxFrameSize);
@@ -872,6 +1042,208 @@
}
}
+ /**
+ * Returns the TubeSubscriber for reading from the connection flow.
+ * @return the TubeSubscriber for reading from the connection flow.
+ */
+ TubeSubscriber subscriber() {
+ return subscriber;
+ }
+
+ /**
+ * A simple tube subscriber for reading from the connection flow.
+ */
+ final class Http2TubeSubscriber implements TubeSubscriber {
+ volatile Flow.Subscription subscription;
+ volatile boolean completed;
+ volatile boolean dropped;
+ volatile Throwable error;
+ final ConcurrentLinkedQueue<ByteBuffer> queue
+ = new ConcurrentLinkedQueue<>();
+ final SequentialScheduler scheduler = new SequentialScheduler(
+ new SynchronizedRestartableTask(this::processQueue));
+
+ final void processQueue() {
+ try {
+ while (!queue.isEmpty() && !scheduler.isStopped()) {
+ ByteBuffer buffer = queue.poll();
+ debug.log(Level.DEBUG,
+ "sending %d to Http2Connection.asyncReceive",
+ buffer.remaining());
+ asyncReceive(ByteBufferReference.of(buffer));
+ }
+ } catch (Throwable t) {
+ Throwable x = error;
+ if (x == null) error = t;
+ } finally {
+ Throwable x = error;
+ if (x != null) {
+ debug.log(Level.DEBUG, "Stopping scheduler", x);
+ scheduler.stop();
+ Http2Connection.this.shutdown(x);
+ }
+ }
+ }
+
+
+ public void onSubscribe(Flow.Subscription subscription) {
+ // supports being called multiple time.
+ // doesn't cancel the previous subscription, since that is
+ // most probably the same as the new subscription.
+ assert this.subscription == null || dropped == false;
+ this.subscription = subscription;
+ dropped = false;
+ // TODO FIXME: request(1) should be done by the delegate.
+ if (!completed) {
+ debug.log(Level.DEBUG, "onSubscribe: requesting Long.MAX_VALUE for reading");
+ subscription.request(Long.MAX_VALUE);
+ } else {
+ debug.log(Level.DEBUG, "onSubscribe: already completed");
+ }
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ debug.log(Level.DEBUG, () -> "onNext: got " + Utils.remaining(item)
+ + " bytes in " + item.size() + " buffers");
+ queue.addAll(item);
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ debug.log(Level.DEBUG, () -> "onError: " + throwable);
+ error = throwable;
+ completed = true;
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ @Override
+ public void onComplete() {
+ debug.log(Level.DEBUG, "EOF");
+ error = new EOFException("EOF reached while reading");
+ completed = true;
+ scheduler.deferOrSchedule(client().theExecutor());
+ }
+
+ public void dropSubscription() {
+ debug.log(Level.DEBUG, "dropSubscription");
+ // we could probably set subscription to null here...
+ // then we might not need the 'dropped' boolean?
+ dropped = true;
+ }
+ }
+
+ @Override
+ public final String toString() {
+ return dbgString();
+ }
+
+ final String dbgString() {
+ return "Http2Connection("
+ + connection.getConnectionFlow() + ")";
+ }
+
+ final class LoggingHeaderDecoder extends HeaderDecoder {
+
+ private final HeaderDecoder delegate;
+ private final System.Logger debugHpack =
+ Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
+
+ LoggingHeaderDecoder(HeaderDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ String dbgString() {
+ return Http2Connection.this.dbgString() + "/LoggingHeaderDecoder";
+ }
+
+ @Override
+ public void onDecoded(CharSequence name, CharSequence value) {
+ delegate.onDecoded(name, value);
+ }
+
+ @Override
+ public void onIndexed(int index,
+ CharSequence name,
+ CharSequence value) {
+ debugHpack.log(Level.DEBUG, "onIndexed(%s, %s, %s)%n",
+ index, name, value);
+ delegate.onIndexed(index, name, value);
+ }
+
+ @Override
+ public void onLiteral(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteral(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteral(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteral(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteral(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralNeverIndexed(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteralNeverIndexed(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralNeverIndexed(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralNeverIndexed(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteralNeverIndexed(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralWithIndexing(int index,
+ CharSequence name,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+ index, name, value, valueHuffman);
+ delegate.onLiteralWithIndexing(index, name, value, valueHuffman);
+ }
+
+ @Override
+ public void onLiteralWithIndexing(CharSequence name,
+ boolean nameHuffman,
+ CharSequence value,
+ boolean valueHuffman) {
+ debugHpack.log(Level.DEBUG, "onLiteralWithIndexing(%s, %s, %s, %s)%n",
+ name, nameHuffman, value, valueHuffman);
+ delegate.onLiteralWithIndexing(name, nameHuffman, value, valueHuffman);
+ }
+
+ @Override
+ public void onSizeUpdate(int capacity) {
+ debugHpack.log(Level.DEBUG, "onSizeUpdate(%s)%n", capacity);
+ delegate.onSizeUpdate(capacity);
+ }
+
+ @Override
+ HttpHeadersImpl headers() {
+ return delegate.headers();
+ }
+ }
+
static class HeaderDecoder implements DecodingCallback {
HttpHeadersImpl headers;