--- a/jdk/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/Http2Connection.java Sat Apr 30 00:30:31 2016 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -24,42 +24,767 @@
package java.net.http;
import java.io.IOException;
-import java.net.Authenticator;
-import java.net.CookieManager;
-import java.net.ProxySelector;
+import java.net.InetSocketAddress;
import java.net.URI;
-import static java.net.http.Utils.BUFSIZE;
+import java.net.http.HttpConnection.Mode;
import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import static java.nio.channels.SelectionKey.OP_CONNECT;
-import static java.nio.channels.SelectionKey.OP_READ;
-import static java.nio.channels.SelectionKey.OP_WRITE;
-import java.nio.channels.Selector;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.security.NoSuchAlgorithmException;
-import java.util.ListIterator;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import sun.net.httpclient.hpack.Encoder;
+import sun.net.httpclient.hpack.Decoder;
+import static java.net.http.SettingsFrame.*;
+import static java.net.http.Utils.BUFSIZE;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Formatter;
+import java.util.stream.Collectors;
+import sun.net.httpclient.hpack.DecodingCallback;
+
+/**
+ * An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
+ * over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
+ *
+ * Http2Connections belong to a Http2ClientImpl, (one of) which belongs
+ * to a HttpClientImpl.
+ *
+ * Creation cases:
+ * 1) upgraded HTTP/1.1 plain tcp connection
+ * 2) prior knowledge directly created plain tcp connection
+ * 3) directly created HTTP/2 SSL connection which uses ALPN.
+ *
+ * Sending is done by writing directly to underlying HttpConnection object which
+ * is operating in async mode. No flow control applies on output at this level
+ * and all writes are just executed as puts to an output Q belonging to HttpConnection
+ * Flow control is implemented by HTTP/2 protocol itself.
+ *
+ * Hpack header compression
+ * and outgoing stream creation is also done here, because these operations
+ * must be synchronized at the socket level. Stream objects send frames simply
+ * by placing them on the connection's output Queue. sendFrame() is called
+ * from a higher level (Stream) thread.
+ *
+ * asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
+ * incoming Http2Frames, and directs them to the appropriate Stream.incoming()
+ * or handles them directly itself. This thread performs hpack decompression
+ * and incoming stream creation (Server push). Incoming frames destined for a
+ * stream are provided by calling Stream.incoming().
+ */
+class Http2Connection implements BufferHandler {
+
+ final Queue<Http2Frame> outputQ;
+ volatile boolean closed;
+
+ //-------------------------------------
+ final HttpConnection connection;
+ HttpClientImpl client;
+ final Http2ClientImpl client2;
+ Map<Integer,Stream> streams;
+ int nextstreamid = 3; // stream 1 is registered separately
+ int nextPushStream = 2;
+ Encoder hpackOut;
+ Decoder hpackIn;
+ SettingsFrame clientSettings, serverSettings;
+ ByteBufferConsumer bbc;
+ final LinkedList<ByteBuffer> freeList;
+ final String key; // for HttpClientImpl.connections map
+ FrameReader reader;
+
+ // Connection level flow control windows
+ int sendWindow = INITIAL_WINDOW_SIZE;
+
+ final static int DEFAULT_FRAME_SIZE = 16 * 1024;
+ private static ByteBuffer[] empty = Utils.EMPTY_BB_ARRAY;
+
+ final ExecutorWrapper executor;
+
+ /**
+ * This is established by the protocol spec and the peer will update it with
+ * WINDOW_UPDATEs, which affects the sendWindow.
+ */
+ final static int INITIAL_WINDOW_SIZE = 64 * 1024 - 1;
+
+ // TODO: need list of control frames from other threads
+ // that need to be sent
+
+ /**
+ * Case 1) Create from upgraded HTTP/1.1 connection.
+ * Is ready to use. Will not be SSL. exchange is the Exchange
+ * that initiated the connection, whose response will be delivered
+ * on a Stream.
+ */
+ Http2Connection(HttpConnection connection, Http2ClientImpl client2,
+ Exchange exchange) throws IOException, InterruptedException {
+ this.outputQ = new Queue<>();
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+
+ //this.initialExchange = exchange;
+ assert !(connection instanceof SSLConnection);
+ this.connection = connection;
+ this.client = client2.client();
+ this.client2 = client2;
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.key = keyFor(connection);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ initCommon();
+ //sendConnectionPreface();
+ Stream initialStream = createStream(exchange);
+ initialStream.registerStream(1);
+ initialStream.requestSent();
+ sendConnectionPreface();
+ connection.configureMode(Mode.ASYNC);
+ // start reading and writing
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ asyncReceive(connection.getRemaining());
+ asyncConn.startReading();
+ }
+
+ // async style but completes immediately
+ static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
+ Http2ClientImpl client2, Exchange exchange) {
+ CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+ try {
+ Http2Connection c = new Http2Connection(connection, client2, exchange);
+ cf.complete(c);
+ } catch (IOException | InterruptedException e) {
+ cf.completeExceptionally(e);
+ }
+ return cf;
+ }
+
+ /**
+ * Cases 2) 3)
+ *
+ * request is request to be sent.
+ */
+ Http2Connection(HttpRequestImpl request) throws IOException, InterruptedException {
+ InetSocketAddress proxy = request.proxy();
+ URI uri = request.uri();
+ InetSocketAddress addr = Utils.getAddress(request);
+ String msg = "Connection send window size " + Integer.toString(sendWindow);
+ Log.logTrace(msg);
+ this.key = keyFor(uri, proxy);
+ this.connection = HttpConnection.getConnection(addr, request, this);
+ streams = Collections.synchronizedMap(new HashMap<>());
+ this.client = request.client();
+ this.client2 = client.client2();
+ this.executor = client.executorWrapper();
+ this.freeList = new LinkedList<>();
+ this.outputQ = new Queue<>();
+ nextstreamid = 1;
+ initCommon();
+ connection.connect();
+ connection.configureMode(Mode.ASYNC);
+ // start reading
+ AsyncConnection asyncConn = (AsyncConnection)connection;
+ asyncConn.setAsyncCallbacks(this::asyncReceive, this::shutdown);
+ sendConnectionPreface();
+ asyncConn.startReading();
+ }
+
+ // NEW
+ synchronized void obtainSendWindow(int amount) throws InterruptedException {
+ while (amount > 0) {
+ int n = Math.min(amount, sendWindow);
+ sendWindow -= n;
+ amount -= n;
+ if (amount > 0)
+ wait();
+ }
+ }
+
+ synchronized void updateSendWindow(int amount) {
+ if (sendWindow == 0) {
+ sendWindow += amount;
+ notifyAll();
+ } else
+ sendWindow += amount;
+ }
+
+ synchronized int sendWindow() {
+ return sendWindow;
+ }
+
+ static String keyFor(HttpConnection connection) {
+ boolean isProxy = connection.isProxied();
+ boolean isSecure = connection.isSecure();
+ InetSocketAddress addr = connection.address();
+
+ return keyString(isSecure, isProxy, addr.getHostString(), addr.getPort());
+ }
+
+ static String keyFor(URI uri, InetSocketAddress proxy) {
+ boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
+ boolean isProxy = proxy != null;
+
+ String host;
+ int port;
+
+ if (isProxy) {
+ host = proxy.getHostString();
+ port = proxy.getPort();
+ } else {
+ host = uri.getHost();
+ port = uri.getPort();
+ }
+ return keyString(isSecure, isProxy, host, port);
+ }
+
+ // {C,S}:{H:P}:host:port
+ // C indicates clear text connection "http"
+ // S indicates secure "https"
+ // H indicates host (direct) connection
+ // P indicates proxy
+ // Eg: "S:H:foo.com:80"
+ static String keyString(boolean secure, boolean proxy, String host, int port) {
+ char c1 = secure ? 'S' : 'C';
+ char c2 = proxy ? 'P' : 'H';
+
+ StringBuilder sb = new StringBuilder(128);
+ sb.append(c1).append(':').append(c2).append(':')
+ .append(host).append(':').append(port);
+ return sb.toString();
+ }
+
+ String key() {
+ return this.key;
+ }
+
+ void putConnection() {
+ client2.putConnection(this);
+ }
+
+ private static String toHexdump1(ByteBuffer bb) {
+ bb.mark();
+ StringBuilder sb = new StringBuilder(512);
+ Formatter f = new Formatter(sb);
+
+ while (bb.hasRemaining()) {
+ int i = Byte.toUnsignedInt(bb.get());
+ f.format("%02x:", i);
+ }
+ sb.deleteCharAt(sb.length()-1);
+ bb.reset();
+ return sb.toString();
+ }
+
+ private static String toHexdump(ByteBuffer bb) {
+ List<String> words = new ArrayList<>();
+ int i = 0;
+ bb.mark();
+ while (bb.hasRemaining()) {
+ if (i % 2 == 0) {
+ words.add("");
+ }
+ byte b = bb.get();
+ String hex = Integer.toHexString(256 + Byte.toUnsignedInt(b)).substring(1);
+ words.set(i / 2, words.get(i / 2) + hex);
+ i++;
+ }
+ bb.reset();
+ return words.stream().collect(Collectors.joining(" "));
+ }
+
+ private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder) {
+ boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
+
+ ByteBuffer[] buffers = frame.getHeaderBlock();
+ for (int i = 0; i < buffers.length; i++) {
+ hpackIn.decode(buffers[i], endOfHeaders, decoder);
+ }
+ }
+
+ int getInitialSendWindowSize() {
+ return serverSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE);
+ }
+
+ void close() {
+ GoAwayFrame f = new GoAwayFrame();
+ f.setDebugData("Requested by user".getBytes());
+ // TODO: set last stream. For now zero ok.
+ sendFrame(f);
+ }
+
+ // BufferHandler methods
+
+ @Override
+ public ByteBuffer getBuffer(int n) {
+ return client.getBuffer(n);
+ }
+
+ @Override
+ public void returnBuffer(ByteBuffer buf) {
+ client.returnBuffer(buf);
+ }
-class Http2Connection {
- static CompletableFuture<Http2Connection> createAsync(
- HttpConnection connection, Http2ClientImpl client2, Exchange exchange) {
- return null;
+ @Override
+ public void setMinBufferSize(int n) {
+ client.setMinBufferSize(n);
+ }
+
+ private final Object readlock = new Object();
+
+ void asyncReceive(ByteBuffer buffer) {
+ synchronized (readlock) {
+ try {
+ if (reader == null) {
+ reader = new FrameReader(buffer);
+ } else {
+ reader.input(buffer);
+ }
+ while (true) {
+ if (reader.haveFrame()) {
+ List<ByteBuffer> buffers = reader.frame();
+
+ ByteBufferConsumer bbc = new ByteBufferConsumer(buffers, this::getBuffer);
+ processFrame(bbc);
+ if (bbc.consumed()) {
+ reader = new FrameReader();
+ return;
+ } else {
+ reader = new FrameReader(reader);
+ }
+ } else
+ return;
+ }
+ } catch (Throwable e) {
+ String msg = Utils.stackTrace(e);
+ Log.logTrace(msg);
+ shutdown(e);
+ }
+ }
+ }
+
+ void shutdown(Throwable t) {
+ System.err.println("Shutdown: " + t);
+ t.printStackTrace();
+ closed = true;
+ client2.deleteConnection(this);
+ Collection<Stream> c = streams.values();
+ for (Stream s : c) {
+ s.cancelImpl(t);
+ }
+ connection.close();
+ }
+
+ /**
+ * Handles stream 0 (common) frames that apply to whole connection and passes
+ * other stream specific frames to that Stream object.
+ *
+ * Invokes Stream.incoming() which is expected to process frame without
+ * blocking.
+ */
+ void processFrame(ByteBufferConsumer bbc) throws IOException, InterruptedException {
+ Http2Frame frame = Http2Frame.readIncoming(bbc);
+ Log.logFrames(frame, "IN");
+ int streamid = frame.streamid();
+ if (streamid == 0) {
+ handleCommonFrame(frame);
+ } else {
+ Stream stream = getStream(streamid);
+ if (stream == null) {
+ // should never receive a frame with unknown stream id
+ resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
+ }
+ if (frame instanceof PushPromiseFrame) {
+ PushPromiseFrame pp = (PushPromiseFrame)frame;
+ handlePushPromise(stream, pp);
+ } else if (frame instanceof HeaderFrame) {
+ // decode headers (or continuation)
+ decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
+ stream.incoming(frame);
+ } else
+ stream.incoming(frame);
+ }
+ }
+
+ private void handlePushPromise(Stream parent, PushPromiseFrame pp)
+ throws IOException, InterruptedException {
+
+ HttpRequestImpl parentReq = parent.request;
+ int promisedStreamid = pp.getPromisedStream();
+ if (promisedStreamid != nextPushStream) {
+ resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
+ return;
+ } else {
+ nextPushStream += 2;
+ }
+ HeaderDecoder decoder = new HeaderDecoder();
+ decodeHeaders(pp, decoder);
+ HttpHeadersImpl headers = decoder.headers();
+ HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
+
+ Stream.PushedStream pushStream = createPushStream(parent, pushReq);
+ pushStream.registerStream(promisedStreamid);
+ parent.incoming_pushPromise(pushReq, pushStream);
+ }
+
+ private void handleCommonFrame(Http2Frame frame)
+ throws IOException, InterruptedException {
+
+ switch (frame.type()) {
+ case SettingsFrame.TYPE:
+ { SettingsFrame f = (SettingsFrame)frame;
+ handleSettings(f);}
+ break;
+ case PingFrame.TYPE:
+ { PingFrame f = (PingFrame)frame;
+ handlePing(f);}
+ break;
+ case GoAwayFrame.TYPE:
+ { GoAwayFrame f = (GoAwayFrame)frame;
+ handleGoAway(f);}
+ break;
+ case WindowUpdateFrame.TYPE:
+ { WindowUpdateFrame f = (WindowUpdateFrame)frame;
+ handleWindowUpdate(f);}
+ break;
+ default:
+ protocolError(ErrorFrame.PROTOCOL_ERROR);
+ }
+ }
+
+ void resetStream(int streamid, int code) throws IOException, InterruptedException {
+ Log.logError(
+ "Resetting stream {0,number,integer} with error code {1,number,integer}",
+ streamid, code);
+ ResetFrame frame = new ResetFrame();
+ frame.streamid(streamid);
+ frame.setErrorCode(code);
+ sendFrame(frame);
+ streams.remove(streamid);
+ }
+
+ private void handleWindowUpdate(WindowUpdateFrame f)
+ throws IOException, InterruptedException {
+ updateSendWindow(f.getUpdate());
+ }
+
+ private void protocolError(int errorCode)
+ throws IOException, InterruptedException {
+ GoAwayFrame frame = new GoAwayFrame();
+ frame.setErrorCode(errorCode);
+ sendFrame(frame);
+ String msg = "Error code: " + errorCode;
+ shutdown(new IOException("protocol error"));
+ }
+
+ private void handleSettings(SettingsFrame frame)
+ throws IOException, InterruptedException {
+ if (frame.getFlag(SettingsFrame.ACK)) {
+ // ignore ack frames for now.
+ return;
+ }
+ serverSettings = frame;
+ SettingsFrame ack = getAckFrame(frame.streamid());
+ sendFrame(ack);
+ }
+
+ private void handlePing(PingFrame frame)
+ throws IOException, InterruptedException {
+ frame.setFlag(PingFrame.ACK);
+ sendFrame(frame);
+ }
+
+ private void handleGoAway(GoAwayFrame frame)
+ throws IOException, InterruptedException {
+ //System.err.printf("GoAWAY: %s\n", ErrorFrame.stringForCode(frame.getErrorCode()));
+ shutdown(new IOException("GOAWAY received"));
+ }
+
+ private void initCommon() {
+ clientSettings = client2.getClientSettings();
+
+ // serverSettings will be updated by server
+ serverSettings = SettingsFrame.getDefaultSettings();
+ hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
+ hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
+ }
+
+ /**
+ * Max frame size we are allowed to send
+ */
+ public int getMaxSendFrameSize() {
+ int param = serverSettings.getParameter(MAX_FRAME_SIZE);
+ if (param == -1) {
+ param = DEFAULT_FRAME_SIZE;
+ }
+ return param;
+ }
+
+ /**
+ * Max frame size we will receive
+ */
+ public int getMaxReceiveFrameSize() {
+ return clientSettings.getParameter(MAX_FRAME_SIZE);
+ }
+
+ // Not sure how useful this is.
+ public int getMaxHeadersSize() {
+ return serverSettings.getParameter(MAX_HEADER_LIST_SIZE);
+ }
+
+ private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
+
+ private static final byte[] PREFACE_BYTES =
+ CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
+
+ /**
+ * Sends Connection preface and Settings frame with current preferred
+ * values
+ */
+ private void sendConnectionPreface() throws IOException {
+ ByteBufferGenerator bg = new ByteBufferGenerator(this);
+ bg.getBuffer(PREFACE_BYTES.length).put(PREFACE_BYTES);
+ ByteBuffer[] ba = bg.getBufferArray();
+ connection.write(ba, 0, ba.length);
+
+ bg = new ByteBufferGenerator(this);
+ SettingsFrame sf = client2.getClientSettings();
+ Log.logFrames(sf, "OUT");
+ sf.writeOutgoing(bg);
+ WindowUpdateFrame wup = new WindowUpdateFrame();
+ wup.streamid(0);
+ // send a Window update for the receive buffer we are using
+ // minus the initial 64 K specified in protocol
+ wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1));
+ wup.computeLength();
+ wup.writeOutgoing(bg);
+ Log.logFrames(wup, "OUT");
+ ba = bg.getBufferArray();
+ connection.write(ba, 0, ba.length);
+ }
+
+ /**
+ * Returns an existing Stream with given id, or null if doesn't exist
+ */
+ Stream getStream(int streamid) {
+ return streams.get(streamid);
+ }
+
+ /**
+ * Creates Stream with given id.
+ */
+ Stream createStream(Exchange exchange) {
+ Stream stream = new Stream(client, this, exchange);
+ return stream;
+ }
+
+ Stream.PushedStream createPushStream(Stream parent, HttpRequestImpl pushReq) {
+ Stream.PushGroup<?> pg = parent.request.pushGroup();
+ return new Stream.PushedStream(pg, client, this, parent, pushReq);
+ }
+
+ void putStream(Stream stream, int streamid) {
+ streams.put(streamid, stream);
+ }
+
+ void deleteStream(Stream stream) {
+ streams.remove(stream.streamid);
+ }
+
+ static final int MAX_STREAM = Integer.MAX_VALUE - 2;
+
+ // Number of header bytes in a Headers Frame
+ final static int HEADERS_HEADER_SIZE = 15;
+
+ // Number of header bytes in a Continuation frame
+ final static int CONTIN_HEADER_SIZE = 9;
+
+ /**
+ * Encode the headers into a List<ByteBuffer> and then create HEADERS
+ * and CONTINUATION frames from the list and return the List<Http2Frame>.
+ *
+ * @param frame
+ * @return
+ */
+ private LinkedList<Http2Frame> encodeHeaders(OutgoingHeaders frame) {
+ LinkedList<ByteBuffer> buffers = new LinkedList<>();
+ ByteBuffer buf = getBuffer();
+ buffers.add(buf);
+ encodeHeadersImpl(frame.stream.getRequestPseudoHeaders(), buffers);
+ encodeHeadersImpl(frame.getUserHeaders(), buffers);
+ encodeHeadersImpl(frame.getSystemHeaders(), buffers);
+
+ for (ByteBuffer b : buffers) {
+ b.flip();
}
- Http2Connection(HttpConnection connection, Http2ClientImpl client2,
- Exchange exchange) throws IOException, InterruptedException {
+ LinkedList<Http2Frame> frames = new LinkedList<>();
+ int maxframesize = getMaxSendFrameSize();
+
+ HeadersFrame oframe = new HeadersFrame();
+ oframe.setFlags(frame.getFlags());
+ oframe.streamid(frame.streamid());
+
+ oframe.setHeaderBlock(getBufferArray(buffers, maxframesize));
+ frames.add(oframe);
+ // Any buffers left?
+ boolean done = buffers.isEmpty();
+ if (done) {
+ oframe.setFlag(HeaderFrame.END_HEADERS);
+ } else {
+ ContinuationFrame cf = null;
+ while (!done) {
+ cf = new ContinuationFrame();
+ cf.streamid(frame.streamid());
+ cf.setHeaderBlock(getBufferArray(buffers, maxframesize));
+ frames.add(cf);
+ done = buffers.isEmpty();
+ }
+ cf.setFlag(HeaderFrame.END_HEADERS);
+ }
+ return frames;
+ }
+
+ // should always return at least one buffer
+ private static ByteBuffer[] getBufferArray(LinkedList<ByteBuffer> list, int maxsize) {
+ assert maxsize >= BUFSIZE;
+ LinkedList<ByteBuffer> newlist = new LinkedList<>();
+ int size = list.size();
+ int nbytes = 0;
+ for (int i=0; i<size; i++) {
+ ByteBuffer buf = list.getFirst();
+ if (nbytes + buf.remaining() <= maxsize) {
+ nbytes += buf.remaining();
+ newlist.add(buf);
+ list.remove();
+ } else {
+ break;
+ }
+ }
+ return newlist.toArray(empty);
+ }
+
+ /**
+ * Encode all the headers from the given HttpHeadersImpl into the given List.
+ */
+ private void encodeHeadersImpl(HttpHeaders hdrs, LinkedList<ByteBuffer> buffers) {
+ ByteBuffer buffer;
+ if (!(buffer = buffers.getLast()).hasRemaining()) {
+ buffer = getBuffer();
+ buffers.add(buffer);
+ }
+ for (Map.Entry<String, List<String>> e : hdrs.map().entrySet()) {
+ String key = e.getKey();
+ String lkey = key.toLowerCase();
+ List<String> values = e.getValue();
+ for (String value : values) {
+ hpackOut.header(lkey, value);
+ boolean encoded = false;
+ do {
+ encoded = hpackOut.encode(buffer);
+ if (!encoded) {
+ buffer = getBuffer();
+ buffers.add(buffer);
+ }
+ } while (!encoded);
+ }
+ }
+ }
+
+ public void sendFrames(List<Http2Frame> frames) throws IOException, InterruptedException {
+ for (Http2Frame frame : frames) {
+ sendFrame(frame);
+ }
}
- Stream getStream(int i) {return null;}
- Stream createStream(Exchange ex) {return null;}
- void putConnection() {}
+ static Throwable getExceptionFrom(CompletableFuture<?> cf) {
+ try {
+ cf.get();
+ return null;
+ } catch (Throwable e) {
+ if (e.getCause() != null)
+ return e.getCause();
+ else
+ return e;
+ }
+ }
+
+
+ void execute(Runnable r) {
+ executor.execute(r, null);
+ }
+
+ private final Object sendlock = new Object();
+
+ /**
+ *
+ */
+ void sendFrame(Http2Frame frame) {
+ synchronized (sendlock) {
+ try {
+ if (frame instanceof OutgoingHeaders) {
+ OutgoingHeaders oh = (OutgoingHeaders) frame;
+ Stream stream = oh.getStream();
+ stream.registerStream(nextstreamid);
+ oh.streamid(nextstreamid);
+ nextstreamid += 2;
+ // set outgoing window here. This allows thread sending
+ // body to proceed.
+ stream.updateOutgoingWindow(getInitialSendWindowSize());
+ LinkedList<Http2Frame> frames = encodeHeaders(oh);
+ for (Http2Frame f : frames) {
+ sendOneFrame(f);
+ }
+ } else {
+ sendOneFrame(frame);
+ }
+
+ } catch (IOException e) {
+ if (!closed) {
+ Log.logError(e);
+ shutdown(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Send a frame.
+ *
+ * @param frame
+ * @throws IOException
+ */
+ private void sendOneFrame(Http2Frame frame) throws IOException {
+ ByteBufferGenerator bbg = new ByteBufferGenerator(this);
+ frame.computeLength();
+ Log.logFrames(frame, "OUT");
+ frame.writeOutgoing(bbg);
+ ByteBuffer[] currentBufs = bbg.getBufferArray();
+ connection.write(currentBufs, 0, currentBufs.length);
+ }
+
+
+ private SettingsFrame getAckFrame(int streamid) {
+ SettingsFrame frame = new SettingsFrame();
+ frame.setFlag(SettingsFrame.ACK);
+ frame.streamid(streamid);
+ return frame;
+ }
+
+ static class HeaderDecoder implements DecodingCallback {
+ HttpHeadersImpl headers;
+
+ HeaderDecoder() {
+ this.headers = new HttpHeadersImpl();
+ }
+
+ @Override
+ public void onDecoded(CharSequence name, CharSequence value) {
+ headers.addHeader(name.toString(), value.toString());
+ }
+
+ HttpHeadersImpl headers() {
+ return headers;
+ }
+ }
}