http-client-branch: HttpRequest/HttpResponse api change: remove link between requests, add links between responses. Fixed some redirection problems http-client-branch
authormichaelm
Wed, 22 Nov 2017 11:21:36 +0000
branchhttp-client-branch
changeset 55852 32f6aefec11e
parent 55851 0bd10b7df2d2
child 55853 937985fc3c45
http-client-branch: HttpRequest/HttpResponse api change: remove link between requests, add links between responses. Fixed some redirection problems
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
test/jdk/java/net/httpclient/ManyRequestsLegacy.java
test/jdk/java/net/httpclient/SmokeTest.java
test/jdk/java/net/httpclient/http2/RedirectTest.java
test/jdk/java/net/httpclient/http2/server/BodyInputStream.java
test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
test/jdk/java/net/httpclient/http2/server/Queue.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Wed Nov 22 11:21:36 2017 +0000
@@ -124,6 +124,16 @@
     }
 
     /**
+     * Called after a redirect or similar kind of retry where a body might
+     * be sent but we don't want it. Should send a RESET in h2. For http/1.1
+     * we can consume small quantity of data, or close the connection in
+     * other cases.
+     */
+    public CompletableFuture<Void> ignoreBody() {
+        return exchImpl.ignoreBody();
+    }
+
+    /**
      * Called when a new exchange is created to replace this exchange.
      * At this point it is guaranteed that readBody/readBodyAsync will
      * not be called.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Wed Nov 22 11:21:36 2017 +0000
@@ -154,6 +154,11 @@
                                                 boolean returnConnectionToPool,
                                                 Executor executor);
 
+    /**
+     * Ignore/consume the body.
+     */
+    abstract CompletableFuture<Void> ignoreBody();
+
     /** Gets the response headers. Completes before body is read. */
     abstract CompletableFuture<Response> getResponseAsync(Executor executor);
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Wed Nov 22 11:21:36 2017 +0000
@@ -325,6 +325,11 @@
         return bodyCF;
     }
 
+    @Override
+    CompletableFuture<Void> ignoreBody() {
+        return response.ignoreBody(executor);
+    }
+
     ByteBuffer drainLeftOverBytes() {
         synchronized (lock) {
             asyncReceiver.stop();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Wed Nov 22 11:21:36 2017 +0000
@@ -57,6 +57,8 @@
     private final BodyReader bodyReader; // used to read the body
     private final Http1AsyncReceiver asyncReceiver;
     private volatile EOFException eof;
+    // max number of bytes of (fixed length) body to ignore on redirect
+    private final static int MAX_IGNORE = 1024;
 
     // Revisit: can we get rid of this?
     static enum State {INITIAL, READING_HEADERS, READING_BODY, DONE}
@@ -138,12 +140,25 @@
         return clen;
     }
 
-    public CompletableFuture<T> readBody(HttpResponse.BodySubscriber<T> p,
+    /**
+     * Read up to MAX_IGNORE bytes discarding
+     */
+    public CompletableFuture<Void> ignoreBody(Executor executor) {
+        int clen = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
+        if (clen == -1 || clen > MAX_IGNORE) {
+            connection.close();
+            return MinimalFuture.completedFuture(null); // not treating as error
+        } else {
+            return readBody(HttpResponse.BodySubscriber.discard((Void)null), true, executor);
+        }
+    }
+
+    public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
                                          boolean return2Cache,
                                          Executor executor) {
         this.return2Cache = return2Cache;
-        final HttpResponse.BodySubscriber<T> pusher = p;
-        final CompletableFuture<T> cf = p.getBody().toCompletableFuture();
+        final HttpResponse.BodySubscriber<U> pusher = p;
+        final CompletableFuture<U> cf = p.getBody().toCompletableFuture();
 
         int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1);
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Wed Nov 22 11:21:36 2017 +0000
@@ -628,9 +628,9 @@
                     decodeHeaders((HeaderFrame) frame, decoder);
                 }
 
