# HG changeset patch # User dfuchs # Date 1512236457 0 # Node ID 8d4770c22b636728f423ee2c5b197e66ec48912b # Parent 2d423c9b73bbab29946aeeb28c1f0ae388ae1440 http-client-barnch: fixed a few issues discovered while stress testing and a race condition in SSLFlowDelegate diff -r 2d423c9b73bb -r 8d4770c22b63 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Fri Dec 01 19:25:34 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Sat Dec 02 17:40:57 2017 +0000 @@ -77,9 +77,9 @@ @Override public void handle() { - assert !connected : "Already connected"; - assert !chan.isBlocking() : "Unexpected blocking channel"; try { + assert !connected : "Already connected"; + assert !chan.isBlocking() : "Unexpected blocking channel"; debug.log(Level.DEBUG, "ConnectEvent: finishing connect"); boolean finished = chan.finishConnect(); assert finished : "Expected channel to be connected"; @@ -88,7 +88,7 @@ connected = true; // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); - } catch (IOException e) { + } catch (Throwable e) { client().theExecutor().execute( () -> cf.completeExceptionally(e)); } } @@ -102,10 +102,10 @@ @Override public CompletableFuture connectAsync() { - assert !connected : "Already connected"; - assert !chan.isBlocking() : "Unexpected blocking channel"; CompletableFuture cf = new MinimalFuture<>(); try { + assert !connected : "Already connected"; + assert !chan.isBlocking() : "Unexpected blocking channel"; boolean finished = false; PrivilegedExceptionAction pa = () -> chan.connect(address); try { diff -r 2d423c9b73bb -r 8d4770c22b63 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Fri Dec 01 19:25:34 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Sat Dec 02 17:40:57 2017 +0000 @@ -402,6 +402,7 @@ try { if (contentLength == 0) { pusher.onComplete(); + onFinished.run(); onComplete.accept(null); } } catch (Throwable t) { diff -r 2d423c9b73bb -r 8d4770c22b63 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Dec 01 19:25:34 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Sat Dec 02 17:40:57 2017 +0000 @@ -320,6 +320,8 @@ DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1"); s.request(1); } + assert currentListItr != null; + if (lb.isEmpty()) continue; } assert currentListItr != null; assert currentListItr.hasNext(); @@ -359,27 +361,38 @@ @Override public void onSubscribe(Flow.Subscription s) { - if (!subscribed.compareAndSet(false, true)) { - s.cancel(); - } else { - // check whether the stream is already closed. - // if so, we should cancel the subscription - // immediately. - boolean closed; - synchronized(this) { - closed = this.closed; - if (!closed) { - this.subscription = s; + try { + if (!subscribed.compareAndSet(false, true)) { + s.cancel(); + } else { + // check whether the stream is already closed. + // if so, we should cancel the subscription + // immediately. + boolean closed; + synchronized (this) { + closed = this.closed; + if (!closed) { + this.subscription = s; + } } - } - if (closed) { - s.cancel(); - return; + if (closed) { + s.cancel(); + return; + } + assert buffers.remainingCapacity() > 1; // should contain at least 2 + DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " + + Math.max(1, buffers.remainingCapacity() - 1)); + s.request(Math.max(1, buffers.remainingCapacity() - 1)); } - assert buffers.remainingCapacity() > 1; // should contain at least 2 - DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " - + Math.max(1, buffers.remainingCapacity() - 1)); - s.request(Math.max(1, buffers.remainingCapacity() - 1)); + } catch (Throwable t) { + failed = t; + try { + close(); + } catch (IOException x) { + // OK + } finally { + onError(t); + } } } @@ -392,12 +405,14 @@ throw new IllegalStateException("queue is full"); } DEBUG_LOGGER.log(Level.DEBUG, "item offered"); - } catch (Exception ex) { + } catch (Throwable ex) { failed = ex; try { close(); } catch (IOException ex1) { // OK + } finally { + onError(ex); } } } diff -r 2d423c9b73bb -r 8d4770c22b63 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Fri Dec 01 19:25:34 2017 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Sat Dec 02 17:40:57 2017 +0000 @@ -238,10 +238,7 @@ public void incoming(List buffers, boolean complete) { debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers) + " bytes to read buffer"); - addToReadBuf(buffers); - if (complete) { - this.completing = true; - } + addToReadBuf(buffers, complete); scheduler.runOrSchedule(); } @@ -269,7 +266,7 @@ } // readBuf is kept ready for reading outside of this method - private void addToReadBuf(List buffers) { + private void addToReadBuf(List buffers, boolean complete) { synchronized (readBufferLock) { for (ByteBuffer buf : buffers) { readBuf.compact(); @@ -278,6 +275,9 @@ readBuf.put(buf); readBuf.flip(); } + if (complete) { + this.completing = complete; + } } } @@ -297,19 +297,32 @@ try { debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining() + " bytes to unwrap " - + states(handshakeState)); - - while (readBuf.hasRemaining()) { + + states(handshakeState) + + ", " + engine.getHandshakeStatus()); + int len; + boolean completing = false; + while ((len = readBuf.remaining()) > 0) { boolean handshaking = false; try { EngineResult result; synchronized (readBufferLock) { + completing = this.completing; result = unwrapBuffer(readBuf); debugr.log(Level.DEBUG, "Unwrapped: %s", result.result); } + if (result.bytesProduced() > 0) { + debugr.log(Level.DEBUG, "sending %d", result.bytesProduced()); + count.addAndGet(result.bytesProduced()); + outgoing(result.destBuffer, false); + } if (result.status() == Status.BUFFER_UNDERFLOW) { debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW"); - return; + // not enough data in the read buffer... + synchronized (readBufferLock) { + // check if we have received some data + if (readBuf.remaining() > len) continue; + return; + } } if (completing && result.status() == Status.CLOSED) { debugr.log(Level.DEBUG, "Closed: completing"); @@ -328,11 +341,6 @@ resumeActivity(); } } - if (result.bytesProduced() > 0) { - debugr.log(Level.DEBUG, "sending %d", result.bytesProduced()); - count.addAndGet(result.bytesProduced()); - outgoing(result.destBuffer, false); - } } catch (IOException ex) { errorCommon(ex); handleError(ex); @@ -340,6 +348,11 @@ if (handshaking && !completing) return; } + if (!completing) { + synchronized (readBufferLock) { + completing = this.completing && !readBuf.hasRemaining(); + } + } if (completing) { debugr.log(Level.DEBUG, "completing"); // Complete the alpnCF, if not already complete, regardless of @@ -443,6 +456,7 @@ assert complete ? buffers == Utils.EMPTY_BB_LIST : true; assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; if (complete) { + debugw.log(Level.DEBUG, "adding SENTINEL"); writeList.add(SENTINEL); } else { writeList.addAll(buffers); @@ -507,7 +521,8 @@ try { debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")"); - while (Utils.remaining(writeList) > 0 || hsTriggered()) { + while (Utils.remaining(writeList) > 0 || hsTriggered() + || needWrap()) { ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); EngineResult result = wrapBuffers(outbufs); debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result); @@ -538,6 +553,7 @@ if (writeList.isEmpty() && !result.needUnwrap()) { writer.addData(HS_TRIGGER); } + if (needWrap()) continue; return; } } @@ -551,11 +567,18 @@ outgoing(Utils.EMPTY_BB_LIST, true); return; } + if (writeList.isEmpty() && needWrap()) { + writer.addData(HS_TRIGGER); + } } catch (Throwable ex) { handleError(ex); } } + private boolean needWrap() { + return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; + } + private void sendResultBytes(EngineResult result) { if (result.bytesProduced() > 0) { debugw.log(Level.DEBUG, "Sending %d bytes downstream", @@ -678,13 +701,19 @@ private void executeTasks(List tasks) { exec.execute(() -> { handshakeState.getAndUpdate((current) -> current | DOING_TASKS); - try { - tasks.forEach((r) -> { - r.run(); - }); - } catch (Throwable t) { - handleError(t); - } + List nextTasks = tasks; + do { + try { + nextTasks.forEach((r) -> { + r.run(); + }); + if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + nextTasks = obtainTasks(); + } else break; + } catch (Throwable t) { + handleError(t); + } + } while(true); handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); writer.addData(HS_TRIGGER); resumeActivity(); diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/com/sun/net/httpserver/EchoHandler.java --- a/test/jdk/com/sun/net/httpserver/EchoHandler.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/com/sun/net/httpserver/EchoHandler.java Sat Dec 02 17:40:57 2017 +0000 @@ -66,8 +66,8 @@ t.sendResponseHeaders(200, in.length); OutputStream os = t.getResponseBody(); os.write(in); - close(os); - close(is); + close(t, os); + close(t, is); } else { OutputStream os = t.getResponseBody(); byte[] buf = new byte[64 * 1024]; @@ -84,15 +84,21 @@ String s = Integer.toString(count); os.write(s.getBytes()); } - close(os); - close(is); + close(t, os); + close(t, is); } } protected void close(OutputStream os) throws IOException { - os.close(); + os.close(); } protected void close(InputStream is) throws IOException { - is.close(); + is.close(); + } + protected void close(HttpExchange t, OutputStream os) throws IOException { + close(os); + } + protected void close(HttpExchange t, InputStream is) throws IOException { + close(is); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/AbstractNoBody.java --- a/test/jdk/java/net/httpclient/AbstractNoBody.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/AbstractNoBody.java Sat Dec 02 17:40:57 2017 +0000 @@ -25,6 +25,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -57,7 +58,8 @@ static final String SIMPLE_STRING = "Hello world. Goodbye world"; static final int ITERATION_COUNT = 10; // a shared executor helps reduce the amount of threads created by the test - static final Executor executor = Executors.newCachedThreadPool(); + static final Executor executor = Executors.newFixedThreadPool(ITERATION_COUNT * 2); + static final ExecutorService serverExecutor = Executors.newFixedThreadPool(ITERATION_COUNT * 4); @DataProvider(name = "variants") public Object[][] variants() { @@ -91,6 +93,7 @@ @BeforeTest public void setup() throws Exception { + printStamp(START, "setup"); sslContext = new SimpleSSLContext().get(); if (sslContext == null) throw new AssertionError("Unexpected null sslContext"); @@ -100,12 +103,14 @@ HttpHandler h1_chunkNoBodyHandler = new HTTP1_ChunkedNoBodyHandler(); InetSocketAddress sa = new InetSocketAddress("localhost", 0); httpTestServer = HttpServer.create(sa, 0); + httpTestServer.setExecutor(serverExecutor); httpTestServer.createContext("/http1/noBodyFixed", h1_fixedLengthNoBodyHandler); httpTestServer.createContext("/http1/noBodyChunk", h1_chunkNoBodyHandler); httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyFixed"; httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyChunk"; httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setExecutor(serverExecutor); httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); httpsTestServer.createContext("/https1/noBodyFixed", h1_fixedLengthNoBodyHandler); httpsTestServer.createContext("/https1/noBodyChunk", h1_chunkNoBodyHandler); @@ -116,14 +121,14 @@ Http2Handler h2_fixedLengthNoBodyHandler = new HTTP2_FixedLengthNoBodyHandler(); Http2Handler h2_chunkedNoBodyHandler = new HTTP2_ChunkedNoBodyHandler(); - http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer = new Http2TestServer("127.0.0.1", false, 0, serverExecutor, null); http2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/http2/noBodyFixed"); http2TestServer.addHandler(h2_chunkedNoBodyHandler, "/http2/noBodyChunk"); int port = http2TestServer.getAddress().getPort(); http2URI_fixed = "http://127.0.0.1:" + port + "/http2/noBodyFixed"; http2URI_chunk = "http://127.0.0.1:" + port + "/http2/noBodyChunk"; - https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer = new Http2TestServer("127.0.0.1", true, 0, serverExecutor, sslContext); https2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/https2/noBodyFixed"); https2TestServer.addHandler(h2_chunkedNoBodyHandler, "/https2/noBodyChunk"); port = https2TestServer.getAddress().getPort(); @@ -134,16 +139,34 @@ httpsTestServer.start(); http2TestServer.start(); https2TestServer.start(); + printStamp(END,"setup"); } @AfterTest public void teardown() throws Exception { + printStamp(START, "teardown"); httpTestServer.stop(0); httpsTestServer.stop(0); http2TestServer.stop(); https2TestServer.stop(); + printStamp(END, "teardown"); } + static final long start = System.nanoTime(); + static final String START = "start"; + static final String END = "end "; + static long elapsed() { return (System.nanoTime() - start)/1000_000;} + void printStamp(String what, String fmt, Object... args) { + long elapsed = elapsed(); + long sec = elapsed/1000; + long ms = elapsed % 1000; + String time = sec > 0 ? sec + "sec " : ""; + time = time + ms + "ms"; + System.out.printf("%s: %s \t [%s]\t %s%n", + getClass().getSimpleName(), what, time, String.format(fmt,args)); + } + + static class HTTP1_FixedLengthNoBodyHandler implements HttpHandler { @Override public void handle(HttpExchange t) throws IOException { diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java --- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Sat Dec 02 17:40:57 2017 +0000 @@ -178,6 +178,7 @@ try { return is.readAllBytes(); } catch (IOException io) { + io.printStackTrace(); throw new CompletionException(io); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/ManyRequests.java --- a/test/jdk/java/net/httpclient/ManyRequests.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/ManyRequests.java Sat Dec 02 17:40:57 2017 +0000 @@ -107,19 +107,23 @@ System.out.println("Server: received " + e.getRequestURI()); super.handle(e); } - protected void close(OutputStream os) throws IOException { + @Override + protected void close(HttpExchange t, OutputStream os) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } - os.close(); + System.out.println("Server: close outbound: " + t.getRequestURI()); + super.close(t, os); } - protected void close(InputStream is) throws IOException { + @Override + protected void close(HttpExchange t, InputStream is) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } - is.close(); + System.out.println("Server: close inbound: " + t.getRequestURI()); + super.close(t, is); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/ManyRequestsLegacy.java --- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Sat Dec 02 17:40:57 2017 +0000 @@ -204,19 +204,21 @@ super.handle(e); } @Override - protected void close(OutputStream os) throws IOException { + protected void close(HttpExchange t, OutputStream os) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } + System.out.println("Server: close outbound: " + t.getRequestURI()); os.close(); } @Override - protected void close(InputStream is) throws IOException { + protected void close(HttpExchange t, InputStream is) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } + System.out.println("Server: close inbound: " + t.getRequestURI()); is.close(); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/MockServer.java --- a/test/jdk/java/net/httpclient/MockServer.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/MockServer.java Sat Dec 02 17:40:57 2017 +0000 @@ -45,7 +45,7 @@ */ public class MockServer extends Thread implements Closeable { - ServerSocket ss; + final ServerSocket ss; private final List sockets; private final List removals; private final List additions; @@ -296,20 +296,32 @@ @Override public void run() { - while (!closed) { - try { - System.out.println("Server waiting for connection"); - Socket s = ss.accept(); - Connection c = new Connection(s); - c.start(); - System.out.println("Server got new connection: " + c); - synchronized (additions) { - additions.add(c); + try { + while (!closed) { + try { + System.out.println("Server waiting for connection"); + Socket s = ss.accept(); + Connection c = new Connection(s); + c.start(); + System.out.println("Server got new connection: " + c); + synchronized (additions) { + additions.add(c); + } + } catch (IOException e) { + if (closed) + return; + e.printStackTrace(System.out); } - } catch (IOException e) { - if (closed) - return; - e.printStackTrace(); + } + } catch (Throwable t) { + System.out.println("Unexpected exception in accept loop: " + t); + t.printStackTrace(System.out); + } finally { + if (closed) { + System.out.println("Server closed: exiting accept loop"); + } else { + System.out.println("Server not closed: exiting accept loop and closing"); + close(); } } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/NoBodyPartOne.java --- a/test/jdk/java/net/httpclient/NoBodyPartOne.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/NoBodyPartOne.java Sat Dec 02 17:40:57 2017 +0000 @@ -55,6 +55,7 @@ @Test(dataProvider = "variants") public void testAsString(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsString(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -68,10 +69,13 @@ String body = response.body(); assertEquals(body, ""); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsFile(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsFile(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -86,10 +90,13 @@ assertTrue(Files.exists(bodyPath)); assertEquals(Files.size(bodyPath), 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsByteArray(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsByteArray(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -102,5 +109,7 @@ byte[] body = response.body(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/NoBodyPartTwo.java --- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java Sat Dec 02 17:40:57 2017 +0000 @@ -57,6 +57,7 @@ volatile boolean consumerHasBeenCalled; @Test(dataProvider = "variants") public void testAsByteArrayConsumer(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsByteArrayConsumer(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -73,10 +74,13 @@ client.send(req, asByteArrayConsumer(consumer)); assertTrue(consumerHasBeenCalled); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsInputStream(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsInputStream(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -89,10 +93,13 @@ byte[] body = response.body().readAllBytes(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testBuffering(String uri, boolean sameClient) throws Exception { + printStamp(START, "testBuffering(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -105,10 +112,13 @@ byte[] body = response.body(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testDiscard(String uri, boolean sameClient) throws Exception { + printStamp(START, "testDiscard(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -121,5 +131,7 @@ HttpResponse response = client.send(req, discard(obj)); assertEquals(response.body(), obj); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/http2/server/Queue.java --- a/test/jdk/java/net/httpclient/http2/server/Queue.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Sat Dec 02 17:40:57 2017 +0000 @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.LinkedList; +import java.util.Objects; import java.util.stream.Stream; // Each stream has one of these for input. Each Http2Connection has one @@ -40,7 +41,7 @@ private final T closeSentinel; Queue(T closeSentinel) { - this.closeSentinel = closeSentinel; + this.closeSentinel = Objects.requireNonNull(closeSentinel); } public synchronized int size() { @@ -48,6 +49,7 @@ } public synchronized void put(T obj) throws IOException { + Objects.requireNonNull(obj); if (closed || closing) { throw new IOException("stream closed"); } diff -r 2d423c9b73bb -r 8d4770c22b63 test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java --- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Fri Dec 01 19:25:34 2017 +0000 +++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Sat Dec 02 17:40:57 2017 +0000 @@ -55,6 +55,7 @@ import java.security.cert.CertificateException; import java.util.List; import java.util.Queue; +import java.util.Random; import java.util.StringTokenizer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -69,6 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; @Test public class SSLTubeTest { @@ -76,6 +79,17 @@ private static final long COUNTER = 600; private static final int LONGS_PER_BUF = 800; private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; + public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); + + static final Random rand = new Random(); + + static int randomRange(int lower, int upper) { + if (lower > upper) + throw new IllegalArgumentException("lower > upper"); + int diff = upper - lower; + int r = lower + rand.nextInt(diff); + return r - (r % 8); // round down to multiple of 8 (align for longs) + } private static ByteBuffer getBuffer(long startingAt) { ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); @@ -86,18 +100,31 @@ return buf; } - @Test(timeOut = 30000) - public void run() throws IOException { - /* Start of wiring */ + @Test + public void runWithSSLLoopackServer() throws IOException { ExecutorService sslExecutor = Executors.newCachedThreadPool(); + + /* Start of wiring */ /* Emulates an echo server */ -// FlowTube server = new SSLTube(createSSLEngine(false), -// sslExecutor, -// new EchoTube(16)); SSLLoopbackSubscriber server = new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); server.start(); + run(server, sslExecutor); + } + + @Test + public void runWithEchoServer() throws IOException { + ExecutorService sslExecutor = Executors.newCachedThreadPool(); + + /* Start of wiring */ + /* Emulates an echo server */ + FlowTube server = crossOverEchoServer(sslExecutor); + + run(server, sslExecutor); + } + + private void run(FlowTube server, ExecutorService sslExecutor) throws IOException { FlowTube client = new SSLTube(createSSLEngine(true), sslExecutor, server); @@ -128,6 +155,9 @@ } } + /** + * This is a copy of the SSLLoopbackSubscriber used in FlowTest + */ static class SSLLoopbackSubscriber implements FlowTube { private final BlockingQueue buffer; private final Socket clientSock; @@ -173,7 +203,7 @@ private void clientReader() { try { InputStream is = clientSock.getInputStream(); - final int bufsize = FlowTest.randomRange(512, 16 * 1024); + final int bufsize = randomRange(512, 16 * 1024); System.out.println("clientReader: bufsize = " + bufsize); while (true) { byte[] buf = new byte[bufsize]; @@ -206,7 +236,7 @@ while (true) { ByteBuffer buf = buffer.take(); - if (buf == FlowTest.SENTINEL) { + if (buf == SENTINEL) { // finished //Utils.sleep(2000); System.out.println("clientWriter close: " + nbytes + " written"); @@ -249,7 +279,7 @@ try { InputStream is = serverSock.getInputStream(); OutputStream os = serverSock.getOutputStream(); - final int bufsize = FlowTest.randomRange(512, 16 * 1024); + final int bufsize = randomRange(512, 16 * 1024); System.out.println("serverLoopback: bufsize = " + bufsize); byte[] bb = new byte[bufsize]; while (true) { @@ -305,7 +335,7 @@ @Override public void onComplete() { try { - buffer.put(FlowTest.SENTINEL); + buffer.put(SENTINEL); } catch (InterruptedException e) { e.printStackTrace(); Utils.close(clientSock); @@ -330,150 +360,366 @@ } } -// private static final class EchoTube implements FlowTube { -// -// private final static Object EOF = new Object(); -// private final Executor executor = Executors.newSingleThreadExecutor(); -// -// private final Queue queue = new ConcurrentLinkedQueue<>(); -// private final int maxQueueSize; -// private final SequentialScheduler processingScheduler = -// new SequentialScheduler(createProcessingTask()); -// -// /* Writing into this tube */ -// private long unfulfilled; -// private Flow.Subscription subscription; -// -// /* Reading from this tube */ -// private final Demand demand = new Demand(); -// private final AtomicBoolean cancelled = new AtomicBoolean(); -// private Flow.Subscriber> subscriber; -// -// private EchoTube(int maxBufferSize) { -// if (maxBufferSize < 1) -// throw new IllegalArgumentException(); -// this.maxQueueSize = maxBufferSize; -// } -// -// @Override -// public void subscribe(Flow.Subscriber> subscriber) { -// this.subscriber = subscriber; -// System.out.println("EchoTube got subscriber: " + subscriber); -// this.subscriber.onSubscribe(new InternalSubscription()); -// } -// -// @Override -// public void onSubscribe(Flow.Subscription subscription) { -// unfulfilled = maxQueueSize; -// System.out.println("EchoTube request: " + maxQueueSize); -// (this.subscription = subscription).request(maxQueueSize); -// } -// -// @Override -// public void onNext(List item) { -// if (--unfulfilled == (maxQueueSize / 2)) { -// long req = maxQueueSize - unfulfilled; -// subscription.request(req); -// System.out.println("EchoTube request: " + req); -// unfulfilled = maxQueueSize; -// } -// System.out.println("EchoTube add " + Utils.remaining(item)); -// queue.add(item); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public void onError(Throwable throwable) { -// System.out.println("EchoTube add " + throwable); -// queue.add(throwable); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public void onComplete() { -// System.out.println("EchoTube add EOF"); -// queue.add(EOF); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public boolean isFinished() { -// return false; -// } -// -// private class InternalSubscription implements Flow.Subscription { -// -// @Override -// public void request(long n) { -// System.out.println("EchoTube got request: " + n); -// if (n <= 0) { -// throw new InternalError(); -// } -// demand.increase(n); -// processingScheduler.runOrSchedule(); -// } -// -// @Override -// public void cancel() { -// cancelled.set(true); -// } -// } -// -// @Override -// public String toString() { -// return "EchoTube"; -// } -// -// private SequentialScheduler.RestartableTask createProcessingTask() { -// return new SequentialScheduler.CompleteRestartableTask() { -// -// @Override -// protected void run() { -// try { -// while (!cancelled.get()) { -// Object item = queue.peek(); -// if (item == null) -// return; -// try { -// System.out.println("EchoTube processing item"); -// if (item instanceof List) { -// if (!demand.tryDecrement()) { -// System.out.println("EchoTube no demand"); -// return; -// } -// @SuppressWarnings("unchecked") -// List bytes = (List) item; -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing " -// + Utils.remaining(bytes)); -// subscriber.onNext(bytes); -// } else if (item instanceof Throwable) { -// cancelled.set(true); -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing " + item); -// subscriber.onError((Throwable) item); -// } else if (item == EOF) { -// cancelled.set(true); -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing EOF"); -// subscriber.onComplete(); -// } else { -// throw new InternalError(String.valueOf(item)); -// } -// } finally { -// } -// } -// } catch(Throwable t) { -// t.printStackTrace(); -// throw t; -// } -// } -// }; -// } -// } + + /** + * Creates a cross-over FlowTube than can be plugged into a client-side + * SSLTube (in place of the SSLLoopbackSubscriber). + * Note that the only method that can be called on the return tube + * is connectFlows(). Calling any other method will trigger an + * InternalError. + * @param sslExecutor an executor + * @return a cross-over FlowTube connected to an EchoTube. + * @throws IOException + */ + FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException { + LateBindingTube crossOver = new LateBindingTube(); + FlowTube server = new SSLTube(createSSLEngine(false), + sslExecutor, + crossOver); + EchoTube echo = new EchoTube(6); + server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo)); + + return new CrossOverTube(crossOver); + } + + /** + * A cross-over FlowTube that makes it possible to reverse the direction + * of flows. The typical usage is to connect an two opposite SSLTube, + * one encrypting, one decrypting, to e.g. an EchoTube, with the help + * of a LateBindingTube: + * {@code + * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube + * } + *

