# HG changeset patch # User prappo # Date 1512329663 -10800 # Node ID b7e186aa19158f659cfa08a7c51dfabb46436db7 # Parent 952aca3f605aef5a66b023f19392b9b42d5ad5a6# Parent 8d4770c22b636728f423ee2c5b197e66ec48912b http-client-branch: merge diff -r 952aca3f605a -r b7e186aa1915 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Thu Nov 30 22:27:18 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Sun Dec 03 22:34:23 2017 +0300 @@ -129,6 +129,8 @@ Set> waiters = waiting.remove(key); debug.log(Level.DEBUG, "Opening completed: %s", key); opening.remove(key); + if (t == null && conn != null) + putConnection(conn); final Throwable cause = Utils.getCompletionCause(t); if (waiters == null) { debug.log(Level.DEBUG, "no dependent to wake up"); diff -r 952aca3f605a -r b7e186aa1915 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Nov 30 22:27:18 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Sun Dec 03 22:34:23 2017 +0300 @@ -77,9 +77,9 @@ @Override public void handle() { - assert !connected : "Already connected"; - assert !chan.isBlocking() : "Unexpected blocking channel"; try { + assert !connected : "Already connected"; + assert !chan.isBlocking() : "Unexpected blocking channel"; debug.log(Level.DEBUG, "ConnectEvent: finishing connect"); boolean finished = chan.finishConnect(); assert finished : "Expected channel to be connected"; @@ -88,7 +88,7 @@ connected = true; // complete async since the event runs on the SelectorManager thread cf.completeAsync(() -> null, client().theExecutor()); - } catch (IOException e) { + } catch (Throwable e) { client().theExecutor().execute( () -> cf.completeExceptionally(e)); } } @@ -102,10 +102,10 @@ @Override public CompletableFuture connectAsync() { - assert !connected : "Already connected"; - assert !chan.isBlocking() : "Unexpected blocking channel"; CompletableFuture cf = new MinimalFuture<>(); try { + assert !connected : "Already connected"; + assert !chan.isBlocking() : "Unexpected blocking channel"; boolean finished = false; PrivilegedExceptionAction pa = () -> chan.connect(address); try { diff -r 952aca3f605a -r b7e186aa1915 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Thu Nov 30 22:27:18 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Sun Dec 03 22:34:23 2017 +0300 @@ -402,6 +402,7 @@ try { if (contentLength == 0) { pusher.onComplete(); + onFinished.run(); onComplete.accept(null); } } catch (Throwable t) { diff -r 952aca3f605a -r b7e186aa1915 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Nov 30 22:27:18 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Sun Dec 03 22:34:23 2017 +0300 @@ -320,6 +320,8 @@ DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1"); s.request(1); } + assert currentListItr != null; + if (lb.isEmpty()) continue; } assert currentListItr != null; assert currentListItr.hasNext(); @@ -359,27 +361,38 @@ @Override public void onSubscribe(Flow.Subscription s) { - if (!subscribed.compareAndSet(false, true)) { - s.cancel(); - } else { - // check whether the stream is already closed. - // if so, we should cancel the subscription - // immediately. - boolean closed; - synchronized(this) { - closed = this.closed; - if (!closed) { - this.subscription = s; + try { + if (!subscribed.compareAndSet(false, true)) { + s.cancel(); + } else { + // check whether the stream is already closed. + // if so, we should cancel the subscription + // immediately. + boolean closed; + synchronized (this) { + closed = this.closed; + if (!closed) { + this.subscription = s; + } } - } - if (closed) { - s.cancel(); - return; + if (closed) { + s.cancel(); + return; + } + assert buffers.remainingCapacity() > 1; // should contain at least 2 + DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " + + Math.max(1, buffers.remainingCapacity() - 1)); + s.request(Math.max(1, buffers.remainingCapacity() - 1)); } - assert buffers.remainingCapacity() > 1; // should contain at least 2 - DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting " - + Math.max(1, buffers.remainingCapacity() - 1)); - s.request(Math.max(1, buffers.remainingCapacity() - 1)); + } catch (Throwable t) { + failed = t; + try { + close(); + } catch (IOException x) { + // OK + } finally { + onError(t); + } } } @@ -392,12 +405,14 @@ throw new IllegalStateException("queue is full"); } DEBUG_LOGGER.log(Level.DEBUG, "item offered"); - } catch (Exception ex) { + } catch (Throwable ex) { failed = ex; try { close(); } catch (IOException ex1) { // OK + } finally { + onError(ex); } } } diff -r 952aca3f605a -r b7e186aa1915 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Thu Nov 30 22:27:18 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Sun Dec 03 22:34:23 2017 +0300 @@ -238,10 +238,7 @@ public void incoming(List buffers, boolean complete) { debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers) + " bytes to read buffer"); - addToReadBuf(buffers); - if (complete) { - this.completing = true; - } + addToReadBuf(buffers, complete); scheduler.runOrSchedule(); } @@ -269,7 +266,7 @@ } // readBuf is kept ready for reading outside of this method - private void addToReadBuf(List buffers) { + private void addToReadBuf(List buffers, boolean complete) { synchronized (readBufferLock) { for (ByteBuffer buf : buffers) { readBuf.compact(); @@ -278,6 +275,9 @@ readBuf.put(buf); readBuf.flip(); } + if (complete) { + this.completing = complete; + } } } @@ -297,19 +297,32 @@ try { debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining() + " bytes to unwrap " - + states(handshakeState)); - - while (readBuf.hasRemaining()) { + + states(handshakeState) + + ", " + engine.getHandshakeStatus()); + int len; + boolean completing = false; + while ((len = readBuf.remaining()) > 0) { boolean handshaking = false; try { EngineResult result; synchronized (readBufferLock) { + completing = this.completing; result = unwrapBuffer(readBuf); debugr.log(Level.DEBUG, "Unwrapped: %s", result.result); } + if (result.bytesProduced() > 0) { + debugr.log(Level.DEBUG, "sending %d", result.bytesProduced()); + count.addAndGet(result.bytesProduced()); + outgoing(result.destBuffer, false); + } if (result.status() == Status.BUFFER_UNDERFLOW) { debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW"); - return; + // not enough data in the read buffer... + synchronized (readBufferLock) { + // check if we have received some data + if (readBuf.remaining() > len) continue; + return; + } } if (completing && result.status() == Status.CLOSED) { debugr.log(Level.DEBUG, "Closed: completing"); @@ -328,11 +341,6 @@ resumeActivity(); } } - if (result.bytesProduced() > 0) { - debugr.log(Level.DEBUG, "sending %d", result.bytesProduced()); - count.addAndGet(result.bytesProduced()); - outgoing(result.destBuffer, false); - } } catch (IOException ex) { errorCommon(ex); handleError(ex); @@ -340,6 +348,11 @@ if (handshaking && !completing) return; } + if (!completing) { + synchronized (readBufferLock) { + completing = this.completing && !readBuf.hasRemaining(); + } + } if (completing) { debugr.log(Level.DEBUG, "completing"); // Complete the alpnCF, if not already complete, regardless of @@ -443,6 +456,7 @@ assert complete ? buffers == Utils.EMPTY_BB_LIST : true; assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true; if (complete) { + debugw.log(Level.DEBUG, "adding SENTINEL"); writeList.add(SENTINEL); } else { writeList.addAll(buffers); @@ -507,7 +521,8 @@ try { debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")"); - while (Utils.remaining(writeList) > 0 || hsTriggered()) { + while (Utils.remaining(writeList) > 0 || hsTriggered() + || needWrap()) { ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); EngineResult result = wrapBuffers(outbufs); debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result); @@ -538,6 +553,7 @@ if (writeList.isEmpty() && !result.needUnwrap()) { writer.addData(HS_TRIGGER); } + if (needWrap()) continue; return; } } @@ -551,11 +567,18 @@ outgoing(Utils.EMPTY_BB_LIST, true); return; } + if (writeList.isEmpty() && needWrap()) { + writer.addData(HS_TRIGGER); + } } catch (Throwable ex) { handleError(ex); } } + private boolean needWrap() { + return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP; + } + private void sendResultBytes(EngineResult result) { if (result.bytesProduced() > 0) { debugw.log(Level.DEBUG, "Sending %d bytes downstream", @@ -678,13 +701,19 @@ private void executeTasks(List tasks) { exec.execute(() -> { handshakeState.getAndUpdate((current) -> current | DOING_TASKS); - try { - tasks.forEach((r) -> { - r.run(); - }); - } catch (Throwable t) { - handleError(t); - } + List nextTasks = tasks; + do { + try { + nextTasks.forEach((r) -> { + r.run(); + }); + if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { + nextTasks = obtainTasks(); + } else break; + } catch (Throwable t) { + handleError(t); + } + } while(true); handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); writer.addData(HS_TRIGGER); resumeActivity(); diff -r 952aca3f605a -r b7e186aa1915 test/jdk/com/sun/net/httpserver/EchoHandler.java --- a/test/jdk/com/sun/net/httpserver/EchoHandler.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/com/sun/net/httpserver/EchoHandler.java Sun Dec 03 22:34:23 2017 +0300 @@ -66,8 +66,8 @@ t.sendResponseHeaders(200, in.length); OutputStream os = t.getResponseBody(); os.write(in); - close(os); - close(is); + close(t, os); + close(t, is); } else { OutputStream os = t.getResponseBody(); byte[] buf = new byte[64 * 1024]; @@ -84,15 +84,21 @@ String s = Integer.toString(count); os.write(s.getBytes()); } - close(os); - close(is); + close(t, os); + close(t, is); } } protected void close(OutputStream os) throws IOException { - os.close(); + os.close(); } protected void close(InputStream is) throws IOException { - is.close(); + is.close(); + } + protected void close(HttpExchange t, OutputStream os) throws IOException { + close(os); + } + protected void close(HttpExchange t, InputStream is) throws IOException { + close(is); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/AbstractNoBody.java --- a/test/jdk/java/net/httpclient/AbstractNoBody.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/AbstractNoBody.java Sun Dec 03 22:34:23 2017 +0300 @@ -25,6 +25,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -57,7 +58,8 @@ static final String SIMPLE_STRING = "Hello world. Goodbye world"; static final int ITERATION_COUNT = 10; // a shared executor helps reduce the amount of threads created by the test - static final Executor executor = Executors.newCachedThreadPool(); + static final Executor executor = Executors.newFixedThreadPool(ITERATION_COUNT * 2); + static final ExecutorService serverExecutor = Executors.newFixedThreadPool(ITERATION_COUNT * 4); @DataProvider(name = "variants") public Object[][] variants() { @@ -91,6 +93,7 @@ @BeforeTest public void setup() throws Exception { + printStamp(START, "setup"); sslContext = new SimpleSSLContext().get(); if (sslContext == null) throw new AssertionError("Unexpected null sslContext"); @@ -100,12 +103,14 @@ HttpHandler h1_chunkNoBodyHandler = new HTTP1_ChunkedNoBodyHandler(); InetSocketAddress sa = new InetSocketAddress("localhost", 0); httpTestServer = HttpServer.create(sa, 0); + httpTestServer.setExecutor(serverExecutor); httpTestServer.createContext("/http1/noBodyFixed", h1_fixedLengthNoBodyHandler); httpTestServer.createContext("/http1/noBodyChunk", h1_chunkNoBodyHandler); httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyFixed"; httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyChunk"; httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setExecutor(serverExecutor); httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); httpsTestServer.createContext("/https1/noBodyFixed", h1_fixedLengthNoBodyHandler); httpsTestServer.createContext("/https1/noBodyChunk", h1_chunkNoBodyHandler); @@ -116,14 +121,14 @@ Http2Handler h2_fixedLengthNoBodyHandler = new HTTP2_FixedLengthNoBodyHandler(); Http2Handler h2_chunkedNoBodyHandler = new HTTP2_ChunkedNoBodyHandler(); - http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer = new Http2TestServer("127.0.0.1", false, 0, serverExecutor, null); http2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/http2/noBodyFixed"); http2TestServer.addHandler(h2_chunkedNoBodyHandler, "/http2/noBodyChunk"); int port = http2TestServer.getAddress().getPort(); http2URI_fixed = "http://127.0.0.1:" + port + "/http2/noBodyFixed"; http2URI_chunk = "http://127.0.0.1:" + port + "/http2/noBodyChunk"; - https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer = new Http2TestServer("127.0.0.1", true, 0, serverExecutor, sslContext); https2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/https2/noBodyFixed"); https2TestServer.addHandler(h2_chunkedNoBodyHandler, "/https2/noBodyChunk"); port = https2TestServer.getAddress().getPort(); @@ -134,16 +139,34 @@ httpsTestServer.start(); http2TestServer.start(); https2TestServer.start(); + printStamp(END,"setup"); } @AfterTest public void teardown() throws Exception { + printStamp(START, "teardown"); httpTestServer.stop(0); httpsTestServer.stop(0); http2TestServer.stop(); https2TestServer.stop(); + printStamp(END, "teardown"); } + static final long start = System.nanoTime(); + static final String START = "start"; + static final String END = "end "; + static long elapsed() { return (System.nanoTime() - start)/1000_000;} + void printStamp(String what, String fmt, Object... args) { + long elapsed = elapsed(); + long sec = elapsed/1000; + long ms = elapsed % 1000; + String time = sec > 0 ? sec + "sec " : ""; + time = time + ms + "ms"; + System.out.printf("%s: %s \t [%s]\t %s%n", + getClass().getSimpleName(), what, time, String.format(fmt,args)); + } + + static class HTTP1_FixedLengthNoBodyHandler implements HttpHandler { @Override public void handle(HttpExchange t) throws IOException { diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java --- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Sun Dec 03 22:34:23 2017 +0300 @@ -178,6 +178,7 @@ try { return is.readAllBytes(); } catch (IOException io) { + io.printStackTrace(); throw new CompletionException(io); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/ManyRequests.java --- a/test/jdk/java/net/httpclient/ManyRequests.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/ManyRequests.java Sun Dec 03 22:34:23 2017 +0300 @@ -107,19 +107,23 @@ System.out.println("Server: received " + e.getRequestURI()); super.handle(e); } - protected void close(OutputStream os) throws IOException { + @Override + protected void close(HttpExchange t, OutputStream os) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } - os.close(); + System.out.println("Server: close outbound: " + t.getRequestURI()); + super.close(t, os); } - protected void close(InputStream is) throws IOException { + @Override + protected void close(HttpExchange t, InputStream is) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } - is.close(); + System.out.println("Server: close inbound: " + t.getRequestURI()); + super.close(t, is); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/ManyRequestsLegacy.java --- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Sun Dec 03 22:34:23 2017 +0300 @@ -204,19 +204,21 @@ super.handle(e); } @Override - protected void close(OutputStream os) throws IOException { + protected void close(HttpExchange t, OutputStream os) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } + System.out.println("Server: close outbound: " + t.getRequestURI()); os.close(); } @Override - protected void close(InputStream is) throws IOException { + protected void close(HttpExchange t, InputStream is) throws IOException { if (INSERT_DELAY) { try { Thread.sleep(rand.nextInt(200)); } catch (InterruptedException e) {} } + System.out.println("Server: close inbound: " + t.getRequestURI()); is.close(); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/MockServer.java --- a/test/jdk/java/net/httpclient/MockServer.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/MockServer.java Sun Dec 03 22:34:23 2017 +0300 @@ -45,7 +45,7 @@ */ public class MockServer extends Thread implements Closeable { - ServerSocket ss; + final ServerSocket ss; private final List sockets; private final List removals; private final List additions; @@ -296,20 +296,32 @@ @Override public void run() { - while (!closed) { - try { - System.out.println("Server waiting for connection"); - Socket s = ss.accept(); - Connection c = new Connection(s); - c.start(); - System.out.println("Server got new connection: " + c); - synchronized (additions) { - additions.add(c); + try { + while (!closed) { + try { + System.out.println("Server waiting for connection"); + Socket s = ss.accept(); + Connection c = new Connection(s); + c.start(); + System.out.println("Server got new connection: " + c); + synchronized (additions) { + additions.add(c); + } + } catch (IOException e) { + if (closed) + return; + e.printStackTrace(System.out); } - } catch (IOException e) { - if (closed) - return; - e.printStackTrace(); + } + } catch (Throwable t) { + System.out.println("Unexpected exception in accept loop: " + t); + t.printStackTrace(System.out); + } finally { + if (closed) { + System.out.println("Server closed: exiting accept loop"); + } else { + System.out.println("Server not closed: exiting accept loop and closing"); + close(); } } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/NoBodyPartOne.java --- a/test/jdk/java/net/httpclient/NoBodyPartOne.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/NoBodyPartOne.java Sun Dec 03 22:34:23 2017 +0300 @@ -55,6 +55,7 @@ @Test(dataProvider = "variants") public void testAsString(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsString(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -68,10 +69,13 @@ String body = response.body(); assertEquals(body, ""); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsFile(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsFile(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -86,10 +90,13 @@ assertTrue(Files.exists(bodyPath)); assertEquals(Files.size(bodyPath), 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsByteArray(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsByteArray(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -102,5 +109,7 @@ byte[] body = response.body(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/NoBodyPartTwo.java --- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java Sun Dec 03 22:34:23 2017 +0300 @@ -57,6 +57,7 @@ volatile boolean consumerHasBeenCalled; @Test(dataProvider = "variants") public void testAsByteArrayConsumer(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsByteArrayConsumer(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -73,10 +74,13 @@ client.send(req, asByteArrayConsumer(consumer)); assertTrue(consumerHasBeenCalled); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testAsInputStream(String uri, boolean sameClient) throws Exception { + printStamp(START, "testAsInputStream(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -89,10 +93,13 @@ byte[] body = response.body().readAllBytes(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testBuffering(String uri, boolean sameClient) throws Exception { + printStamp(START, "testBuffering(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -105,10 +112,13 @@ byte[] body = response.body(); assertEquals(body.length, 0); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } @Test(dataProvider = "variants") public void testDiscard(String uri, boolean sameClient) throws Exception { + printStamp(START, "testDiscard(\"%s\", %s)", uri, sameClient); HttpClient client = null; for (int i=0; i< ITERATION_COUNT; i++) { if (!sameClient || client == null) @@ -121,5 +131,7 @@ HttpResponse response = client.send(req, discard(obj)); assertEquals(response.body(), obj); } + // We have created many clients here. Try to speed up their release. + if (!sameClient) System.gc(); } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/http2/BasicTest.java --- a/test/jdk/java/net/httpclient/http2/BasicTest.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Sun Dec 03 22:34:23 2017 +0300 @@ -118,6 +118,8 @@ public static void test() throws Exception { try { initialize(); + warmup(false); + warmup(true); simpleTest(false, false); simpleTest(false, true); simpleTest(true, false); @@ -218,8 +220,7 @@ } static void paramsTest() throws Exception { - Http2TestServer server = new Http2TestServer(true, 0, serverExec, sslContext); - server.addHandler((t -> { + httpsServer.addHandler((t -> { SSLSession s = t.getSSLSession(); String prot = s.getProtocol(); if (prot.equals("TLSv1.2")) { @@ -229,9 +230,7 @@ t.sendResponseHeaders(500, -1); } }), "/"); - server.start(); - int port = server.getAddress().getPort(); - URI u = new URI("https://127.0.0.1:"+port+"/foo"); + URI u = new URI("https://127.0.0.1:"+httpsPort+"/foo"); HttpClient client = getClient(); HttpRequest req = HttpRequest.newBuilder(u).build(); HttpResponse resp = client.send(req, asString()); @@ -243,8 +242,8 @@ System.err.println("paramsTest: DONE"); } - static void simpleTest(boolean secure, boolean ping) throws Exception { - URI uri = getURI(secure, ping); + static void warmup(boolean secure) throws Exception { + URI uri = getURI(secure); System.err.println("Request to " + uri); // Do a simple warmup request @@ -254,15 +253,17 @@ .POST(fromString(SIMPLE_STRING)) .build(); HttpResponse response = client.send(req, asString()); + checkStatus(200, response.statusCode()); + String responseBody = response.body(); HttpHeaders h = response.headers(); - - checkStatus(200, response.statusCode()); - - String responseBody = response.body(); checkStrings(SIMPLE_STRING, responseBody); - checkStrings(h.firstValue("x-hello").get(), "world"); checkStrings(h.firstValue("x-bye").get(), "universe"); + } + + static void simpleTest(boolean secure, boolean ping) throws Exception { + URI uri = getURI(secure, ping); + System.err.println("Request to " + uri); // Do loops asynchronously diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/http2/server/Http2EchoHandler.java --- a/test/jdk/java/net/httpclient/http2/server/Http2EchoHandler.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2EchoHandler.java Sun Dec 03 22:34:23 2017 +0300 @@ -31,7 +31,7 @@ public void handle(Http2TestExchange t) throws IOException { try { - System.err.println("EchoHandler received request to " + t.getRequestURI()); + System.err.printf("EchoHandler received request to %s from %s\n", t.getRequestURI(), t.getRemoteAddress()); InputStream is = t.getRequestBody(); HttpHeadersImpl map = t.getRequestHeaders(); HttpHeadersImpl map1 = t.getResponseHeaders(); diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/http2/server/Http2TestServer.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Sun Dec 03 22:34:23 2017 +0300 @@ -35,6 +35,7 @@ import javax.net.ssl.SSLServerSocket; import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.SNIServerName; +import jdk.incubator.http.internal.frame.ErrorFrame; /** * Waits for incoming TCP connections from a client and establishes @@ -172,8 +173,9 @@ public void stop() { // TODO: clean shutdown GoAway stopping = true; + System.err.printf("Server stopping %d connections\n", connections.size()); for (Http2TestServerConnection connection : connections.values()) { - connection.close(); + connection.close(ErrorFrame.NO_ERROR); } try { server.close(); @@ -223,7 +225,7 @@ // and if so then the client might wait // forever. connections.remove(addr, c); - c.close(); + c.close(ErrorFrame.PROTOCOL_ERROR); throw e; } } diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java --- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Sun Dec 03 22:34:23 2017 +0300 @@ -85,6 +85,8 @@ Sentinel() { super(-1,-1);} } + static final Sentinel sentinel = new Sentinel(); + class PingRequest { final byte[] pingData; final long pingStamp; @@ -114,8 +116,6 @@ } } - static Sentinel sentinel; - Http2TestServerConnection(Http2TestServer server, Socket socket, Http2TestExchangeSupplier exchangeSupplier) @@ -159,6 +159,13 @@ return ping.response(); } + void goAway(int error) throws IOException { + int laststream = nextstream >= 3 ? nextstream - 2 : 1; + + GoAwayFrame go = new GoAwayFrame(laststream, error); + outputQ.put(go); + } + /** * Returns the first PingRequest from Queue */ @@ -173,7 +180,7 @@ void handlePing(PingFrame ping) throws IOException { if (ping.streamid() != 0) { System.err.println("Invalid ping received"); - close(); + close(ErrorFrame.PROTOCOL_ERROR); return; } if (ping.getFlag(PingFrame.ACK)) { @@ -181,7 +188,7 @@ PingRequest request = getNextRequest(); if (request == null) { System.err.println("Invalid ping ACK received"); - close(); + close(ErrorFrame.PROTOCOL_ERROR); return; } else if (!Arrays.equals(request.pingData, ping.getData())) { request.fail(new RuntimeException("Wrong ping data in ACK")); @@ -231,15 +238,25 @@ sock.getSession(); // blocks until handshake done } - void close() { + void closeIncoming() { + close(-1); + } + + void close(int error) { + if (stopping) + return; stopping = true; + System.err.printf("Server connection to %s stopping. %d streams\n", + socket.getRemoteSocketAddress().toString(), streams.size()); streams.forEach((i, q) -> { - q.close(); + q.orderlyClose(); }); try { + if (error != -1) + goAway(error); + outputQ.orderlyClose(); socket.close(); - // TODO: put a reset on each stream - } catch (IOException e) { + } catch (Exception e) { } } @@ -321,8 +338,21 @@ nextstream = 3; } - exec.submit(this::readLoop); - exec.submit(this::writeLoop); + (new ConnectionThread("readLoop", this::readLoop)).start(); + (new ConnectionThread("writeLoop", this::writeLoop)).start(); + } + + class ConnectionThread extends Thread { + final Runnable r; + ConnectionThread(String name, Runnable r) { + setName(name); + setDaemon(true); + this.r = r; + } + + public void run() { + r.run(); + } } private void writeFrame(Http2Frame frame) throws IOException { @@ -369,7 +399,7 @@ return; } else if (f instanceof GoAwayFrame) { System.err.println("Closing: "+ f.toString()); - close(); + close(ErrorFrame.NO_ERROR); } else if (f instanceof PingFrame) { handlePing((PingFrame)f); } else @@ -569,7 +599,11 @@ void readLoop() { try { while (!stopping) { - Http2Frame frame = readFrame(); + Http2Frame frame = readFrameImpl(); + if (frame == null) { + closeIncoming(); + return; + } //System.err.printf("TestServer: received frame %s\n", frame); int stream = frame.streamid(); if (stream == 0) { @@ -625,7 +659,7 @@ System.err.println("Http server reader thread shutdown"); e.printStackTrace(); } - close(); + close(ErrorFrame.PROTOCOL_ERROR); } } @@ -667,6 +701,8 @@ Http2Frame frame; try { frame = outputQ.take(); + if (stopping) + break; } catch(IOException x) { if (stopping && x.getCause() instanceof InterruptedException) { break; @@ -742,27 +778,46 @@ } private Http2Frame readFrame() throws IOException { - byte[] buf = new byte[9]; - if (is.readNBytes(buf, 0, 9) != 9) - throw new IOException("readFrame: connection closed"); - int len = 0; - for (int i = 0; i < 3; i++) { - int n = buf[i] & 0xff; - //System.err.println("n = " + n); - len = (len << 8) + n; + Http2Frame f = readFrameImpl(); + if (f == null) + throw new IOException("connection closed"); + return f; + } + + // does not throw an exception for EOF + private Http2Frame readFrameImpl() throws IOException { + try { + byte[] buf = new byte[9]; + int ret; + ret=is.readNBytes(buf, 0, 9); + if (ret == 0) { + return null; + } else if (ret != 9) { + throw new IOException("readFrame: connection closed"); + } + int len = 0; + for (int i = 0; i < 3; i++) { + int n = buf[i] & 0xff; + //System.err.println("n = " + n); + len = (len << 8) + n; + } + byte[] rest = new byte[len]; + int n = is.readNBytes(rest, 0, len); + if (n != len) + throw new IOException("Error reading frame"); + List frames = new ArrayList<>(); + FramesDecoder reader = new FramesDecoder(frames::add); + reader.decode(ByteBuffer.wrap(buf)); + reader.decode(ByteBuffer.wrap(rest)); + if (frames.size()!=1) + throw new IOException("Expected 1 frame got "+frames.size()) ; + + return frames.get(0); + } catch (IOException ee) { + if (stopping) + return null; + throw ee; } - byte[] rest = new byte[len]; - int n = is.readNBytes(rest, 0, len); - if (n != len) - throw new IOException("Error reading frame"); - List frames = new ArrayList<>(); - FramesDecoder reader = new FramesDecoder(frames::add); - reader.decode(ByteBuffer.wrap(buf)); - reader.decode(ByteBuffer.wrap(rest)); - if (frames.size()!=1) - throw new IOException("Expected 1 frame got "+frames.size()) ; - - return frames.get(0); } void sendSettingsFrame() throws IOException { diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/http2/server/Queue.java --- a/test/jdk/java/net/httpclient/http2/server/Queue.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Sun Dec 03 22:34:23 2017 +0300 @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.LinkedList; +import java.util.Objects; import java.util.stream.Stream; // Each stream has one of these for input. Each Http2Connection has one @@ -36,13 +37,11 @@ private boolean closed = false; private boolean closing = false; private Throwable exception = null; - private Runnable callback; - private boolean callbackDisabled = false; private int waiters; // true if someone waiting private final T closeSentinel; Queue(T closeSentinel) { - this.closeSentinel = closeSentinel; + this.closeSentinel = Objects.requireNonNull(closeSentinel); } public synchronized int size() { @@ -50,6 +49,7 @@ } public synchronized void put(T obj) throws IOException { + Objects.requireNonNull(obj); if (closed || closing) { throw new IOException("stream closed"); } @@ -59,16 +59,6 @@ if (waiters > 0) { notifyAll(); } - - if (callbackDisabled) { - return; - } - - if (q.size() > 0 && callback != null) { - // Note: calling callback while holding the lock is - // dangerous and may lead to deadlocks. - callback.run(); - } } // Other close() variants are immediate and abortive @@ -77,6 +67,7 @@ public synchronized void orderlyClose() { if (closing || closed) return; + try { put(closeSentinel); } catch (IOException e) { @@ -87,6 +78,8 @@ @Override public synchronized void close() { + if (closed) + return; closed = true; notifyAll(); } @@ -123,6 +116,7 @@ if (item.equals(closeSentinel)) { closed = true; assert q.isEmpty(); + return null; } return item; } catch (InterruptedException ex) { diff -r 952aca3f605a -r b7e186aa1915 test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java --- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Thu Nov 30 22:27:18 2017 +0300 +++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Sun Dec 03 22:34:23 2017 +0300 @@ -55,6 +55,7 @@ import java.security.cert.CertificateException; import java.util.List; import java.util.Queue; +import java.util.Random; import java.util.StringTokenizer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -69,6 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; @Test public class SSLTubeTest { @@ -76,6 +79,17 @@ private static final long COUNTER = 600; private static final int LONGS_PER_BUF = 800; private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF; + public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0); + + static final Random rand = new Random(); + + static int randomRange(int lower, int upper) { + if (lower > upper) + throw new IllegalArgumentException("lower > upper"); + int diff = upper - lower; + int r = lower + rand.nextInt(diff); + return r - (r % 8); // round down to multiple of 8 (align for longs) + } private static ByteBuffer getBuffer(long startingAt) { ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8); @@ -86,18 +100,31 @@ return buf; } - @Test(timeOut = 30000) - public void run() throws IOException { - /* Start of wiring */ + @Test + public void runWithSSLLoopackServer() throws IOException { ExecutorService sslExecutor = Executors.newCachedThreadPool(); + + /* Start of wiring */ /* Emulates an echo server */ -// FlowTube server = new SSLTube(createSSLEngine(false), -// sslExecutor, -// new EchoTube(16)); SSLLoopbackSubscriber server = new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); server.start(); + run(server, sslExecutor); + } + + @Test + public void runWithEchoServer() throws IOException { + ExecutorService sslExecutor = Executors.newCachedThreadPool(); + + /* Start of wiring */ + /* Emulates an echo server */ + FlowTube server = crossOverEchoServer(sslExecutor); + + run(server, sslExecutor); + } + + private void run(FlowTube server, ExecutorService sslExecutor) throws IOException { FlowTube client = new SSLTube(createSSLEngine(true), sslExecutor, server); @@ -128,6 +155,9 @@ } } + /** + * This is a copy of the SSLLoopbackSubscriber used in FlowTest + */ static class SSLLoopbackSubscriber implements FlowTube { private final BlockingQueue buffer; private final Socket clientSock; @@ -173,7 +203,7 @@ private void clientReader() { try { InputStream is = clientSock.getInputStream(); - final int bufsize = FlowTest.randomRange(512, 16 * 1024); + final int bufsize = randomRange(512, 16 * 1024); System.out.println("clientReader: bufsize = " + bufsize); while (true) { byte[] buf = new byte[bufsize]; @@ -206,7 +236,7 @@ while (true) { ByteBuffer buf = buffer.take(); - if (buf == FlowTest.SENTINEL) { + if (buf == SENTINEL) { // finished //Utils.sleep(2000); System.out.println("clientWriter close: " + nbytes + " written"); @@ -249,7 +279,7 @@ try { InputStream is = serverSock.getInputStream(); OutputStream os = serverSock.getOutputStream(); - final int bufsize = FlowTest.randomRange(512, 16 * 1024); + final int bufsize = randomRange(512, 16 * 1024); System.out.println("serverLoopback: bufsize = " + bufsize); byte[] bb = new byte[bufsize]; while (true) { @@ -305,7 +335,7 @@ @Override public void onComplete() { try { - buffer.put(FlowTest.SENTINEL); + buffer.put(SENTINEL); } catch (InterruptedException e) { e.printStackTrace(); Utils.close(clientSock); @@ -330,150 +360,366 @@ } } -// private static final class EchoTube implements FlowTube { -// -// private final static Object EOF = new Object(); -// private final Executor executor = Executors.newSingleThreadExecutor(); -// -// private final Queue queue = new ConcurrentLinkedQueue<>(); -// private final int maxQueueSize; -// private final SequentialScheduler processingScheduler = -// new SequentialScheduler(createProcessingTask()); -// -// /* Writing into this tube */ -// private long unfulfilled; -// private Flow.Subscription subscription; -// -// /* Reading from this tube */ -// private final Demand demand = new Demand(); -// private final AtomicBoolean cancelled = new AtomicBoolean(); -// private Flow.Subscriber> subscriber; -// -// private EchoTube(int maxBufferSize) { -// if (maxBufferSize < 1) -// throw new IllegalArgumentException(); -// this.maxQueueSize = maxBufferSize; -// } -// -// @Override -// public void subscribe(Flow.Subscriber> subscriber) { -// this.subscriber = subscriber; -// System.out.println("EchoTube got subscriber: " + subscriber); -// this.subscriber.onSubscribe(new InternalSubscription()); -// } -// -// @Override -// public void onSubscribe(Flow.Subscription subscription) { -// unfulfilled = maxQueueSize; -// System.out.println("EchoTube request: " + maxQueueSize); -// (this.subscription = subscription).request(maxQueueSize); -// } -// -// @Override -// public void onNext(List item) { -// if (--unfulfilled == (maxQueueSize / 2)) { -// long req = maxQueueSize - unfulfilled; -// subscription.request(req); -// System.out.println("EchoTube request: " + req); -// unfulfilled = maxQueueSize; -// } -// System.out.println("EchoTube add " + Utils.remaining(item)); -// queue.add(item); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public void onError(Throwable throwable) { -// System.out.println("EchoTube add " + throwable); -// queue.add(throwable); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public void onComplete() { -// System.out.println("EchoTube add EOF"); -// queue.add(EOF); -// processingScheduler.deferOrSchedule(executor); -// } -// -// @Override -// public boolean isFinished() { -// return false; -// } -// -// private class InternalSubscription implements Flow.Subscription { -// -// @Override -// public void request(long n) { -// System.out.println("EchoTube got request: " + n); -// if (n <= 0) { -// throw new InternalError(); -// } -// demand.increase(n); -// processingScheduler.runOrSchedule(); -// } -// -// @Override -// public void cancel() { -// cancelled.set(true); -// } -// } -// -// @Override -// public String toString() { -// return "EchoTube"; -// } -// -// private SequentialScheduler.RestartableTask createProcessingTask() { -// return new SequentialScheduler.CompleteRestartableTask() { -// -// @Override -// protected void run() { -// try { -// while (!cancelled.get()) { -// Object item = queue.peek(); -// if (item == null) -// return; -// try { -// System.out.println("EchoTube processing item"); -// if (item instanceof List) { -// if (!demand.tryDecrement()) { -// System.out.println("EchoTube no demand"); -// return; -// } -// @SuppressWarnings("unchecked") -// List bytes = (List) item; -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing " -// + Utils.remaining(bytes)); -// subscriber.onNext(bytes); -// } else if (item instanceof Throwable) { -// cancelled.set(true); -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing " + item); -// subscriber.onError((Throwable) item); -// } else if (item == EOF) { -// cancelled.set(true); -// Object removed = queue.remove(); -// assert removed == item; -// System.out.println("EchoTube processing EOF"); -// subscriber.onComplete(); -// } else { -// throw new InternalError(String.valueOf(item)); -// } -// } finally { -// } -// } -// } catch(Throwable t) { -// t.printStackTrace(); -// throw t; -// } -// } -// }; -// } -// } + + /** + * Creates a cross-over FlowTube than can be plugged into a client-side + * SSLTube (in place of the SSLLoopbackSubscriber). + * Note that the only method that can be called on the return tube + * is connectFlows(). Calling any other method will trigger an + * InternalError. + * @param sslExecutor an executor + * @return a cross-over FlowTube connected to an EchoTube. + * @throws IOException + */ + FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException { + LateBindingTube crossOver = new LateBindingTube(); + FlowTube server = new SSLTube(createSSLEngine(false), + sslExecutor, + crossOver); + EchoTube echo = new EchoTube(6); + server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo)); + + return new CrossOverTube(crossOver); + } + + /** + * A cross-over FlowTube that makes it possible to reverse the direction + * of flows. The typical usage is to connect an two opposite SSLTube, + * one encrypting, one decrypting, to e.g. an EchoTube, with the help + * of a LateBindingTube: + * {@code + * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube + * } + *

