# HG changeset patch # User michaelm # Date 1511349696 0 # Node ID 32f6aefec11e34a2950ae3b3601a8b7a9b24cd82 # Parent 0bd10b7df2d2d3b10bda64d54c18e0e22a04083d http-client-branch: HttpRequest/HttpResponse api change: remove link between requests, add links between responses. Fixed some redirection problems diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Wed Nov 22 11:21:36 2017 +0000 @@ -124,6 +124,16 @@ } /** + * Called after a redirect or similar kind of retry where a body might + * be sent but we don't want it. Should send a RESET in h2. For http/1.1 + * we can consume small quantity of data, or close the connection in + * other cases. + */ + public CompletableFuture ignoreBody() { + return exchImpl.ignoreBody(); + } + + /** * Called when a new exchange is created to replace this exchange. * At this point it is guaranteed that readBody/readBodyAsync will * not be called. diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Wed Nov 22 11:21:36 2017 +0000 @@ -154,6 +154,11 @@ boolean returnConnectionToPool, Executor executor); + /** + * Ignore/consume the body. + */ + abstract CompletableFuture ignoreBody(); + /** Gets the response headers. Completes before body is read. */ abstract CompletableFuture getResponseAsync(Executor executor); diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Wed Nov 22 11:21:36 2017 +0000 @@ -325,6 +325,11 @@ return bodyCF; } + @Override + CompletableFuture ignoreBody() { + return response.ignoreBody(executor); + } + ByteBuffer drainLeftOverBytes() { synchronized (lock) { asyncReceiver.stop(); diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Wed Nov 22 11:21:36 2017 +0000 @@ -57,6 +57,8 @@ private final BodyReader bodyReader; // used to read the body private final Http1AsyncReceiver asyncReceiver; private volatile EOFException eof; + // max number of bytes of (fixed length) body to ignore on redirect + private final static int MAX_IGNORE = 1024; // Revisit: can we get rid of this? static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE} @@ -138,12 +140,25 @@ return clen; } - public CompletableFuture readBody(HttpResponse.BodySubscriber p, + /** + * Read up to MAX_IGNORE bytes discarding + */ + public CompletableFuture ignoreBody(Executor executor) { + int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1); + if (clen == -1 || clen > MAX_IGNORE) { + connection.close(); + return MinimalFuture.completedFuture(null); // not treating as error + } else { + return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor); + } + } + + public CompletableFuture readBody(HttpResponse.BodySubscriber p, boolean return2Cache, Executor executor) { this.return2Cache = return2Cache; - final HttpResponse.BodySubscriber pusher = p; - final CompletableFuture cf = p.getBody().toCompletableFuture(); + final HttpResponse.BodySubscriber pusher = p; + final CompletableFuture cf = p.getBody().toCompletableFuture(); int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Wed Nov 22 11:21:36 2017 +0000 @@ -628,9 +628,9 @@ decodeHeaders((HeaderFrame) frame, decoder); } - // To avoid looping, an endpoint MUST NOT send a RST_STREAM in - // response to a RST_STREAM frame. - if (!(frame instanceof ResetFrame)) { + int sid = frame.streamid(); + if (sid >= nextstreamid && !(frame instanceof ResetFrame)) { + // otherwise the stream has already been reset/closed resetStream(streamid, ResetFrame.PROTOCOL_ERROR); } return; diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 11:21:36 2017 +0000 @@ -100,19 +100,26 @@ public abstract int statusCode(); /** - * Returns the initial {@link HttpRequest} that initiated the exchange. + * Returns the {@link HttpRequest} corresponding to this response. + *

+ * This may not be the original request provided by the caller, + * for example, if that request was redirected. + * + * @see #previousResponse() * * @return the request */ public abstract HttpRequest request(); /** - * Returns the final {@link HttpRequest} that was sent on the wire for the - * exchange ( may, or may not, be the same as the initial request ). + * Returns an {@code Optional} containing the previous intermediate response if + * one was received. An intermediate response is one that is received + * as a result of redirection or authentication. If no previous response + * was received then an empty {@code Optional} is returned. * - * @return the request + * @return an Optional containing the HttpResponse, if any. */ - public abstract HttpRequest finalRequest(); + public abstract Optional> previousResponse(); /** * Returns the received response headers. @@ -126,6 +133,9 @@ * may represent the body after it was read (such as {@code byte[]}, or * {@code String}, or {@code Path}) or it may represent an object with * which the body is read, such as an {@link java.io.InputStream}. + *