-                // To avoid looping, an endpoint MUST NOT send a RST_STREAM in
-                // response to a RST_STREAM frame.
-                if (!(frame instanceof ResetFrame)) {
+                int sid = frame.streamid();
+                if (sid >= nextstreamid && !(frame instanceof ResetFrame)) {
+                    // otherwise the stream has already been reset/closed
                     resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
                 }
                 return;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java	Wed Nov 22 11:21:36 2017 +0000
@@ -100,19 +100,26 @@
     public abstract int statusCode();
 
     /**
-     * Returns the initial {@link HttpRequest} that initiated the exchange.
+     * Returns the {@link HttpRequest} corresponding to this response.
+     * <p>
+     * This may not be the original request provided by the caller,
+     * for example, if that request was redirected.
+     *
+     * @see #previousResponse()
      *
      * @return the request
      */
     public abstract HttpRequest request();
 
     /**
-     * Returns the final {@link HttpRequest} that was sent on the wire for the
-     * exchange ( may, or may not, be the same as the initial request ).
+     * Returns an {@code Optional} containing the previous intermediate response if
+     * one was received. An intermediate response is one that is received
+     * as a result of redirection or authentication. If no previous response
+     * was received then an empty {@code Optional} is returned.
      *
-     * @return the request
+     * @return an Optional containing the HttpResponse, if any.
      */
-    public abstract HttpRequest finalRequest();
+    public abstract Optional<HttpResponse<T>> previousResponse();
 
     /**
      * Returns the received response headers.
@@ -126,6 +133,9 @@
      * may represent the body after it was read (such as {@code byte[]}, or
      * {@code String}, or {@code Path}) or it may represent an object with
      * which the body is read, such as an {@link java.io.InputStream}.
+     * <p>
+     * If this {@code HttpResponse} was returned from an invocation of
+     * {@link #previousResponse()} then this method returns {@code null}
      *
      * @return the body
      */
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponseImpl.java	Wed Nov 22 11:21:36 2017 +0000
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLParameters;
@@ -41,7 +42,7 @@
     final int responseCode;
     final Exchange<T> exchange;
     final HttpRequest initialRequest;
-    final HttpRequestImpl finalRequest;
+    final Optional<HttpResponse<T>> previousResponse;
     final HttpHeaders headers;
     final SSLParameters sslParameters;
     final URI uri;
@@ -53,16 +54,17 @@
 
     public HttpResponseImpl(HttpRequest initialRequest,
                             Response response,
+                            HttpResponse<T> previousResponse,
                             T body,
                             Exchange<T> exch) {
         this.responseCode = response.statusCode();
         this.exchange = exch;
         this.initialRequest = initialRequest;
-        this.finalRequest = exchange.request();
+        this.previousResponse = Optional.ofNullable(previousResponse);
         this.headers = response.headers();
         //this.trailers = trailers;
         this.sslParameters = exch.client().sslParameters();
-        this.uri = finalRequest.uri();
+        this.uri = response.request().uri();
         this.version = response.version();
         this.connection = exch.exchImpl.connection();
         this.stream = null;
@@ -105,8 +107,8 @@
     }
 
     @Override
-    public HttpRequest finalRequest() {
-        return finalRequest;
+    public Optional<HttpResponse<T>> previousResponse() {
+        return previousResponse;
     }
 
     @Override
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/MultiExchange.java	Wed Nov 22 11:21:36 2017 +0000
@@ -35,7 +35,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.UnaryOperator;
+import java.util.function.Function;
 import jdk.incubator.http.HttpResponse.UntrustedBodyHandler;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
@@ -71,6 +71,7 @@
     Exchange<T> previous;
     volatile Throwable retryCause;
     volatile boolean expiredOnce;
+    volatile HttpResponse<T> response = null;
 
     // Maximum number of times a request will be retried/redirected
     // for any reason
@@ -224,11 +225,11 @@
                     .thenCompose((Response r) -> {
                         Exchange<T> exch = getExchange();
                         return exch.readBodyAsync(responseHandler)
-                                   .thenApply((T body) ->
-                                           new HttpResponseImpl<>(userRequest,
-                                                                  r,
-                                                                  body,
-                                                                  exch));
+                            .thenApply((T body) -> {
+                                this.response =
+                                    new HttpResponseImpl<>(userRequest, r, this.response, body, exch);
+                                return this.response;
+                            });
                     });
     }
 
@@ -280,11 +281,15 @@
                             }
                             return completedFuture(response);
                         } else {
-                            currentreq = newrequest;
-                            expiredOnce = false;
-                            setExchange(new Exchange<>(currentreq, this, acc));
-                            //reads body off previous, and then waits for next response
-                            return responseAsyncImpl();
+                            this.response =
+                                new HttpResponseImpl<>(currentreq, response, this.response, null, exch);
+                            Exchange<T> oldExch = exch;
+                            return exch.ignoreBody().handle((r,t) -> {
+                                currentreq = newrequest;
+                                expiredOnce = false;
+                                setExchange(new Exchange<>(currentreq, this, acc));
+                                return responseAsyncImpl();
+                            }).thenCompose(Function.identity());
                         } })
                      .handle((response, ex) -> {
                         // 5. handle errors and cancel any timer set
@@ -300,7 +305,7 @@
                         } else {
                             return errorCF;
                         } })