+ * Note that the only method that can be called on the CrossOverTube is + * connectFlows(). Calling any other method will cause an InternalError to + * be thrown. + * Also connectFlows() can be called only once. + */ + private static final class CrossOverTube implements FlowTube { + final LateBindingTube tube; + CrossOverTube(LateBindingTube tube) { + this.tube = tube; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + throw newInternalError(); + } + + @Override + public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) { + tube.start(writePublisher, readSubscriber); + } + + @Override + public boolean isFinished() { + return tube.isFinished(); + } + + Error newInternalError() { + InternalError error = new InternalError(); + error.printStackTrace(System.out); + return error; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + throw newInternalError(); + } + + @Override + public void onError(Throwable throwable) { + throw newInternalError(); + } + + @Override + public void onComplete() { + throw newInternalError(); + } + + @Override + public void onNext(List item) { + throw newInternalError(); + } + } + + /** + * A late binding tube that makes it possible to create an + * SSLTube before the right-hand-side tube has been created. + * The typical usage is to make it possible to connect two + * opposite SSLTube (one encrypting, one decrypting) through a + * CrossOverTube: + * {@code + * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube + * } + *

+ * Note that this class only supports a single call to start(): it cannot be + * subscribed more than once from its left-hand-side (the cross over tube side). + */ + private static class LateBindingTube implements FlowTube { + + final CompletableFuture>> futurePublisher + = new CompletableFuture<>(); + final ConcurrentLinkedQueue>>> queue + = new ConcurrentLinkedQueue<>(); + AtomicReference>> subscriberRef = new AtomicReference<>(); + SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop); + AtomicReference errorRef = new AtomicReference<>(); + private volatile boolean finished; + private volatile boolean completed; + + + public void start(Flow.Publisher> publisher, + Flow.Subscriber> subscriber) { + subscriberRef.set(subscriber); + futurePublisher.complete(publisher); + scheduler.runOrSchedule(); + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + futurePublisher.thenAccept((p) -> p.subscribe(subscriber)); + scheduler.runOrSchedule(); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + queue.add((s) -> s.onSubscribe(subscription)); + scheduler.runOrSchedule(); + } + + @Override + public void onNext(List item) { + queue.add((s) -> s.onNext(item)); + scheduler.runOrSchedule(); + } + + @Override + public void onError(Throwable throwable) { + System.out.println("LateBindingTube onError"); + throwable.printStackTrace(System.out); + queue.add((s) -> { + errorRef.compareAndSet(null, throwable); + try { + System.out.println("LateBindingTube subscriber onError: " + throwable); + s.onError(errorRef.get()); + } finally { + finished = true; + System.out.println("LateBindingTube finished"); + } + }); + scheduler.runOrSchedule(); + } + + @Override + public void onComplete() { + System.out.println("LateBindingTube completing"); + queue.add((s) -> { + completed = true; + try { + System.out.println("LateBindingTube complete subscriber"); + s.onComplete(); + } finally { + finished = true; + System.out.println("LateBindingTube finished"); + } + }); + scheduler.runOrSchedule(); + } + + private void loop() { + if (finished) { + scheduler.stop(); + return; + } + Flow.Subscriber> subscriber = subscriberRef.get(); + if (subscriber == null) return; + try { + Consumer>> s; + while ((s = queue.poll()) != null) { + s.accept(subscriber); + } + } catch (Throwable t) { + if (errorRef.compareAndSet(null, t)) { + onError(t); + } + } + } + } + + /** + * An echo tube that just echoes back whatever bytes it receives. + * This cannot be plugged to the right-hand-side of an SSLTube + * since handshake data cannot be simply echoed back, and + * application data most likely also need to be decrypted and + * re-encrypted. + */ + private static final class EchoTube implements FlowTube { + + private final static Object EOF = new Object(); + private final Executor executor = Executors.newSingleThreadExecutor(); + + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final int maxQueueSize; + private final SequentialScheduler processingScheduler = + new SequentialScheduler(createProcessingTask()); + + /* Writing into this tube */ + private volatile long requested; + private Flow.Subscription subscription; + + /* Reading from this tube */ + private final Demand demand = new Demand(); + private final AtomicBoolean cancelled = new AtomicBoolean(); + private Flow.Subscriber> subscriber; + + private EchoTube(int maxBufferSize) { + if (maxBufferSize < 1) + throw new IllegalArgumentException(); + this.maxQueueSize = maxBufferSize; + } + + @Override + public void subscribe(Flow.Subscriber> subscriber) { + this.subscriber = subscriber; + System.out.println("EchoTube got subscriber: " + subscriber); + this.subscriber.onSubscribe(new InternalSubscription()); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + System.out.println("EchoTube request: " + maxQueueSize); + (this.subscription = subscription).request(requested = maxQueueSize); + } + + private void requestMore() { + Flow.Subscription s = subscription; + if (s == null || cancelled.get()) return; + long unfulfilled = queue.size() + --requested; + if (unfulfilled <= maxQueueSize/2) { + long req = maxQueueSize - unfulfilled; + requested += req; + s.request(req); + System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n", + req, requested-req, queue.size(), unfulfilled ); + } + } + + @Override + public void onNext(List item) { + System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", + Utils.remaining(item), requested, queue.size()); + queue.add(item); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public void onError(Throwable throwable) { + System.out.println("EchoTube add " + throwable); + queue.add(throwable); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public void onComplete() { + System.out.println("EchoTube add EOF"); + queue.add(EOF); + processingScheduler.deferOrSchedule(executor); + } + + @Override + public boolean isFinished() { + return cancelled.get(); + } + + private class InternalSubscription implements Flow.Subscription { + + @Override + public void request(long n) { + System.out.println("EchoTube got request: " + n); + if (n <= 0) { + throw new InternalError(); + } + if (demand.increase(n)) { + processingScheduler.deferOrSchedule(executor); + } + } + + @Override + public void cancel() { + cancelled.set(true); + } + } + + @Override + public String toString() { + return "EchoTube"; + } + + int transmitted = 0; + private SequentialScheduler.RestartableTask createProcessingTask() { + return new SequentialScheduler.CompleteRestartableTask() { + + @Override + protected void run() { + try { + while (!cancelled.get()) { + Object item = queue.peek(); + if (item == null) { + System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n", + requested, demand.get(), transmitted); + requestMore(); + return; + } + try { + System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n", + requested, demand.get(), transmitted); + if (item instanceof List) { + if (!demand.tryDecrement()) { + System.out.println("EchoTube no demand"); + return; + } + @SuppressWarnings("unchecked") + List bytes = (List) item; + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing " + + Utils.remaining(bytes)); + transmitted++; + subscriber.onNext(bytes); + requestMore(); + } else if (item instanceof Throwable) { + cancelled.set(true); + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing " + item); + subscriber.onError((Throwable) item); + } else if (item == EOF) { + cancelled.set(true); + Object removed = queue.remove(); + assert removed == item; + System.out.println("EchoTube processing EOF"); + subscriber.onComplete(); + } else { + throw new InternalError(String.valueOf(item)); + } + } finally { + } + } + } catch(Throwable t) { + t.printStackTrace(); + throw t; + } + } + }; + } + } /** * The final subscriber which receives the decrypted looped-back data. Just @@ -518,13 +764,13 @@ if (--unfulfilled == (REQUEST_WINDOW / 2)) { long req = REQUEST_WINDOW - unfulfilled; System.out.println("EndSubscriber request " + req); + unfulfilled = REQUEST_WINDOW; subscription.request(req); - unfulfilled = REQUEST_WINDOW; } long currval = counter.get(); if (currval % 500 == 0) { - System.out.println("End: " + currval); + System.out.println("EndSubscriber: " + currval); } System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));