http-client-branch: HttpRequest/HttpResponse api change: remove link between requests, add links between responses. Fixed some redirection problems
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Wed Nov 22 11:21:36 2017 +0000
@@ -124,6 +124,16 @@
}
/**
+ * Called after a redirect or similar kind of retry where a body might
+ * be sent but we don't want it. Should send a RESET in h2. For http/1.1
+ * we can consume small quantity of data, or close the connection in
+ * other cases.
+ */
+ public CompletableFuture<Void> ignoreBody() {
+ return exchImpl.ignoreBody();
+ }
+
+ /**
* Called when a new exchange is created to replace this exchange.
* At this point it is guaranteed that readBody/readBodyAsync will
* not be called.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Wed Nov 22 11:21:36 2017 +0000
@@ -154,6 +154,11 @@
boolean returnConnectionToPool,
Executor executor);
+ /**
+ * Ignore/consume the body.
+ */
+ abstract CompletableFuture<Void> ignoreBody();
+
/** Gets the response headers. Completes before body is read. */
abstract CompletableFuture<Response> getResponseAsync(Executor executor);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Wed Nov 22 11:21:36 2017 +0000
@@ -325,6 +325,11 @@
return bodyCF;
}
+ @Override
+ CompletableFuture<Void> ignoreBody() {
+ return response.ignoreBody(executor);
+ }
+
ByteBuffer drainLeftOverBytes() {
synchronized (lock) {
asyncReceiver.stop();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Wed Nov 22 11:21:36 2017 +0000
@@ -57,6 +57,8 @@
private final BodyReader bodyReader; // used to read the body
private final Http1AsyncReceiver asyncReceiver;
private volatile EOFException eof;
+ // max number of bytes of (fixed length) body to ignore on redirect
+ private final static int MAX_IGNORE = 1024;
// Revisit: can we get rid of this?
static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
@@ -138,12 +140,25 @@
return clen;
}
- public CompletableFuture<T> readBody(HttpResponse.BodySubscriber<T> p,
+ /**
+ * Read up to MAX_IGNORE bytes discarding
+ */
+ public CompletableFuture<Void> ignoreBody(Executor executor) {
+ int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
+ if (clen == -1 || clen > MAX_IGNORE) {
+ connection.close();
+ return MinimalFuture.completedFuture(null); // not treating as error
+ } else {
+ return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
+ }
+ }
+
+ public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
boolean return2Cache,
Executor executor) {
this.return2Cache = return2Cache;
- final HttpResponse.BodySubscriber<T> pusher = p;
- final CompletableFuture<T> cf = p.getBody().toCompletableFuture();
+ final HttpResponse.BodySubscriber<U> pusher = p;
+ final CompletableFuture<U> cf = p.getBody().toCompletableFuture();
int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Wed Nov 22 11:21:36 2017 +0000
@@ -628,9 +628,9 @@
decodeHeaders((HeaderFrame) frame, decoder);
}
- // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
- // response to a RST_STREAM frame.
- if (!(frame instanceof ResetFrame)) {
+ int sid = frame.streamid();
+ if (sid >= nextstreamid && !(frame instanceof ResetFrame)) {
+ // otherwise the stream has already been reset/closed
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
}
return;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Wed Nov 22 11:21:36 2017 +0000
@@ -100,19 +100,26 @@
public abstract int statusCode();
/**
- * Returns the initial {@link HttpRequest} that initiated the exchange.
+ * Returns the {@link HttpRequest} corresponding to this response.
+ * <p>
+ * This may not be the original request provided by the caller,
+ * for example, if that request was redirected.
+ *
+ * @see #previousResponse()
*
* @return the request
*/
public abstract HttpRequest request();
/**
- * Returns the final {@link HttpRequest} that was sent on the wire for the
- * exchange ( may, or may not, be the same as the initial request ).
+ * Returns an {@code Optional} containing the previous intermediate response if
+ * one was received. An intermediate response is one that is received
+ * as a result of redirection or authentication. If no previous response
+ * was received then an empty {@code Optional} is returned.
*
- * @return the request
+ * @return an Optional containing the HttpResponse, if any.
*/
- public abstract HttpRequest finalRequest();
+ public abstract Optional<HttpResponse<T>> previousResponse();
/**
* Returns the received response headers.
@@ -126,6 +133,9 @@
* may represent the body after it was read (such as {@code byte[]}, or
* {@code String}, or {@code Path}) or it may represent an object with
* which the body is read, such as an {@link java.io.InputStream}.
+ * <p>
+ * If this {@code HttpResponse} was returned from an invocation of
+ * {@link #previousResponse()} then this method returns {@code null}
*
* @return the body
*/
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java Wed Nov 22 11:21:36 2017 +0000
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.net.ssl.SSLParameters;
@@ -41,7 +42,7 @@
final int responseCode;
final Exchange<T> exchange;
final HttpRequest initialRequest;
- final HttpRequestImpl finalRequest;
+ final Optional<HttpResponse<T>> previousResponse;
final HttpHeaders headers;
final SSLParameters sslParameters;
final URI uri;
@@ -53,16 +54,17 @@
public HttpResponseImpl(HttpRequest initialRequest,
Response response,
+ HttpResponse<T> previousResponse,
T body,
Exchange<T> exch) {
this.responseCode = response.statusCode();
this.exchange = exch;
this.initialRequest = initialRequest;
- this.finalRequest = exchange.request();
+ this.previousResponse = Optional.ofNullable(previousResponse);
this.headers = response.headers();
//this.trailers = trailers;
this.sslParameters = exch.client().sslParameters();
- this.uri = finalRequest.uri();
+ this.uri = response.request().uri();
this.version = response.version();
this.connection = exch.exchImpl.connection();
this.stream = null;
@@ -105,8 +107,8 @@
}
@Override
- public HttpRequest finalRequest() {
- return finalRequest;
+ public Optional<HttpResponse<T>> previousResponse() {
+ return previousResponse;
}
@Override
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java Wed Nov 22 11:21:36 2017 +0000
@@ -35,7 +35,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.UnaryOperator;
+import java.util.function.Function;
import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
@@ -71,6 +71,7 @@
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean expiredOnce;
+ volatile HttpResponse<T> response = null;
// Maximum number of times a request will be retried/redirected
// for any reason
@@ -224,11 +225,11 @@
.thenCompose((Response r) -> {
Exchange<T> exch = getExchange();
return exch.readBodyAsync(responseHandler)
- .thenApply((T body) ->
- new HttpResponseImpl<>(userRequest,
- r,
- body,
- exch));
+ .thenApply((T body) -> {
+ this.response =
+ new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
+ return this.response;
+ });
});
}
@@ -280,11 +281,15 @@
}
return completedFuture(response);
} else {
- currentreq = newrequest;
- expiredOnce = false;
- setExchange(new Exchange<>(currentreq, this, acc));
- //reads body off previous, and then waits for next response
- return responseAsyncImpl();
+ this.response =
+ new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
+ Exchange<T> oldExch = exch;
+ return exch.ignoreBody().handle((r,t) -> {
+ currentreq = newrequest;
+ expiredOnce = false;
+ setExchange(new Exchange<>(currentreq, this, acc));
+ return responseAsyncImpl();
+ }).thenCompose(Function.identity());
} })
.handle((response, ex) -> {
// 5. handle errors and cancel any timer set
@@ -300,7 +305,7 @@
} else {
return errorCF;
} })
- .thenCompose(UnaryOperator.identity());
+ .thenCompose(Function.identity());
}
return cf;
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Wed Nov 22 11:21:36 2017 +0000
@@ -752,6 +752,20 @@
}
}
+ /**
+ * Send a RESET frame to tell server to stop sending data on this stream
+ */
+ @Override
+ public CompletableFuture<Void> ignoreBody() {
+ try {
+ connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
+ return MinimalFuture.completedFuture(null);
+ } catch (Throwable e) {
+ Log.logTrace("Error resetting stream {0}", e.toString());
+ return MinimalFuture.failedFuture(e);
+ }
+ }
+
DataFrame getDataFrame(ByteBuffer buffer) {
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
// blocks waiting for stream send window, if exhausted
@@ -1023,7 +1037,7 @@
responseCF.completeExceptionally(t);
} else {
HttpResponseImpl<T> resp =
- new HttpResponseImpl<>(r.request, r, body, getExchange());
+ new HttpResponseImpl<>(r.request, r, null, body, getExchange());
responseCF.complete(resp);
}
});
--- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Wed Nov 22 11:21:36 2017 +0000
@@ -51,6 +51,7 @@
import java.net.URI;
import java.net.URLConnection;
import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
@@ -132,7 +133,7 @@
@Override
public HttpRequest request() {return request;}
@Override
- public HttpRequest finalRequest() {return request;}
+ public Optional<HttpResponse<byte[]>> previousResponse() {return Optional.empty();}
@Override
public HttpHeaders headers() { return error(); }
@Override
--- a/test/jdk/java/net/httpclient/SmokeTest.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/SmokeTest.java Wed Nov 22 11:21:36 2017 +0000
@@ -749,7 +749,13 @@
} else {
responseHeaders.add("Location", SmokeTest.midSizedFilename);
}
- t.sendResponseHeaders(301, -1);
+ t.sendResponseHeaders(301, 64 * 1024);
+ byte[] bb = new byte[1024];
+ OutputStream os = t.getResponseBody();
+ for (int i=0; i<64; i++) {
+ os.write(bb);
+ }
+ os.close();
t.close();
}
--- a/test/jdk/java/net/httpclient/http2/RedirectTest.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/RedirectTest.java Wed Nov 22 11:21:36 2017 +0000
@@ -39,17 +39,19 @@
import java.util.function.*;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Objects;
import org.testng.annotations.Test;
import static jdk.incubator.http.HttpClient.Version.HTTP_2;
import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
public class RedirectTest {
- static int httpPort, altPort;
- static Http2TestServer httpServer, altServer;
+ static int httpPort;
+ static Http2TestServer httpServer;
static HttpClient client;
static String httpURIString, altURIString1, altURIString2;
+ static URI httpURI, altURI1, altURI2;
static Supplier<String> sup(String... args) {
Iterator<String> i = Arrays.asList(args).iterator();
@@ -57,28 +59,56 @@
return () -> i.next();
}
+ static class Redirector extends Http2RedirectHandler {
+ private InetSocketAddress remoteAddr;
+ private boolean error = false;
+
+ Redirector(Supplier<String> supplier) {
+ super(supplier);
+ }
+
+ protected synchronized void examineExchange(Http2TestExchange ex) {
+ InetSocketAddress addr = ex.getRemoteAddress();
+ if (remoteAddr == null) {
+ remoteAddr = addr;
+ return;
+ }
+ // check that the client addr/port stays the same, proving
+ // that the connection didn't get dropped.
+ if (!remoteAddr.equals(addr)) {
+ System.err.printf("Error %s/%s\n", remoteAddr.toString(),
+ addr.toString());
+ error = true;
+ }
+ }
+
+ public synchronized boolean error() {
+ return error;
+ }
+ }
+
static void initialize() throws Exception {
try {
client = getClient();
httpServer = new Http2TestServer(false, 0, null, null);
-
httpPort = httpServer.getAddress().getPort();
- altServer = new Http2TestServer(false, 0, null, null);
- altPort = altServer.getAddress().getPort();
// urls are accessed in sequence below. The first two are on
// different servers. Third on same server as second. So, the
// client should use the same http connection.
httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
- altURIString1 = "http://127.0.0.1:" + altPort + "/redir";
- altURIString2 = "http://127.0.0.1:" + altPort + "/redir/again";
+ httpURI = URI.create(httpURIString);
+ altURIString1 = "http://127.0.0.1:" + httpPort + "/redir";
+ altURI1 = URI.create(altURIString1);
+ altURIString2 = "http://127.0.0.1:" + httpPort + "/redir_again";
+ altURI2 = URI.create(altURIString2);
- httpServer.addHandler(new Http2RedirectHandler(sup(altURIString1)), "/foo");
- altServer.addHandler(new Http2RedirectHandler(sup(altURIString2)), "/redir");
- altServer.addHandler(new Http2EchoHandler(), "/redir/again");
+ Redirector r = new Redirector(sup(altURIString1, altURIString2));
+ httpServer.addHandler(r, "/foo");
+ httpServer.addHandler(r, "/redir");
+ httpServer.addHandler(new Http2EchoHandler(), "/redir_again");
httpServer.start();
- altServer.start();
} catch (Throwable e) {
System.err.println("Throwing now");
e.printStackTrace();
@@ -91,12 +121,8 @@
try {
initialize();
simpleTest();
- } catch (Throwable tt) {
- System.err.println("tt caught");
- tt.printStackTrace();
} finally {
httpServer.stop();
- altServer.stop();
}
}
@@ -122,6 +148,15 @@
}
}
+ static void checkURIs(URI expected, URI found) throws Exception {
+ System.out.printf ("Expected: %s, Found: %s\n", expected.toString(), found.toString());
+ if (!expected.equals(found)) {
+ System.err.printf ("Test failed: wrong URI %s/%s\n",
+ expected.toString(), found.toString());
+ throw new RuntimeException("Test failed");
+ }
+ }
+
static void checkStrings(String expected, String found) throws Exception {
if (!expected.equals(found)) {
System.err.printf ("Test failed: wrong string %s/%s\n",
@@ -146,6 +181,16 @@
checkStatus(200, response.statusCode());
String responseBody = response.body();
checkStrings(SIMPLE_STRING, responseBody);
+ checkURIs(response.uri(), altURI2);
+
+ // check two previous responses
+ HttpResponse<String> prev = response.previousResponse()
+ .orElseThrow(() -> new RuntimeException("no previous response"));
+ checkURIs(prev.uri(), altURI1);
+
+ prev = prev.previousResponse()
+ .orElseThrow(() -> new RuntimeException("no previous response"));
+ checkURIs(prev.uri(), httpURI);
System.err.println("DONE");
Thread.sleep (6000);
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Wed Nov 22 11:21:36 2017 +0000
@@ -62,9 +62,6 @@
Http2Frame frame;
do {
frame = q.take();
- if (frame.type() == ResetFrame.TYPE) {
- conn.handleStreamReset((ResetFrame) frame); // throws IOException
- }
// ignoring others for now Wupdates handled elsewhere
if (frame.type() != DataFrame.TYPE) {
System.out.println("Ignoring " + frame.toString() + " CHECK THIS");
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Wed Nov 22 11:21:36 2017 +0000
@@ -36,7 +36,7 @@
final int streamid;
int window;
- boolean closed;
+ volatile boolean closed;
boolean goodToGo = false; // not allowed to send until headers sent
final Http2TestServerConnection conn;
final Queue outputQ;
@@ -116,10 +116,11 @@
@Override
public void close() {
- if (closed) {
- return;
+ if (closed) return;
+ synchronized (this) {
+ if (closed) return;
+ closed = true;
}
- closed = true;
try {
send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
} catch (IOException ex) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java Wed Nov 22 11:21:36 2017 +0000
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.function.Supplier;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
@@ -36,15 +37,26 @@
@Override
public void handle(Http2TestExchange t) throws IOException {
+ examineExchange(t);
try (InputStream is = t.getRequestBody()) {
is.readAllBytes();
String location = supplier.get();
- System.err.println("RedirectHandler received request to " + t.getRequestURI());
+ System.err.printf("RedirectHandler request to %s from %s\n",
+ t.getRequestURI().toString(), t.getRemoteAddress().toString());
System.err.println("Redirecting to: " + location);
HttpHeadersImpl map1 = t.getResponseHeaders();
map1.addHeader("Location", location);
- t.sendResponseHeaders(301, 0);
+ t.sendResponseHeaders(301, 1024);
+ byte[] bb = new byte[1024];
+ OutputStream os = t.getResponseBody();
+ os.write(bb);
+ os.close();
t.close();
}
}
+
+ // override in sub-class to examine the exchange, but don't
+ // alter transaction state by reading the request body etc.
+ protected void examineExchange(Http2TestExchange t) {
+ }
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Wed Nov 22 11:21:36 2017 +0000
@@ -65,6 +65,7 @@
final Http2TestServer server;
@SuppressWarnings({"rawtypes","unchecked"})
final Map<Integer, Queue> streams; // input q per stream
+ final Map<Integer, BodyOutputStream> outStreams; // output q per stream
final HashSet<Integer> pushStreams;
final Queue<Http2Frame> outputQ;
volatile int nextstream;
@@ -86,6 +87,12 @@
final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
+ static class Sentinel extends Http2Frame {
+ Sentinel() { super(-1,-1);}
+ }
+
+ static Sentinel sentinel;
+
Http2TestServerConnection(Http2TestServer server,
Socket socket,
Http2TestExchangeSupplier exchangeSupplier)
@@ -98,7 +105,8 @@
this.server = server;
this.exchangeSupplier = exchangeSupplier;
this.streams = Collections.synchronizedMap(new HashMap<>());
- this.outputQ = new Queue<>();
+ this.outStreams = Collections.synchronizedMap(new HashMap<>());
+ this.outputQ = new Queue<>(sentinel);
this.socket = socket;
this.socket.setTcpNoDelay(true);
this.serverSettings = SettingsFrame.getDefaultSettings();
@@ -267,11 +275,6 @@
//System.err.printf("TestServer: wrote %d bytes\n", c);
}
- void handleStreamReset(ResetFrame resetFrame) throws IOException {
- // TODO: cleanup
- throw new IOException("Stream reset");
- }
-
private void handleCommonFrame(Http2Frame f) throws IOException {
if (f instanceof SettingsFrame) {
SettingsFrame sf = (SettingsFrame) f;
@@ -371,7 +374,7 @@
headers.setHeader(":scheme", "http"); // always in this case
headers.setHeader(":authority", host);
headers.setHeader(":path", uri.getPath());
- Queue q = new Queue();
+ Queue q = new Queue(sentinel);
String body = getRequestBody(request);
addHeaders(getHeaders(request), headers);
headers.setHeader("Content-length", Integer.toString(body.length()));
@@ -413,7 +416,7 @@
}
boolean endStreamReceived = endStream;
HttpHeadersImpl headers = decodeHeaders(frames);
- Queue q = new Queue();
+ Queue q = new Queue(sentinel);
streams.put(streamid, q);
exec.submit(() -> {
handleRequest(headers, q, streamid, endStreamReceived);
@@ -454,6 +457,7 @@
try (bis;
BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
{
+ outStreams.put(streamid, bos);
String us = scheme + "://" + authority + path;
URI uri = new URI(us);
boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
@@ -519,6 +523,17 @@
Consumer<Integer> r = updaters.get(stream);
r.accept(wup.getUpdate());
}
+ } else if (frame.type() == ResetFrame.TYPE) {
+ // do orderly close on input q
+ // and close the output q immediately
+ // This should mean depending on what the
+ // handler is doing: either an EOF on read
+ // or an IOException if writing the response.
+ q.orderlyClose();
+ BodyOutputStream oq = outStreams.get(stream);
+ if (oq != null)
+ oq.closeInternal();
+
} else {
q.put(frame);
}
@@ -613,6 +628,7 @@
promisedStreamid,
clientSettings.getParameter(
SettingsFrame.INITIAL_WINDOW_SIZE), this);
+ outStreams.put(promisedStreamid, oo);
oo.goodToGo();
exec.submit(() -> {
try {
--- a/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Wed Nov 22 11:21:36 2017 +0000
@@ -35,24 +35,24 @@
public class Queue<T> implements ExceptionallyCloseable {
private final LinkedList<T> q = new LinkedList<>();
- private volatile boolean closed = false;
- private volatile Throwable exception = null;
+ private boolean closed = false;
+ private boolean closing = false;
+ private Throwable exception = null;
private Runnable callback;
private boolean callbackDisabled = false;
private int waiters; // true if someone waiting
+ private final T closeSentinel;
+
+ Queue(T closeSentinel) {
+ this.closeSentinel = closeSentinel;
+ }
public synchronized int size() {
return q.size();
}
-// public synchronized boolean tryPut(T obj) throws IOException {
-// if (closed) return false;
-// put(obj);
-// return true;
-// }
-
public synchronized void put(T obj) throws IOException {
- if (closed) {
+ if (closed || closing) {
throw new IOException("stream closed");
}
@@ -73,30 +73,19 @@
}
}
-// public synchronized void disableCallback() {
-// callbackDisabled = true;
-// }
-
-// public synchronized void enableCallback() {
-// callbackDisabled = false;
-// while (q.size() > 0) {
-// callback.run();
-// }
-// }
+ // Other close() variants are immediate and abortive
+ // This allows whatever is on Q to be processed first.
-// /**
-// * callback is invoked any time put is called where
-// * the Queue was empty.
-// */
-// public synchronized void registerPutCallback(Runnable callback) {
-// Objects.requireNonNull(callback);
-// this.callback = callback;
-// if (q.size() > 0) {
-// // Note: calling callback while holding the lock is
-// // dangerous and may lead to deadlocks.
-// callback.run();
-// }
-// }
+ public synchronized void orderlyClose() {
+ if (closing || closed)
+ return;
+ try {
+ put(closeSentinel);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ closing = true;
+ }
@Override
public synchronized void close() {
@@ -132,7 +121,12 @@
}
waiters--;
}
- return q.removeFirst();
+ T item = q.removeFirst();
+ if (item.equals(closeSentinel)) {
+ closed = true;
+ assert q.isEmpty();
+ }
+ return item;
} catch (InterruptedException ex) {
throw new IOException(ex);
}
@@ -146,26 +140,9 @@
if (q.isEmpty()) {
return null;
}
- T res = q.removeFirst();
- return res;
+ return take();
}
-// public synchronized T[] pollAll(T[] type) throws IOException {
-// T[] ret = q.toArray(type);
-// q.clear();
-// return ret;
-// }
-
-// public synchronized void pushback(T v) {
-// q.addFirst(v);
-// }
-
-// public synchronized void pushbackAll(T[] v) {
-// for (int i=v.length-1; i>=0; i--) {
-// q.addFirst(v[i]);
-// }
-// }
-
private IOException newIOException(String msg) {
if (exception == null) {
return new IOException(msg);
@@ -173,5 +150,4 @@
return new IOException(msg, exception);
}
}
-
}