-                     .thenCompose(UnaryOperator.identity());
+                     .thenCompose(Function.identity());
         }
         return cf;
     }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Wed Nov 22 11:21:36 2017 +0000
@@ -752,6 +752,20 @@
         }
     }
 
+    /**
+     * Send a RESET frame to tell server to stop sending data on this stream
+     */
+    @Override
+    public CompletableFuture<Void> ignoreBody() {
+        try {
+            connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
+            return MinimalFuture.completedFuture(null);
+        } catch (Throwable e) {
+            Log.logTrace("Error resetting stream {0}", e.toString());
+            return MinimalFuture.failedFuture(e);
+        }
+    }
+
     DataFrame getDataFrame(ByteBuffer buffer) {
         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
         // blocks waiting for stream send window, if exhausted
@@ -1023,7 +1037,7 @@
                         responseCF.completeExceptionally(t);
                     } else {
                         HttpResponseImpl<T> resp =
-                                new HttpResponseImpl<>(r.request, r, body, getExchange());
+                                new HttpResponseImpl<>(r.request, r, null, body, getExchange());
                         responseCF.complete(resp);
                     }
                 });
--- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java	Wed Nov 22 11:21:36 2017 +0000
@@ -51,6 +51,7 @@
 import java.net.URI;
 import java.net.URLConnection;
 import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import javax.net.ssl.SSLContext;
@@ -132,7 +133,7 @@
             @Override
             public HttpRequest request() {return request;}
             @Override
-            public HttpRequest finalRequest() {return request;}
+            public Optional<HttpResponse<byte[]>> previousResponse() {return Optional.empty();}
             @Override
             public HttpHeaders headers() { return error(); }
             @Override
--- a/test/jdk/java/net/httpclient/SmokeTest.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/SmokeTest.java	Wed Nov 22 11:21:36 2017 +0000
@@ -749,7 +749,13 @@
             } else {
                 responseHeaders.add("Location", SmokeTest.midSizedFilename);
             }
-            t.sendResponseHeaders(301, -1);
+            t.sendResponseHeaders(301, 64 * 1024);
+            byte[] bb = new byte[1024];
+            OutputStream os = t.getResponseBody();
+            for (int i=0; i<64; i++) {
+                os.write(bb);
+            }
+            os.close();
             t.close();
         }
 