+ * Note that the only method that can be called on the CrossOverTube is + * connectFlows(). Calling any other method will cause an InternalError to + * be thrown. + * Also connectFlows() can be called only once. + */ + private static final class CrossOverTube implements FlowTube { + final LateBindingTube tube; + CrossOverTube(LateBindingTube tube) { + this.tube = tube; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + throw newInternalError(); + } + + @Override + public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) { + tube.start(writePublisher, readSubscriber); + } + + @Override + public boolean isFinished() { + return tube.isFinished(); + } + + Error newInternalError() { + InternalError error = new InternalError(); + error.printStackTrace(System.out); + return error; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + throw newInternalError(); + } + + @Override + public void onError(Throwable throwable) { + throw newInternalError(); + } + + @Override + public void onComplete() { + throw newInternalError(); + } + + @Override + public void onNext(List item) { + throw newInternalError(); + } + } + + /** + * A late binding tube that makes it possible to create an + * SSLTube before the right-hand-side tube has been created. + * The typical usage is to make it possible to connect two + * opposite SSLTube (one encrypting, one decrypting) through a + * CrossOverTube: + * {@code + * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube + * } + *

+ * Note that this class only supports a single call to start(): it cannot be + * subscribed more than once from its left-hand-side (the cross over tube side). + */ + private static class LateBindingTube implements FlowTube { + + final CompletableFuture>> futurePublisher + = new CompletableFuture<>(); + final ConcurrentLinkedQueue>>> queue + = new ConcurrentLinkedQueue<>(); + AtomicReference>> subscriberRef = new AtomicReference<>(); + SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop); + AtomicReference errorRef = new AtomicReference<>(); + private volatile boolean finished; + private volatile boolean completed; + + + public void start(Flow.Publisher> publisher, + Flow.Subscriber> subscriber) { + subscriberRef.set(subscriber); + futurePublisher.complete(publisher); + scheduler.runOrSchedule(); + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + futurePublisher.thenAccept((p) -> p.subscribe(subscriber)); + scheduler.runOrSchedule(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + queue.add((s) -> s.onSubscribe(subscription)); + scheduler.runOrSchedule(); + } + + @Override + public void onNext(List item) { + queue.add((s) -> s.onNext(item)); + scheduler.runOrSchedule(); + } + + @Override + public void onError(Throwable throwable) { + System.out.println("LateBindingTube onError"); + throwable.printStackTrace(System.out); + queue.add((s) -> { + errorRef.compareAndSet(null, throwable); + try { + System.out.println("LateBindingTube subscriber onError: " + throwable); + s.onError(errorRef.get()); + } finally { + finished = true; + System.out.println("LateBindingTube finished"); + } + }); + scheduler.runOrSchedule(); + } + + @Override + public void onComplete() { + System.out.println("LateBindingTube completing"); + queue.add((s) -> { + completed = true; + try { + System.out.println("LateBindingTube complete subscriber"); + s.onComplete(); + } finally { + finished = true; + System.out.println("LateBindingTube finished"); + } + }); + scheduler.runOrSchedule(); + } + + private void loop() { + if (finished) { + scheduler.stop(); + return; + } + Flow.Subscriber> subscriber = subscriberRef.get(); + if (subscriber == null) return; + try { + Consumer>> s; + while ((s = queue.poll()) != null) { + s.accept(subscriber); + } + } catch (Throwable t) { + if (errorRef.compareAndSet(null, t)) { + onError(t); + } + } + } + } + + /** + * An echo tube that just echoes back whatever bytes it receives. + * This cannot be plugged to the right-hand-side of an SSLTube + * since handshake data cannot be simply echoed back, and + * application data most likely also need to be decrypted and + * re-encrypted. + */ + private static final class EchoTube implements FlowTube { + + private final static Object EOF = new Object(); + private final Executor executor = Executors.newSingleThreadExecutor(); + + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final int maxQueueSize; + private final SequentialScheduler processingScheduler = + new SequentialScheduler(createProcessingTask()); + + /* Writing into this tube */ + private volatile long requested; + private Flow.Subscription subscription; + + /* Reading from this tube */ + private final Demand demand = new Demand(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private Flow.Subscriber> subscriber; + + private EchoTube(int maxBufferSize) { + if (maxBufferSize < 1) + throw new IllegalArgumentException(); + this.maxQueueSize = maxBufferSize; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + this.subscriber = subscriber; + System.out.println("EchoTube got subscriber: " + subscriber); + this.subscriber.onSubscribe(new InternalSubscription()); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + System.out.println("EchoTube request: " + maxQueueSize); + (this.subscription = subscription).request(requested = maxQueueSize); + } + + private void requestMore() { + Flow.Subscription s = subscription; + if (s == null || cancelled.get()) return; + long unfulfilled = queue.size() + --requested; + if (unfulfilled <= maxQueueSize/2) { + long req = maxQueueSize - unfulfilled; + requested += req; + s.request(req); + System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n", + req, requested-req, queue.size(), unfulfilled ); + } + } + + @Override + public void onNext(List item) { + System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", + Utils.remaining(item), requested, queue.size()); + queue.add(item); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public void onError(Throwable throwable) { + System.out.println("EchoTube add " + throwable); + queue.add(throwable); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public void onComplete() { + System.out.println("EchoTube add EOF"); + queue.add(EOF); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public boolean isFinished() { + return cancelled.get(); + } + + private class InternalSubscription implements Flow.Subscription { + + @Override + public void request(long n) { + System.out.println("EchoTube got request: " + n); + if (n <= 0) { + throw new InternalError(); + } + if (demand.increase(n)) { + processingScheduler.deferOrSchedule(executor); + } + } + + @Override + public void cancel() { + cancelled.set(true); + } + } + + @Override + public String toString() { + return "EchoTube"; + } + + int transmitted = 0; + private SequentialScheduler.RestartableTask createProcessingTask() { + return new SequentialScheduler.CompleteRestartableTask() { + + @Override + protected void run() { + try { + while (!cancelled.get()) { + Object item = queue.peek(); + if (item == null) { + System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n", + requested, demand.get(), transmitted); + requestMore(); + return; + } + try { + System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n", + requested, demand.get(), transmitted); + if (item instanceof List) { + if (!demand.tryDecrement()) { + System.out.println("EchoTube no demand"); + return; + } + @SuppressWarnings("unchecked") + List bytes = (List) item; + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing " + + Utils.remaining(bytes)); + transmitted++; + subscriber.onNext(bytes); + requestMore(); + } else if (item instanceof Throwable) { + cancelled.set(true); + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing " + item); + subscriber.onError((Throwable) item); + } else if (item == EOF) { + cancelled.set(true); + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing EOF"); + subscriber.onComplete(); + } else { + throw new InternalError(String.valueOf(item)); + } + } finally { + } + } + } catch(Throwable t) { + t.printStackTrace(); + throw t; + } + } + }; + } + } /** * The final subscriber which receives the decrypted looped-back data. Just @@ -518,13 +764,13 @@ if (--unfulfilled == (REQUEST_WINDOW / 2)) { long req = REQUEST_WINDOW - unfulfilled; System.out.println("EndSubscriber request " + req); + unfulfilled = REQUEST_WINDOW; subscription.request(req); - unfulfilled = REQUEST_WINDOW; } long currval = counter.get(); if (currval % 500 == 0) { - System.out.println("End: " + currval); + System.out.println("EndSubscriber: " + currval); } System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));