+ * If this {@code HttpResponse} was returned from an invocation of + * {@link #previousResponse()} then this method returns {@code null} * * @return the body */ diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java Wed Nov 22 11:21:36 2017 +0000 @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import javax.net.ssl.SSLParameters; @@ -41,7 +42,7 @@ final int responseCode; final Exchange exchange; final HttpRequest initialRequest; - final HttpRequestImpl finalRequest; + final Optional> previousResponse; final HttpHeaders headers; final SSLParameters sslParameters; final URI uri; @@ -53,16 +54,17 @@ public HttpResponseImpl(HttpRequest initialRequest, Response response, + HttpResponse previousResponse, T body, Exchange exch) { this.responseCode = response.statusCode(); this.exchange = exch; this.initialRequest = initialRequest; - this.finalRequest = exchange.request(); + this.previousResponse = Optional.ofNullable(previousResponse); this.headers = response.headers(); //this.trailers = trailers; this.sslParameters = exch.client().sslParameters(); - this.uri = finalRequest.uri(); + this.uri = response.request().uri(); this.version = response.version(); this.connection = exch.exchImpl.connection(); this.stream = null; @@ -105,8 +107,8 @@ } @Override - public HttpRequest finalRequest() { - return finalRequest; + public Optional> previousResponse() { + return previousResponse; } @Override diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Wed Nov 22 11:21:36 2017 +0000 @@ -35,7 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.UnaryOperator; +import java.util.function.Function; import jdk.incubator.http.HttpResponse.UntrustedBodyHandler; import jdk.incubator.http.internal.common.Log; import jdk.incubator.http.internal.common.MinimalFuture; @@ -71,6 +71,7 @@ Exchange previous; volatile Throwable retryCause; volatile boolean expiredOnce; + volatile HttpResponse response = null; // Maximum number of times a request will be retried/redirected // for any reason @@ -224,11 +225,11 @@ .thenCompose((Response r) -> { Exchange exch = getExchange(); return exch.readBodyAsync(responseHandler) - .thenApply((T body) -> - new HttpResponseImpl<>(userRequest, - r, - body, - exch)); + .thenApply((T body) -> { + this.response = + new HttpResponseImpl<>(userRequest, r, this.response, body, exch); + return this.response; + }); }); } @@ -280,11 +281,15 @@ } return completedFuture(response); } else { - currentreq = newrequest; - expiredOnce = false; - setExchange(new Exchange<>(currentreq, this, acc)); - //reads body off previous, and then waits for next response - return responseAsyncImpl(); + this.response = + new HttpResponseImpl<>(currentreq, response, this.response, null, exch); + Exchange oldExch = exch; + return exch.ignoreBody().handle((r,t) -> { + currentreq = newrequest; + expiredOnce = false; + setExchange(new Exchange<>(currentreq, this, acc)); + return responseAsyncImpl(); + }).thenCompose(Function.identity()); } }) .handle((response, ex) -> { // 5. handle errors and cancel any timer set @@ -300,7 +305,7 @@ } else { return errorCF; } }) - .thenCompose(UnaryOperator.identity()); + .thenCompose(Function.identity()); } return cf; } diff -r 0bd10b7df2d2 -r 32f6aefec11e src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 22 10:15:53 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 22 11:21:36 2017 +0000 @@ -752,6 +752,20 @@ } } + /** + * Send a RESET frame to tell server to stop sending data on this stream + */ + @Override + public CompletableFuture ignoreBody() { + try { + connection.resetStream(streamid, ResetFrame.STREAM_CLOSED); + return MinimalFuture.completedFuture(null); + } catch (Throwable e) { + Log.logTrace("Error resetting stream {0}", e.toString()); + return MinimalFuture.failedFuture(e); + } + } + DataFrame getDataFrame(ByteBuffer buffer) { int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); // blocks waiting for stream send window, if exhausted @@ -1023,7 +1037,7 @@ responseCF.completeExceptionally(t); } else { HttpResponseImpl resp = - new HttpResponseImpl<>(r.request, r, body, getExchange()); + new HttpResponseImpl<>(r.request, r, null, body, getExchange()); responseCF.complete(resp); } }); diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/ManyRequestsLegacy.java --- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Wed Nov 22 11:21:36 2017 +0000 @@ -51,6 +51,7 @@ import java.net.URI; import java.net.URLConnection; import java.security.NoSuchAlgorithmException; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.net.ssl.SSLContext; @@ -132,7 +133,7 @@ @Override public HttpRequest request() {return request;} @Override - public HttpRequest finalRequest() {return request;} + public Optional> previousResponse() {return Optional.empty();} @Override public HttpHeaders headers() { return error(); } @Override diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/SmokeTest.java --- a/test/jdk/java/net/httpclient/SmokeTest.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/SmokeTest.java Wed Nov 22 11:21:36 2017 +0000 @@ -749,7 +749,13 @@ } else { responseHeaders.add("Location", SmokeTest.midSizedFilename); } - t.sendResponseHeaders(301, -1); + t.sendResponseHeaders(301, 64 * 1024); + byte[] bb = new byte[1024]; + OutputStream os = t.getResponseBody(); + for (int i=0; i<64; i++) { + os.write(bb); + } + os.close(); t.close(); } diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/RedirectTest.java --- a/test/jdk/java/net/httpclient/http2/RedirectTest.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/RedirectTest.java Wed Nov 22 11:21:36 2017 +0000 @@ -39,17 +39,19 @@ import java.util.function.*; import java.util.Arrays; import java.util.Iterator; +import java.util.Objects; import org.testng.annotations.Test; import static jdk.incubator.http.HttpClient.Version.HTTP_2; import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString; import static jdk.incubator.http.HttpResponse.BodyHandler.asString; public class RedirectTest { - static int httpPort, altPort; - static Http2TestServer httpServer, altServer; + static int httpPort; + static Http2TestServer httpServer; static HttpClient client; static String httpURIString, altURIString1, altURIString2; + static URI httpURI, altURI1, altURI2; static Supplier sup(String... args) { Iterator i = Arrays.asList(args).iterator(); @@ -57,28 +59,56 @@ return () -> i.next(); } + static class Redirector extends Http2RedirectHandler { + private InetSocketAddress remoteAddr; + private boolean error = false; + + Redirector(Supplier supplier) { + super(supplier); + } + + protected synchronized void examineExchange(Http2TestExchange ex) { + InetSocketAddress addr = ex.getRemoteAddress(); + if (remoteAddr == null) { + remoteAddr = addr; + return; + } + // check that the client addr/port stays the same, proving + // that the connection didn't get dropped. + if (!remoteAddr.equals(addr)) { + System.err.printf("Error %s/%s\n", remoteAddr.toString(), + addr.toString()); + error = true; + } + } + + public synchronized boolean error() { + return error; + } + } + static void initialize() throws Exception { try { client = getClient(); httpServer = new Http2TestServer(false, 0, null, null); - httpPort = httpServer.getAddress().getPort(); - altServer = new Http2TestServer(false, 0, null, null); - altPort = altServer.getAddress().getPort(); // urls are accessed in sequence below. The first two are on // different servers. Third on same server as second. So, the // client should use the same http connection. httpURIString = "http://127.0.0.1:" + httpPort + "/foo/"; - altURIString1 = "http://127.0.0.1:" + altPort + "/redir"; - altURIString2 = "http://127.0.0.1:" + altPort + "/redir/again"; + httpURI = URI.create(httpURIString); + altURIString1 = "http://127.0.0.1:" + httpPort + "/redir"; + altURI1 = URI.create(altURIString1); + altURIString2 = "http://127.0.0.1:" + httpPort + "/redir_again"; + altURI2 = URI.create(altURIString2); - httpServer.addHandler(new Http2RedirectHandler(sup(altURIString1)), "/foo"); - altServer.addHandler(new Http2RedirectHandler(sup(altURIString2)), "/redir"); - altServer.addHandler(new Http2EchoHandler(), "/redir/again"); + Redirector r = new Redirector(sup(altURIString1, altURIString2)); + httpServer.addHandler(r, "/foo"); + httpServer.addHandler(r, "/redir"); + httpServer.addHandler(new Http2EchoHandler(), "/redir_again"); httpServer.start(); - altServer.start(); } catch (Throwable e) { System.err.println("Throwing now"); e.printStackTrace(); @@ -91,12 +121,8 @@ try { initialize(); simpleTest(); - } catch (Throwable tt) { - System.err.println("tt caught"); - tt.printStackTrace(); } finally { httpServer.stop(); - altServer.stop(); } } @@ -122,6 +148,15 @@ } } + static void checkURIs(URI expected, URI found) throws Exception { + System.out.printf ("Expected: %s, Found: %s\n", expected.toString(), found.toString()); + if (!expected.equals(found)) { + System.err.printf ("Test failed: wrong URI %s/%s\n", + expected.toString(), found.toString()); + throw new RuntimeException("Test failed"); + } + } + static void checkStrings(String expected, String found) throws Exception { if (!expected.equals(found)) { System.err.printf ("Test failed: wrong string %s/%s\n", @@ -146,6 +181,16 @@ checkStatus(200, response.statusCode()); String responseBody = response.body(); checkStrings(SIMPLE_STRING, responseBody); + checkURIs(response.uri(), altURI2); + + // check two previous responses + HttpResponse prev = response.previousResponse() + .orElseThrow(() -> new RuntimeException("no previous response")); + checkURIs(prev.uri(), altURI1); + + prev = prev.previousResponse() + .orElseThrow(() -> new RuntimeException("no previous response")); + checkURIs(prev.uri(), httpURI); System.err.println("DONE"); Thread.sleep (6000); diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/server/BodyInputStream.java --- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Nov 22 11:21:36 2017 +0000 @@ -62,9 +62,6 @@ Http2Frame frame; do { frame = q.take(); - if (frame.type() == ResetFrame.TYPE) { - conn.handleStreamReset((ResetFrame) frame); // throws IOException - } // ignoring others for now Wupdates handled elsewhere if (frame.type() != DataFrame.TYPE) { System.out.println("Ignoring " + frame.toString() + " CHECK THIS"); diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java --- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Wed Nov 22 11:21:36 2017 +0000 @@ -36,7 +36,7 @@ final int streamid; int window; - boolean closed; + volatile boolean closed; boolean goodToGo = false; // not allowed to send until headers sent final Http2TestServerConnection conn; final Queue outputQ; @@ -116,10 +116,11 @@ @Override public void close() { - if (closed) { - return; + if (closed) return; + synchronized (this) { + if (closed) return; + closed = true; } - closed = true; try { send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM); } catch (IOException ex) { diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java --- a/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java Wed Nov 22 11:21:36 2017 +0000 @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.util.function.Supplier; import jdk.incubator.http.internal.common.HttpHeadersImpl; @@ -36,15 +37,26 @@ @Override public void handle(Http2TestExchange t) throws IOException { + examineExchange(t); try (InputStream is = t.getRequestBody()) { is.readAllBytes(); String location = supplier.get(); - System.err.println("RedirectHandler received request to " + t.getRequestURI()); + System.err.printf("RedirectHandler request to %s from %s\n", + t.getRequestURI().toString(), t.getRemoteAddress().toString()); System.err.println("Redirecting to: " + location); HttpHeadersImpl map1 = t.getResponseHeaders(); map1.addHeader("Location", location); - t.sendResponseHeaders(301, 0); + t.sendResponseHeaders(301, 1024); + byte[] bb = new byte[1024]; + OutputStream os = t.getResponseBody(); + os.write(bb); + os.close(); t.close(); } } + + // override in sub-class to examine the exchange, but don't + // alter transaction state by reading the request body etc. + protected void examineExchange(Http2TestExchange t) { + } } diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 22 11:21:36 2017 +0000 @@ -65,6 +65,7 @@ final Http2TestServer server; @SuppressWarnings({"rawtypes","unchecked"}) final Map streams; // input q per stream + final Map outStreams; // output q per stream final HashSet pushStreams; final Queue outputQ; volatile int nextstream; @@ -86,6 +87,12 @@ final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(); + static class Sentinel extends Http2Frame { + Sentinel() { super(-1,-1);} + } + + static Sentinel sentinel; + Http2TestServerConnection(Http2TestServer server, Socket socket, Http2TestExchangeSupplier exchangeSupplier) @@ -98,7 +105,8 @@ this.server = server; this.exchangeSupplier = exchangeSupplier; this.streams = Collections.synchronizedMap(new HashMap<>()); - this.outputQ = new Queue<>(); + this.outStreams = Collections.synchronizedMap(new HashMap<>()); + this.outputQ = new Queue<>(sentinel); this.socket = socket; this.socket.setTcpNoDelay(true); this.serverSettings = SettingsFrame.getDefaultSettings(); @@ -267,11 +275,6 @@ //System.err.printf("TestServer: wrote %d bytes\n", c); } - void handleStreamReset(ResetFrame resetFrame) throws IOException { - // TODO: cleanup - throw new IOException("Stream reset"); - } - private void handleCommonFrame(Http2Frame f) throws IOException { if (f instanceof SettingsFrame) { SettingsFrame sf = (SettingsFrame) f; @@ -371,7 +374,7 @@ headers.setHeader(":scheme", "http"); // always in this case headers.setHeader(":authority", host); headers.setHeader(":path", uri.getPath()); - Queue q = new Queue(); + Queue q = new Queue(sentinel); String body = getRequestBody(request); addHeaders(getHeaders(request), headers); headers.setHeader("Content-length", Integer.toString(body.length())); @@ -413,7 +416,7 @@ } boolean endStreamReceived = endStream; HttpHeadersImpl headers = decodeHeaders(frames); - Queue q = new Queue(); + Queue q = new Queue(sentinel); streams.put(streamid, q); exec.submit(() -> { handleRequest(headers, q, streamid, endStreamReceived); @@ -454,6 +457,7 @@ try (bis; BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this)) { + outStreams.put(streamid, bos); String us = scheme + "://" + authority + path; URI uri = new URI(us); boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1; @@ -519,6 +523,17 @@ Consumer r = updaters.get(stream); r.accept(wup.getUpdate()); } + } else if (frame.type() == ResetFrame.TYPE) { + // do orderly close on input q + // and close the output q immediately + // This should mean depending on what the + // handler is doing: either an EOF on read + // or an IOException if writing the response. + q.orderlyClose(); + BodyOutputStream oq = outStreams.get(stream); + if (oq != null) + oq.closeInternal(); + } else { q.put(frame); } @@ -613,6 +628,7 @@ promisedStreamid, clientSettings.getParameter( SettingsFrame.INITIAL_WINDOW_SIZE), this); + outStreams.put(promisedStreamid, oo); oo.goodToGo(); exec.submit(() -> { try { diff -r 0bd10b7df2d2 -r 32f6aefec11e test/jdk/java/net/httpclient/http2/server/Queue.java --- a/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Nov 22 10:15:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Nov 22 11:21:36 2017 +0000 @@ -35,24 +35,24 @@ public class Queue implements ExceptionallyCloseable { private final LinkedList q = new LinkedList<>(); - private volatile boolean closed = false; - private volatile Throwable exception = null; + private boolean closed = false; + private boolean closing = false; + private Throwable exception = null; private Runnable callback; private boolean callbackDisabled = false; private int waiters; // true if someone waiting + private final T closeSentinel; + + Queue(T closeSentinel) { + this.closeSentinel = closeSentinel; + } public synchronized int size() { return q.size(); } -// public synchronized boolean tryPut(T obj) throws IOException { -// if (closed) return false; -// put(obj); -// return true; -// } - public synchronized void put(T obj) throws IOException { - if (closed) { + if (closed || closing) { throw new IOException("stream closed"); } @@ -73,30 +73,19 @@ } } -// public synchronized void disableCallback() { -// callbackDisabled = true; -// } - -// public synchronized void enableCallback() { -// callbackDisabled = false; -// while (q.size() > 0) { -// callback.run(); -// } -// } + // Other close() variants are immediate and abortive + // This allows whatever is on Q to be processed first. -// /** -// * callback is invoked any time put is called where -// * the Queue was empty. -// */ -// public synchronized void registerPutCallback(Runnable callback) { -// Objects.requireNonNull(callback); -// this.callback = callback; -// if (q.size() > 0) { -// // Note: calling callback while holding the lock is -// // dangerous and may lead to deadlocks. -// callback.run(); -// } -// } + public synchronized void orderlyClose() { + if (closing || closed) + return; + try { + put(closeSentinel); + } catch (IOException e) { + e.printStackTrace(); + } + closing = true; + } @Override public synchronized void close() { @@ -132,7 +121,12 @@ } waiters--; } - return q.removeFirst(); + T item = q.removeFirst(); + if (item.equals(closeSentinel)) { + closed = true; + assert q.isEmpty(); + } + return item; } catch (InterruptedException ex) { throw new IOException(ex); } @@ -146,26 +140,9 @@ if (q.isEmpty()) { return null; } - T res = q.removeFirst(); - return res; + return take(); } -// public synchronized T[] pollAll(T[] type) throws IOException { -// T[] ret = q.toArray(type); -// q.clear(); -// return ret; -// } - -// public synchronized void pushback(T v) { -// q.addFirst(v); -// } - -// public synchronized void pushbackAll(T[] v) { -// for (int i=v.length-1; i>=0; i--) { -// q.addFirst(v[i]); -// } -// } - private IOException newIOException(String msg) { if (exception == null) { return new IOException(msg); @@ -173,5 +150,4 @@ return new IOException(msg, exception); } } - }