--- a/test/jdk/java/net/httpclient/http2/RedirectTest.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/RedirectTest.java	Wed Nov 22 11:21:36 2017 +0000
@@ -39,17 +39,19 @@
 import java.util.function.*;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Objects;
 import org.testng.annotations.Test;
 import static jdk.incubator.http.HttpClient.Version.HTTP_2;
 import static jdk.incubator.http.HttpRequest.BodyPublisher.fromString;
 import static jdk.incubator.http.HttpResponse.BodyHandler.asString;
 
 public class RedirectTest {
-    static int httpPort, altPort;
-    static Http2TestServer httpServer, altServer;
+    static int httpPort;
+    static Http2TestServer httpServer;
     static HttpClient client;
 
     static String httpURIString, altURIString1, altURIString2;
+    static URI httpURI, altURI1, altURI2;
 
     static Supplier<String> sup(String... args) {
         Iterator<String> i = Arrays.asList(args).iterator();
@@ -57,28 +59,56 @@
         return () -> i.next();
     }
 
+    static class Redirector extends Http2RedirectHandler {
+        private InetSocketAddress remoteAddr;
+        private boolean error = false;
+
+        Redirector(Supplier<String> supplier) {
+            super(supplier);
+        }
+
+        protected synchronized void examineExchange(Http2TestExchange ex) {
+            InetSocketAddress addr = ex.getRemoteAddress();
+            if (remoteAddr == null) {
+                remoteAddr = addr;
+                return;
+            }
+            // check that the client addr/port stays the same, proving
+            // that the connection didn't get dropped.
+            if (!remoteAddr.equals(addr)) {
+                System.err.printf("Error %s/%s\n", remoteAddr.toString(),
+                        addr.toString());
+                error = true;
+            }
+        }
+
+        public synchronized boolean error() {
+            return error;
+        }
+    }
+
     static void initialize() throws Exception {
         try {
             client = getClient();
             httpServer = new Http2TestServer(false, 0, null, null);
-
             httpPort = httpServer.getAddress().getPort();
-            altServer = new Http2TestServer(false, 0, null, null);
-            altPort = altServer.getAddress().getPort();
 
             // urls are accessed in sequence below. The first two are on
             // different servers. Third on same server as second. So, the
             // client should use the same http connection.
             httpURIString = "http://127.0.0.1:" + httpPort + "/foo/";
-            altURIString1 = "http://127.0.0.1:" + altPort + "/redir";
-            altURIString2 = "http://127.0.0.1:" + altPort + "/redir/again";
+            httpURI = URI.create(httpURIString);
+            altURIString1 = "http://127.0.0.1:" + httpPort + "/redir";
+            altURI1 = URI.create(altURIString1);
+            altURIString2 = "http://127.0.0.1:" + httpPort + "/redir_again";
+            altURI2 = URI.create(altURIString2);
 
-            httpServer.addHandler(new Http2RedirectHandler(sup(altURIString1)), "/foo");
-            altServer.addHandler(new Http2RedirectHandler(sup(altURIString2)), "/redir");
-            altServer.addHandler(new Http2EchoHandler(), "/redir/again");
+            Redirector r = new Redirector(sup(altURIString1, altURIString2));
+            httpServer.addHandler(r, "/foo");
+            httpServer.addHandler(r, "/redir");
+            httpServer.addHandler(new Http2EchoHandler(), "/redir_again");
 
             httpServer.start();
-            altServer.start();
         } catch (Throwable e) {
             System.err.println("Throwing now");
             e.printStackTrace();
@@ -91,12 +121,8 @@
         try {
             initialize();
             simpleTest();
-        } catch (Throwable tt) {
-            System.err.println("tt caught");
-            tt.printStackTrace();
         } finally {
             httpServer.stop();
-            altServer.stop();
         }
     }
 
@@ -122,6 +148,15 @@
         }
     }
 
+    static void checkURIs(URI expected, URI found) throws Exception {
+        System.out.printf ("Expected: %s, Found: %s\n", expected.toString(), found.toString());
+        if (!expected.equals(found)) {
+            System.err.printf ("Test failed: wrong URI %s/%s\n",
+                expected.toString(), found.toString());
+            throw new RuntimeException("Test failed");
+        }
+    }
+
     static void checkStrings(String expected, String found) throws Exception {
         if (!expected.equals(found)) {
             System.err.printf ("Test failed: wrong string %s/%s\n",
@@ -146,6 +181,16 @@
         checkStatus(200, response.statusCode());
         String responseBody = response.body();
         checkStrings(SIMPLE_STRING, responseBody);
+        checkURIs(response.uri(), altURI2);
+
+        // check two previous responses
+        HttpResponse<String> prev = response.previousResponse()
+            .orElseThrow(() -> new RuntimeException("no previous response"));
+        checkURIs(prev.uri(), altURI1);
+
+        prev = prev.previousResponse()
+            .orElseThrow(() -> new RuntimeException("no previous response"));
+        checkURIs(prev.uri(), httpURI);
 
         System.err.println("DONE");
         Thread.sleep (6000);
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Wed Nov 22 11:21:36 2017 +0000
@@ -62,9 +62,6 @@
         Http2Frame frame;
         do {
             frame = q.take();
-            if (frame.type() == ResetFrame.TYPE) {
-                conn.handleStreamReset((ResetFrame) frame); // throws IOException
-            }
             // ignoring others for now Wupdates handled elsewhere
             if (frame.type() != DataFrame.TYPE) {
                 System.out.println("Ignoring " + frame.toString() + " CHECK THIS");
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Wed Nov 22 11:21:36 2017 +0000
@@ -36,7 +36,7 @@
 
     final int streamid;
     int window;
-    boolean closed;
+    volatile boolean closed;
     boolean goodToGo = false; // not allowed to send until headers sent
     final Http2TestServerConnection conn;
     final Queue outputQ;
@@ -116,10 +116,11 @@
 
     @Override
     public void close() {
-        if (closed) {
-            return;
+        if (closed) return;
+        synchronized (this) {
+            if (closed) return;
+            closed = true;
         }
-        closed = true;
         try {
             send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
         } catch (IOException ex) {
--- a/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2RedirectHandler.java	Wed Nov 22 11:21:36 2017 +0000
@@ -23,6 +23,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.function.Supplier;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
 
@@ -36,15 +37,26 @@
 
     @Override
     public void handle(Http2TestExchange t) throws IOException {
+        examineExchange(t);
         try (InputStream is = t.getRequestBody()) {
             is.readAllBytes();
             String location = supplier.get();
-            System.err.println("RedirectHandler received request to " + t.getRequestURI());
+            System.err.printf("RedirectHandler request to %s from %s\n",
+                t.getRequestURI().toString(), t.getRemoteAddress().toString());
             System.err.println("Redirecting to: " + location);
             HttpHeadersImpl map1 = t.getResponseHeaders();
             map1.addHeader("Location", location);
-            t.sendResponseHeaders(301, 0);
+            t.sendResponseHeaders(301, 1024);
+            byte[] bb = new byte[1024];
+            OutputStream os = t.getResponseBody();
+            os.write(bb);
+            os.close();
             t.close();
         }
     }
+
+    // override in sub-class to examine the exchange, but don't
+    // alter transaction state by reading the request body etc.
+    protected void examineExchange(Http2TestExchange t) {
+    }
 }
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Wed Nov 22 11:21:36 2017 +0000
@@ -65,6 +65,7 @@
     final Http2TestServer server;
     @SuppressWarnings({"rawtypes","unchecked"})
     final Map<Integer, Queue> streams; // input q per stream
+    final Map<Integer, BodyOutputStream> outStreams; // output q per stream
     final HashSet<Integer> pushStreams;
     final Queue<Http2Frame> outputQ;
     volatile int nextstream;
@@ -86,6 +87,12 @@
 
     final static byte[] clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes();
 
+    static class Sentinel extends Http2Frame {
+        Sentinel() { super(-1,-1);}
+    }
+
+    static Sentinel sentinel;
+
     Http2TestServerConnection(Http2TestServer server,
                               Socket socket,
                               Http2TestExchangeSupplier exchangeSupplier)
@@ -98,7 +105,8 @@
         this.server = server;
         this.exchangeSupplier = exchangeSupplier;
         this.streams = Collections.synchronizedMap(new HashMap<>());
-        this.outputQ = new Queue<>();
+        this.outStreams = Collections.synchronizedMap(new HashMap<>());
+        this.outputQ = new Queue<>(sentinel);
         this.socket = socket;
         this.socket.setTcpNoDelay(true);
         this.serverSettings = SettingsFrame.getDefaultSettings();
@@ -267,11 +275,6 @@
         //System.err.printf("TestServer: wrote %d bytes\n", c);
     }
 
-    void handleStreamReset(ResetFrame resetFrame) throws IOException {
-        // TODO: cleanup
-        throw new IOException("Stream reset");
-    }
-
     private void handleCommonFrame(Http2Frame f) throws IOException {
         if (f instanceof SettingsFrame) {
             SettingsFrame sf = (SettingsFrame) f;
@@ -371,7 +374,7 @@
         headers.setHeader(":scheme", "http"); // always in this case
         headers.setHeader(":authority", host);
         headers.setHeader(":path", uri.getPath());
-        Queue q = new Queue();
+        Queue q = new Queue(sentinel);
         String body = getRequestBody(request);
         addHeaders(getHeaders(request), headers);
         headers.setHeader("Content-length", Integer.toString(body.length()));
@@ -413,7 +416,7 @@
         }
         boolean endStreamReceived = endStream;
         HttpHeadersImpl headers = decodeHeaders(frames);
-        Queue q = new Queue();
+        Queue q = new Queue(sentinel);
         streams.put(streamid, q);
         exec.submit(() -> {
             handleRequest(headers, q, streamid, endStreamReceived);
@@ -454,6 +457,7 @@
         try (bis;
              BodyOutputStream bos = new BodyOutputStream(streamid, winsize, this))
         {
+            outStreams.put(streamid, bos);
             String us = scheme + "://" + authority + path;
             URI uri = new URI(us);
             boolean pushAllowed = clientSettings.getParameter(SettingsFrame.ENABLE_PUSH) == 1;
@@ -519,6 +523,17 @@
                                 Consumer<Integer> r = updaters.get(stream);
                                 r.accept(wup.getUpdate());
                             }
+                        } else if (frame.type() == ResetFrame.TYPE) {
+                            // do orderly close on input q
+                            // and close the output q immediately
+                            // This should mean depending on what the
+                            // handler is doing: either an EOF on read
+                            // or an IOException if writing the response.
+                            q.orderlyClose();
+                            BodyOutputStream oq = outStreams.get(stream);
+                            if (oq != null)
+                                oq.closeInternal();
+
                         } else {
                             q.put(frame);
                         }
@@ -613,6 +628,7 @@
                 promisedStreamid,
                 clientSettings.getParameter(
                         SettingsFrame.INITIAL_WINDOW_SIZE), this);
+        outStreams.put(promisedStreamid, oo);
         oo.goodToGo();
         exec.submit(() -> {
             try {
--- a/test/jdk/java/net/httpclient/http2/server/Queue.java	Wed Nov 22 10:15:53 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java	Wed Nov 22 11:21:36 2017 +0000
@@ -35,24 +35,24 @@
 public class Queue<T> implements ExceptionallyCloseable {
 
     private final LinkedList<T> q = new LinkedList<>();
-    private volatile boolean closed = false;
-    private volatile Throwable exception = null;
+    private boolean closed = false;
+    private boolean closing = false;
+    private Throwable exception = null;
     private Runnable callback;
     private boolean callbackDisabled = false;
     private int waiters; // true if someone waiting
+    private final T closeSentinel;
+
+    Queue(T closeSentinel) {
+        this.closeSentinel = closeSentinel;
+    }
 
     public synchronized int size() {
         return q.size();
     }
 
-//    public synchronized boolean tryPut(T obj) throws IOException {
-//        if (closed) return false;
-//        put(obj);
-//        return true;
-//    }
-
     public synchronized void put(T obj) throws IOException {
-        if (closed) {
+        if (closed || closing) {
             throw new IOException("stream closed");
         }
 
@@ -73,30 +73,19 @@
         }
     }
 
-//    public synchronized void disableCallback() {
-//        callbackDisabled = true;
-//    }
-
-//    public synchronized void enableCallback() {
-//        callbackDisabled = false;
-//        while (q.size() > 0) {
-//            callback.run();
-//        }
-//    }
+    // Other close() variants are immediate and abortive
+    // This allows whatever is on Q to be processed first.
 
-//    /**
-//     * callback is invoked any time put is called where
-//     * the Queue was empty.
-//     */
-//    public synchronized void registerPutCallback(Runnable callback) {
-//        Objects.requireNonNull(callback);
-//        this.callback = callback;
-//        if (q.size() > 0) {
-//            // Note: calling callback while holding the lock is
-//            // dangerous and may lead to deadlocks.
-//            callback.run();
-//        }
-//    }
+    public synchronized void orderlyClose() {
+        if (closing || closed)
+            return;
+        try {
+            put(closeSentinel);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        closing = true;
+    }
 
     @Override
     public synchronized void close() {
@@ -132,7 +121,12 @@
                 }
                 waiters--;
             }
-            return q.removeFirst();
+            T item = q.removeFirst();
+            if (item.equals(closeSentinel)) {
+                closed = true;
+                assert q.isEmpty();
+            }
+            return item;
         } catch (InterruptedException ex) {
             throw new IOException(ex);
         }
@@ -146,26 +140,9 @@
         if (q.isEmpty()) {
             return null;
         }
-        T res = q.removeFirst();
-        return res;
+        return take();
     }
 
-//    public synchronized T[] pollAll(T[] type) throws IOException {
-//        T[] ret = q.toArray(type);
-//        q.clear();
-//        return ret;
-//    }
-
-//    public synchronized void pushback(T v) {
-//        q.addFirst(v);
-//    }
-
-//    public synchronized void pushbackAll(T[] v) {
-//        for (int i=v.length-1; i>=0; i--) {
-//            q.addFirst(v[i]);
-//        }
-//    }
-
     private IOException newIOException(String msg) {
         if (exception == null) {
             return new IOException(msg);
@@ -173,5 +150,4 @@
             return new IOException(msg, exception);
         }
     }
-
 }