--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Thu Nov 30 22:27:18 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Sun Dec 03 22:34:23 2017 +0300
@@ -129,6 +129,8 @@
Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
debug.log(Level.DEBUG, "Opening completed: %s", key);
opening.remove(key);
+ if (t == null && conn != null)
+ putConnection(conn);
final Throwable cause = Utils.getCompletionCause(t);
if (waiters == null) {
debug.log(Level.DEBUG, "no dependent to wake up");
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Nov 30 22:27:18 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Sun Dec 03 22:34:23 2017 +0300
@@ -77,9 +77,9 @@
@Override
public void handle() {
- assert !connected : "Already connected";
- assert !chan.isBlocking() : "Unexpected blocking channel";
try {
+ assert !connected : "Already connected";
+ assert !chan.isBlocking() : "Unexpected blocking channel";
debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
boolean finished = chan.finishConnect();
assert finished : "Expected channel to be connected";
@@ -88,7 +88,7 @@
connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
- } catch (IOException e) {
+ } catch (Throwable e) {
client().theExecutor().execute( () -> cf.completeExceptionally(e));
}
}
@@ -102,10 +102,10 @@
@Override
public CompletableFuture<Void> connectAsync() {
- assert !connected : "Already connected";
- assert !chan.isBlocking() : "Unexpected blocking channel";
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
+ assert !connected : "Already connected";
+ assert !chan.isBlocking() : "Unexpected blocking channel";
boolean finished = false;
PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
try {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Thu Nov 30 22:27:18 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Sun Dec 03 22:34:23 2017 +0300
@@ -402,6 +402,7 @@
try {
if (contentLength == 0) {
pusher.onComplete();
+ onFinished.run();
onComplete.accept(null);
}
} catch (Throwable t) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Thu Nov 30 22:27:18 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Sun Dec 03 22:34:23 2017 +0300
@@ -320,6 +320,8 @@
DEBUG_LOGGER.log(Level.DEBUG, "Increased demand by 1");
s.request(1);
}
+ assert currentListItr != null;
+ if (lb.isEmpty()) continue;
}
assert currentListItr != null;
assert currentListItr.hasNext();
@@ -359,27 +361,38 @@
@Override
public void onSubscribe(Flow.Subscription s) {
- if (!subscribed.compareAndSet(false, true)) {
- s.cancel();
- } else {
- // check whether the stream is already closed.
- // if so, we should cancel the subscription
- // immediately.
- boolean closed;
- synchronized(this) {
- closed = this.closed;
- if (!closed) {
- this.subscription = s;
+ try {
+ if (!subscribed.compareAndSet(false, true)) {
+ s.cancel();
+ } else {
+ // check whether the stream is already closed.
+ // if so, we should cancel the subscription
+ // immediately.
+ boolean closed;
+ synchronized (this) {
+ closed = this.closed;
+ if (!closed) {
+ this.subscription = s;
+ }
}
- }
- if (closed) {
- s.cancel();
- return;
+ if (closed) {
+ s.cancel();
+ return;
+ }
+ assert buffers.remainingCapacity() > 1; // should contain at least 2
+ DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
+ + Math.max(1, buffers.remainingCapacity() - 1));
+ s.request(Math.max(1, buffers.remainingCapacity() - 1));
}
- assert buffers.remainingCapacity() > 1; // should contain at least 2
- DEBUG_LOGGER.log(Level.DEBUG, () -> "onSubscribe: requesting "
- + Math.max(1, buffers.remainingCapacity() - 1));
- s.request(Math.max(1, buffers.remainingCapacity() - 1));
+ } catch (Throwable t) {
+ failed = t;
+ try {
+ close();
+ } catch (IOException x) {
+ // OK
+ } finally {
+ onError(t);
+ }
}
}
@@ -392,12 +405,14 @@
throw new IllegalStateException("queue is full");
}
DEBUG_LOGGER.log(Level.DEBUG, "item offered");
- } catch (Exception ex) {
+ } catch (Throwable ex) {
failed = ex;
try {
close();
} catch (IOException ex1) {
// OK
+ } finally {
+ onError(ex);
}
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Thu Nov 30 22:27:18 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Sun Dec 03 22:34:23 2017 +0300
@@ -238,10 +238,7 @@
public void incoming(List<ByteBuffer> buffers, boolean complete) {
debugr.log(Level.DEBUG, () -> "Adding " + Utils.remaining(buffers)
+ " bytes to read buffer");
- addToReadBuf(buffers);
- if (complete) {
- this.completing = true;
- }
+ addToReadBuf(buffers, complete);
scheduler.runOrSchedule();
}
@@ -269,7 +266,7 @@
}
// readBuf is kept ready for reading outside of this method
- private void addToReadBuf(List<ByteBuffer> buffers) {
+ private void addToReadBuf(List<ByteBuffer> buffers, boolean complete) {
synchronized (readBufferLock) {
for (ByteBuffer buf : buffers) {
readBuf.compact();
@@ -278,6 +275,9 @@
readBuf.put(buf);
readBuf.flip();
}
+ if (complete) {
+ this.completing = complete;
+ }
}
}
@@ -297,19 +297,32 @@
try {
debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining()
+ " bytes to unwrap "
- + states(handshakeState));
-
- while (readBuf.hasRemaining()) {
+ + states(handshakeState)
+ + ", " + engine.getHandshakeStatus());
+ int len;
+ boolean completing = false;
+ while ((len = readBuf.remaining()) > 0) {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
+ completing = this.completing;
result = unwrapBuffer(readBuf);
debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
}
+ if (result.bytesProduced() > 0) {
+ debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
+ count.addAndGet(result.bytesProduced());
+ outgoing(result.destBuffer, false);
+ }
if (result.status() == Status.BUFFER_UNDERFLOW) {
debugr.log(Level.DEBUG, "BUFFER_UNDERFLOW");
- return;
+ // not enough data in the read buffer...
+ synchronized (readBufferLock) {
+ // check if we have received some data
+ if (readBuf.remaining() > len) continue;
+ return;
+ }
}
if (completing && result.status() == Status.CLOSED) {
debugr.log(Level.DEBUG, "Closed: completing");
@@ -328,11 +341,6 @@
resumeActivity();
}
}
- if (result.bytesProduced() > 0) {
- debugr.log(Level.DEBUG, "sending %d", result.bytesProduced());
- count.addAndGet(result.bytesProduced());
- outgoing(result.destBuffer, false);
- }
} catch (IOException ex) {
errorCommon(ex);
handleError(ex);
@@ -340,6 +348,11 @@
if (handshaking && !completing)
return;
}
+ if (!completing) {
+ synchronized (readBufferLock) {
+ completing = this.completing && !readBuf.hasRemaining();
+ }
+ }
if (completing) {
debugr.log(Level.DEBUG, "completing");
// Complete the alpnCF, if not already complete, regardless of
@@ -443,6 +456,7 @@
assert complete ? buffers == Utils.EMPTY_BB_LIST : true;
assert buffers != Utils.EMPTY_BB_LIST ? complete == false : true;
if (complete) {
+ debugw.log(Level.DEBUG, "adding SENTINEL");
writeList.add(SENTINEL);
} else {
writeList.addAll(buffers);
@@ -507,7 +521,8 @@
try {
debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")");
- while (Utils.remaining(writeList) > 0 || hsTriggered()) {
+ while (Utils.remaining(writeList) > 0 || hsTriggered()
+ || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result);
@@ -538,6 +553,7 @@
if (writeList.isEmpty() && !result.needUnwrap()) {
writer.addData(HS_TRIGGER);
}
+ if (needWrap()) continue;
return;
}
}
@@ -551,11 +567,18 @@
outgoing(Utils.EMPTY_BB_LIST, true);
return;
}
+ if (writeList.isEmpty() && needWrap()) {
+ writer.addData(HS_TRIGGER);
+ }
} catch (Throwable ex) {
handleError(ex);
}
}
+ private boolean needWrap() {
+ return engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP;
+ }
+
private void sendResultBytes(EngineResult result) {
if (result.bytesProduced() > 0) {
debugw.log(Level.DEBUG, "Sending %d bytes downstream",
@@ -678,13 +701,19 @@
private void executeTasks(List<Runnable> tasks) {
exec.execute(() -> {
handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
- try {
- tasks.forEach((r) -> {
- r.run();
- });
- } catch (Throwable t) {
- handleError(t);
- }
+ List<Runnable> nextTasks = tasks;
+ do {
+ try {
+ nextTasks.forEach((r) -> {
+ r.run();
+ });
+ if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
+ nextTasks = obtainTasks();
+ } else break;
+ } catch (Throwable t) {
+ handleError(t);
+ }
+ } while(true);
handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
writer.addData(HS_TRIGGER);
resumeActivity();
--- a/test/jdk/com/sun/net/httpserver/EchoHandler.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/com/sun/net/httpserver/EchoHandler.java Sun Dec 03 22:34:23 2017 +0300
@@ -66,8 +66,8 @@
t.sendResponseHeaders(200, in.length);
OutputStream os = t.getResponseBody();
os.write(in);
- close(os);
- close(is);
+ close(t, os);
+ close(t, is);
} else {
OutputStream os = t.getResponseBody();
byte[] buf = new byte[64 * 1024];
@@ -84,15 +84,21 @@
String s = Integer.toString(count);
os.write(s.getBytes());
}
- close(os);
- close(is);
+ close(t, os);
+ close(t, is);
}
}
protected void close(OutputStream os) throws IOException {
- os.close();
+ os.close();
}
protected void close(InputStream is) throws IOException {
- is.close();
+ is.close();
+ }
+ protected void close(HttpExchange t, OutputStream os) throws IOException {
+ close(os);
+ }
+ protected void close(HttpExchange t, InputStream is) throws IOException {
+ close(is);
}
}
--- a/test/jdk/java/net/httpclient/AbstractNoBody.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/AbstractNoBody.java Sun Dec 03 22:34:23 2017 +0300
@@ -25,6 +25,7 @@
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@@ -57,7 +58,8 @@
static final String SIMPLE_STRING = "Hello world. Goodbye world";
static final int ITERATION_COUNT = 10;
// a shared executor helps reduce the amount of threads created by the test
- static final Executor executor = Executors.newCachedThreadPool();
+ static final Executor executor = Executors.newFixedThreadPool(ITERATION_COUNT * 2);
+ static final ExecutorService serverExecutor = Executors.newFixedThreadPool(ITERATION_COUNT * 4);
@DataProvider(name = "variants")
public Object[][] variants() {
@@ -91,6 +93,7 @@
@BeforeTest
public void setup() throws Exception {
+ printStamp(START, "setup");
sslContext = new SimpleSSLContext().get();
if (sslContext == null)
throw new AssertionError("Unexpected null sslContext");
@@ -100,12 +103,14 @@
HttpHandler h1_chunkNoBodyHandler = new HTTP1_ChunkedNoBodyHandler();
InetSocketAddress sa = new InetSocketAddress("localhost", 0);
httpTestServer = HttpServer.create(sa, 0);
+ httpTestServer.setExecutor(serverExecutor);
httpTestServer.createContext("/http1/noBodyFixed", h1_fixedLengthNoBodyHandler);
httpTestServer.createContext("/http1/noBodyChunk", h1_chunkNoBodyHandler);
httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyFixed";
httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/noBodyChunk";
httpsTestServer = HttpsServer.create(sa, 0);
+ httpsTestServer.setExecutor(serverExecutor);
httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
httpsTestServer.createContext("/https1/noBodyFixed", h1_fixedLengthNoBodyHandler);
httpsTestServer.createContext("/https1/noBodyChunk", h1_chunkNoBodyHandler);
@@ -116,14 +121,14 @@
Http2Handler h2_fixedLengthNoBodyHandler = new HTTP2_FixedLengthNoBodyHandler();
Http2Handler h2_chunkedNoBodyHandler = new HTTP2_ChunkedNoBodyHandler();
- http2TestServer = new Http2TestServer("127.0.0.1", false, 0);
+ http2TestServer = new Http2TestServer("127.0.0.1", false, 0, serverExecutor, null);
http2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/http2/noBodyFixed");
http2TestServer.addHandler(h2_chunkedNoBodyHandler, "/http2/noBodyChunk");
int port = http2TestServer.getAddress().getPort();
http2URI_fixed = "http://127.0.0.1:" + port + "/http2/noBodyFixed";
http2URI_chunk = "http://127.0.0.1:" + port + "/http2/noBodyChunk";
- https2TestServer = new Http2TestServer("127.0.0.1", true, 0);
+ https2TestServer = new Http2TestServer("127.0.0.1", true, 0, serverExecutor, sslContext);
https2TestServer.addHandler(h2_fixedLengthNoBodyHandler, "/https2/noBodyFixed");
https2TestServer.addHandler(h2_chunkedNoBodyHandler, "/https2/noBodyChunk");
port = https2TestServer.getAddress().getPort();
@@ -134,16 +139,34 @@
httpsTestServer.start();
http2TestServer.start();
https2TestServer.start();
+ printStamp(END,"setup");
}
@AfterTest
public void teardown() throws Exception {
+ printStamp(START, "teardown");
httpTestServer.stop(0);
httpsTestServer.stop(0);
http2TestServer.stop();
https2TestServer.stop();
+ printStamp(END, "teardown");
}
+ static final long start = System.nanoTime();
+ static final String START = "start";
+ static final String END = "end ";
+ static long elapsed() { return (System.nanoTime() - start)/1000_000;}
+ void printStamp(String what, String fmt, Object... args) {
+ long elapsed = elapsed();
+ long sec = elapsed/1000;
+ long ms = elapsed % 1000;
+ String time = sec > 0 ? sec + "sec " : "";
+ time = time + ms + "ms";
+ System.out.printf("%s: %s \t [%s]\t %s%n",
+ getClass().getSimpleName(), what, time, String.format(fmt,args));
+ }
+
+
static class HTTP1_FixedLengthNoBodyHandler implements HttpHandler {
@Override
public void handle(HttpExchange t) throws IOException {
--- a/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Sun Dec 03 22:34:23 2017 +0300
@@ -178,6 +178,7 @@
try {
return is.readAllBytes();
} catch (IOException io) {
+ io.printStackTrace();
throw new CompletionException(io);
}
}
--- a/test/jdk/java/net/httpclient/ManyRequests.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/ManyRequests.java Sun Dec 03 22:34:23 2017 +0300
@@ -107,19 +107,23 @@
System.out.println("Server: received " + e.getRequestURI());
super.handle(e);
}
- protected void close(OutputStream os) throws IOException {
+ @Override
+ protected void close(HttpExchange t, OutputStream os) throws IOException {
if (INSERT_DELAY) {
try { Thread.sleep(rand.nextInt(200)); }
catch (InterruptedException e) {}
}
- os.close();
+ System.out.println("Server: close outbound: " + t.getRequestURI());
+ super.close(t, os);
}
- protected void close(InputStream is) throws IOException {
+ @Override
+ protected void close(HttpExchange t, InputStream is) throws IOException {
if (INSERT_DELAY) {
try { Thread.sleep(rand.nextInt(200)); }
catch (InterruptedException e) {}
}
- is.close();
+ System.out.println("Server: close inbound: " + t.getRequestURI());
+ super.close(t, is);
}
}
--- a/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/ManyRequestsLegacy.java Sun Dec 03 22:34:23 2017 +0300
@@ -204,19 +204,21 @@
super.handle(e);
}
@Override
- protected void close(OutputStream os) throws IOException {
+ protected void close(HttpExchange t, OutputStream os) throws IOException {
if (INSERT_DELAY) {
try { Thread.sleep(rand.nextInt(200)); }
catch (InterruptedException e) {}
}
+ System.out.println("Server: close outbound: " + t.getRequestURI());
os.close();
}
@Override
- protected void close(InputStream is) throws IOException {
+ protected void close(HttpExchange t, InputStream is) throws IOException {
if (INSERT_DELAY) {
try { Thread.sleep(rand.nextInt(200)); }
catch (InterruptedException e) {}
}
+ System.out.println("Server: close inbound: " + t.getRequestURI());
is.close();
}
}
--- a/test/jdk/java/net/httpclient/MockServer.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/MockServer.java Sun Dec 03 22:34:23 2017 +0300
@@ -45,7 +45,7 @@
*/
public class MockServer extends Thread implements Closeable {
- ServerSocket ss;
+ final ServerSocket ss;
private final List<Connection> sockets;
private final List<Connection> removals;
private final List<Connection> additions;
@@ -296,20 +296,32 @@
@Override
public void run() {
- while (!closed) {
- try {
- System.out.println("Server waiting for connection");
- Socket s = ss.accept();
- Connection c = new Connection(s);
- c.start();
- System.out.println("Server got new connection: " + c);
- synchronized (additions) {
- additions.add(c);
+ try {
+ while (!closed) {
+ try {
+ System.out.println("Server waiting for connection");
+ Socket s = ss.accept();
+ Connection c = new Connection(s);
+ c.start();
+ System.out.println("Server got new connection: " + c);
+ synchronized (additions) {
+ additions.add(c);
+ }
+ } catch (IOException e) {
+ if (closed)
+ return;
+ e.printStackTrace(System.out);
}
- } catch (IOException e) {
- if (closed)
- return;
- e.printStackTrace();
+ }
+ } catch (Throwable t) {
+ System.out.println("Unexpected exception in accept loop: " + t);
+ t.printStackTrace(System.out);
+ } finally {
+ if (closed) {
+ System.out.println("Server closed: exiting accept loop");
+ } else {
+ System.out.println("Server not closed: exiting accept loop and closing");
+ close();
}
}
}
--- a/test/jdk/java/net/httpclient/NoBodyPartOne.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/NoBodyPartOne.java Sun Dec 03 22:34:23 2017 +0300
@@ -55,6 +55,7 @@
@Test(dataProvider = "variants")
public void testAsString(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testAsString(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -68,10 +69,13 @@
String body = response.body();
assertEquals(body, "");
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
@Test(dataProvider = "variants")
public void testAsFile(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testAsFile(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -86,10 +90,13 @@
assertTrue(Files.exists(bodyPath));
assertEquals(Files.size(bodyPath), 0);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
@Test(dataProvider = "variants")
public void testAsByteArray(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testAsByteArray(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -102,5 +109,7 @@
byte[] body = response.body();
assertEquals(body.length, 0);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
}
--- a/test/jdk/java/net/httpclient/NoBodyPartTwo.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/NoBodyPartTwo.java Sun Dec 03 22:34:23 2017 +0300
@@ -57,6 +57,7 @@
volatile boolean consumerHasBeenCalled;
@Test(dataProvider = "variants")
public void testAsByteArrayConsumer(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testAsByteArrayConsumer(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -73,10 +74,13 @@
client.send(req, asByteArrayConsumer(consumer));
assertTrue(consumerHasBeenCalled);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
@Test(dataProvider = "variants")
public void testAsInputStream(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testAsInputStream(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -89,10 +93,13 @@
byte[] body = response.body().readAllBytes();
assertEquals(body.length, 0);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
@Test(dataProvider = "variants")
public void testBuffering(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testBuffering(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -105,10 +112,13 @@
byte[] body = response.body();
assertEquals(body.length, 0);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
@Test(dataProvider = "variants")
public void testDiscard(String uri, boolean sameClient) throws Exception {
+ printStamp(START, "testDiscard(\"%s\", %s)", uri, sameClient);
HttpClient client = null;
for (int i=0; i< ITERATION_COUNT; i++) {
if (!sameClient || client == null)
@@ -121,5 +131,7 @@
HttpResponse<Object> response = client.send(req, discard(obj));
assertEquals(response.body(), obj);
}
+ // We have created many clients here. Try to speed up their release.
+ if (!sameClient) System.gc();
}
}
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Sun Dec 03 22:34:23 2017 +0300
@@ -118,6 +118,8 @@
public static void test() throws Exception {
try {
initialize();
+ warmup(false);
+ warmup(true);
simpleTest(false, false);
simpleTest(false, true);
simpleTest(true, false);
@@ -218,8 +220,7 @@
}
static void paramsTest() throws Exception {
- Http2TestServer server = new Http2TestServer(true, 0, serverExec, sslContext);
- server.addHandler((t -> {
+ httpsServer.addHandler((t -> {
SSLSession s = t.getSSLSession();
String prot = s.getProtocol();
if (prot.equals("TLSv1.2")) {
@@ -229,9 +230,7 @@
t.sendResponseHeaders(500, -1);
}
}), "/");
- server.start();
- int port = server.getAddress().getPort();
- URI u = new URI("https://127.0.0.1:"+port+"/foo");
+ URI u = new URI("https://127.0.0.1:"+httpsPort+"/foo");
HttpClient client = getClient();
HttpRequest req = HttpRequest.newBuilder(u).build();
HttpResponse<String> resp = client.send(req, asString());
@@ -243,8 +242,8 @@
System.err.println("paramsTest: DONE");
}
- static void simpleTest(boolean secure, boolean ping) throws Exception {
- URI uri = getURI(secure, ping);
+ static void warmup(boolean secure) throws Exception {
+ URI uri = getURI(secure);
System.err.println("Request to " + uri);
// Do a simple warmup request
@@ -254,15 +253,17 @@
.POST(fromString(SIMPLE_STRING))
.build();
HttpResponse<String> response = client.send(req, asString());
+ checkStatus(200, response.statusCode());
+ String responseBody = response.body();
HttpHeaders h = response.headers();
-
- checkStatus(200, response.statusCode());
-
- String responseBody = response.body();
checkStrings(SIMPLE_STRING, responseBody);
-
checkStrings(h.firstValue("x-hello").get(), "world");
checkStrings(h.firstValue("x-bye").get(), "universe");
+ }
+
+ static void simpleTest(boolean secure, boolean ping) throws Exception {
+ URI uri = getURI(secure, ping);
+ System.err.println("Request to " + uri);
// Do loops asynchronously
--- a/test/jdk/java/net/httpclient/http2/server/Http2EchoHandler.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2EchoHandler.java Sun Dec 03 22:34:23 2017 +0300
@@ -31,7 +31,7 @@
public void handle(Http2TestExchange t)
throws IOException {
try {
- System.err.println("EchoHandler received request to " + t.getRequestURI());
+ System.err.printf("EchoHandler received request to %s from %s\n", t.getRequestURI(), t.getRemoteAddress());
InputStream is = t.getRequestBody();
HttpHeadersImpl map = t.getRequestHeaders();
HttpHeadersImpl map1 = t.getResponseHeaders();
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServer.java Sun Dec 03 22:34:23 2017 +0300
@@ -35,6 +35,7 @@
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SNIServerName;
+import jdk.incubator.http.internal.frame.ErrorFrame;
/**
* Waits for incoming TCP connections from a client and establishes
@@ -172,8 +173,9 @@
public void stop() {
// TODO: clean shutdown GoAway
stopping = true;
+ System.err.printf("Server stopping %d connections\n", connections.size());
for (Http2TestServerConnection connection : connections.values()) {
- connection.close();
+ connection.close(ErrorFrame.NO_ERROR);
}
try {
server.close();
@@ -223,7 +225,7 @@
// and if so then the client might wait
// forever.
connections.remove(addr, c);
- c.close();
+ c.close(ErrorFrame.PROTOCOL_ERROR);
throw e;
}
}
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Sun Dec 03 22:34:23 2017 +0300
@@ -85,6 +85,8 @@
Sentinel() { super(-1,-1);}
}
+ static final Sentinel sentinel = new Sentinel();
+
class PingRequest {
final byte[] pingData;
final long pingStamp;
@@ -114,8 +116,6 @@
}
}
- static Sentinel sentinel;
-
Http2TestServerConnection(Http2TestServer server,
Socket socket,
Http2TestExchangeSupplier exchangeSupplier)
@@ -159,6 +159,13 @@
return ping.response();
}
+ void goAway(int error) throws IOException {
+ int laststream = nextstream >= 3 ? nextstream - 2 : 1;
+
+ GoAwayFrame go = new GoAwayFrame(laststream, error);
+ outputQ.put(go);
+ }
+
/**
* Returns the first PingRequest from Queue
*/
@@ -173,7 +180,7 @@
void handlePing(PingFrame ping) throws IOException {
if (ping.streamid() != 0) {
System.err.println("Invalid ping received");
- close();
+ close(ErrorFrame.PROTOCOL_ERROR);
return;
}
if (ping.getFlag(PingFrame.ACK)) {
@@ -181,7 +188,7 @@
PingRequest request = getNextRequest();
if (request == null) {
System.err.println("Invalid ping ACK received");
- close();
+ close(ErrorFrame.PROTOCOL_ERROR);
return;
} else if (!Arrays.equals(request.pingData, ping.getData())) {
request.fail(new RuntimeException("Wrong ping data in ACK"));
@@ -231,15 +238,25 @@
sock.getSession(); // blocks until handshake done
}
- void close() {
+ void closeIncoming() {
+ close(-1);
+ }
+
+ void close(int error) {
+ if (stopping)
+ return;
stopping = true;
+ System.err.printf("Server connection to %s stopping. %d streams\n",
+ socket.getRemoteSocketAddress().toString(), streams.size());
streams.forEach((i, q) -> {
- q.close();
+ q.orderlyClose();
});
try {
+ if (error != -1)
+ goAway(error);
+ outputQ.orderlyClose();
socket.close();
- // TODO: put a reset on each stream
- } catch (IOException e) {
+ } catch (Exception e) {
}
}
@@ -321,8 +338,21 @@
nextstream = 3;
}
- exec.submit(this::readLoop);
- exec.submit(this::writeLoop);
+ (new ConnectionThread("readLoop", this::readLoop)).start();
+ (new ConnectionThread("writeLoop", this::writeLoop)).start();
+ }
+
+ class ConnectionThread extends Thread {
+ final Runnable r;
+ ConnectionThread(String name, Runnable r) {
+ setName(name);
+ setDaemon(true);
+ this.r = r;
+ }
+
+ public void run() {
+ r.run();
+ }
}
private void writeFrame(Http2Frame frame) throws IOException {
@@ -369,7 +399,7 @@
return;
} else if (f instanceof GoAwayFrame) {
System.err.println("Closing: "+ f.toString());
- close();
+ close(ErrorFrame.NO_ERROR);
} else if (f instanceof PingFrame) {
handlePing((PingFrame)f);
} else
@@ -569,7 +599,11 @@
void readLoop() {
try {
while (!stopping) {
- Http2Frame frame = readFrame();
+ Http2Frame frame = readFrameImpl();
+ if (frame == null) {
+ closeIncoming();
+ return;
+ }
//System.err.printf("TestServer: received frame %s\n", frame);
int stream = frame.streamid();
if (stream == 0) {
@@ -625,7 +659,7 @@
System.err.println("Http server reader thread shutdown");
e.printStackTrace();
}
- close();
+ close(ErrorFrame.PROTOCOL_ERROR);
}
}
@@ -667,6 +701,8 @@
Http2Frame frame;
try {
frame = outputQ.take();
+ if (stopping)
+ break;
} catch(IOException x) {
if (stopping && x.getCause() instanceof InterruptedException) {
break;
@@ -742,27 +778,46 @@
}
private Http2Frame readFrame() throws IOException {
- byte[] buf = new byte[9];
- if (is.readNBytes(buf, 0, 9) != 9)
- throw new IOException("readFrame: connection closed");
- int len = 0;
- for (int i = 0; i < 3; i++) {
- int n = buf[i] & 0xff;
- //System.err.println("n = " + n);
- len = (len << 8) + n;
+ Http2Frame f = readFrameImpl();
+ if (f == null)
+ throw new IOException("connection closed");
+ return f;
+ }
+
+ // does not throw an exception for EOF
+ private Http2Frame readFrameImpl() throws IOException {
+ try {
+ byte[] buf = new byte[9];
+ int ret;
+ ret=is.readNBytes(buf, 0, 9);
+ if (ret == 0) {
+ return null;
+ } else if (ret != 9) {
+ throw new IOException("readFrame: connection closed");
+ }
+ int len = 0;
+ for (int i = 0; i < 3; i++) {
+ int n = buf[i] & 0xff;
+ //System.err.println("n = " + n);
+ len = (len << 8) + n;
+ }
+ byte[] rest = new byte[len];
+ int n = is.readNBytes(rest, 0, len);
+ if (n != len)
+ throw new IOException("Error reading frame");
+ List<Http2Frame> frames = new ArrayList<>();
+ FramesDecoder reader = new FramesDecoder(frames::add);
+ reader.decode(ByteBuffer.wrap(buf));
+ reader.decode(ByteBuffer.wrap(rest));
+ if (frames.size()!=1)
+ throw new IOException("Expected 1 frame got "+frames.size()) ;
+
+ return frames.get(0);
+ } catch (IOException ee) {
+ if (stopping)
+ return null;
+ throw ee;
}
- byte[] rest = new byte[len];
- int n = is.readNBytes(rest, 0, len);
- if (n != len)
- throw new IOException("Error reading frame");
- List<Http2Frame> frames = new ArrayList<>();
- FramesDecoder reader = new FramesDecoder(frames::add);
- reader.decode(ByteBuffer.wrap(buf));
- reader.decode(ByteBuffer.wrap(rest));
- if (frames.size()!=1)
- throw new IOException("Expected 1 frame got "+frames.size()) ;
-
- return frames.get(0);
}
void sendSettingsFrame() throws IOException {
--- a/test/jdk/java/net/httpclient/http2/server/Queue.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Sun Dec 03 22:34:23 2017 +0300
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Objects;
import java.util.stream.Stream;
// Each stream has one of these for input. Each Http2Connection has one
@@ -36,13 +37,11 @@
private boolean closed = false;
private boolean closing = false;
private Throwable exception = null;
- private Runnable callback;
- private boolean callbackDisabled = false;
private int waiters; // true if someone waiting
private final T closeSentinel;
Queue(T closeSentinel) {
- this.closeSentinel = closeSentinel;
+ this.closeSentinel = Objects.requireNonNull(closeSentinel);
}
public synchronized int size() {
@@ -50,6 +49,7 @@
}
public synchronized void put(T obj) throws IOException {
+ Objects.requireNonNull(obj);
if (closed || closing) {
throw new IOException("stream closed");
}
@@ -59,16 +59,6 @@
if (waiters > 0) {
notifyAll();
}
-
- if (callbackDisabled) {
- return;
- }
-
- if (q.size() > 0 && callback != null) {
- // Note: calling callback while holding the lock is
- // dangerous and may lead to deadlocks.
- callback.run();
- }
}
// Other close() variants are immediate and abortive
@@ -77,6 +67,7 @@
public synchronized void orderlyClose() {
if (closing || closed)
return;
+
try {
put(closeSentinel);
} catch (IOException e) {
@@ -87,6 +78,8 @@
@Override
public synchronized void close() {
+ if (closed)
+ return;
closed = true;
notifyAll();
}
@@ -123,6 +116,7 @@
if (item.equals(closeSentinel)) {
closed = true;
assert q.isEmpty();
+ return null;
}
return item;
} catch (InterruptedException ex) {
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Thu Nov 30 22:27:18 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Sun Dec 03 22:34:23 2017 +0300
@@ -55,6 +55,7 @@
import java.security.cert.CertificateException;
import java.util.List;
import java.util.Queue;
+import java.util.Random;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -69,6 +70,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
@Test
public class SSLTubeTest {
@@ -76,6 +79,17 @@
private static final long COUNTER = 600;
private static final int LONGS_PER_BUF = 800;
private static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
+ public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
+
+ static final Random rand = new Random();
+
+ static int randomRange(int lower, int upper) {
+ if (lower > upper)
+ throw new IllegalArgumentException("lower > upper");
+ int diff = upper - lower;
+ int r = lower + rand.nextInt(diff);
+ return r - (r % 8); // round down to multiple of 8 (align for longs)
+ }
private static ByteBuffer getBuffer(long startingAt) {
ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
@@ -86,18 +100,31 @@
return buf;
}
- @Test(timeOut = 30000)
- public void run() throws IOException {
- /* Start of wiring */
+ @Test
+ public void runWithSSLLoopackServer() throws IOException {
ExecutorService sslExecutor = Executors.newCachedThreadPool();
+
+ /* Start of wiring */
/* Emulates an echo server */
-// FlowTube server = new SSLTube(createSSLEngine(false),
-// sslExecutor,
-// new EchoTube(16));
SSLLoopbackSubscriber server =
new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
server.start();
+ run(server, sslExecutor);
+ }
+
+ @Test
+ public void runWithEchoServer() throws IOException {
+ ExecutorService sslExecutor = Executors.newCachedThreadPool();
+
+ /* Start of wiring */
+ /* Emulates an echo server */
+ FlowTube server = crossOverEchoServer(sslExecutor);
+
+ run(server, sslExecutor);
+ }
+
+ private void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
FlowTube client = new SSLTube(createSSLEngine(true),
sslExecutor,
server);
@@ -128,6 +155,9 @@
}
}
+ /**
+ * This is a copy of the SSLLoopbackSubscriber used in FlowTest
+ */
static class SSLLoopbackSubscriber implements FlowTube {
private final BlockingQueue<ByteBuffer> buffer;
private final Socket clientSock;
@@ -173,7 +203,7 @@
private void clientReader() {
try {
InputStream is = clientSock.getInputStream();
- final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ final int bufsize = randomRange(512, 16 * 1024);
System.out.println("clientReader: bufsize = " + bufsize);
while (true) {
byte[] buf = new byte[bufsize];
@@ -206,7 +236,7 @@
while (true) {
ByteBuffer buf = buffer.take();
- if (buf == FlowTest.SENTINEL) {
+ if (buf == SENTINEL) {
// finished
//Utils.sleep(2000);
System.out.println("clientWriter close: " + nbytes + " written");
@@ -249,7 +279,7 @@
try {
InputStream is = serverSock.getInputStream();
OutputStream os = serverSock.getOutputStream();
- final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ final int bufsize = randomRange(512, 16 * 1024);
System.out.println("serverLoopback: bufsize = " + bufsize);
byte[] bb = new byte[bufsize];
while (true) {
@@ -305,7 +335,7 @@
@Override
public void onComplete() {
try {
- buffer.put(FlowTest.SENTINEL);
+ buffer.put(SENTINEL);
} catch (InterruptedException e) {
e.printStackTrace();
Utils.close(clientSock);
@@ -330,150 +360,366 @@
}
}
-// private static final class EchoTube implements FlowTube {
-//
-// private final static Object EOF = new Object();
-// private final Executor executor = Executors.newSingleThreadExecutor();
-//
-// private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
-// private final int maxQueueSize;
-// private final SequentialScheduler processingScheduler =
-// new SequentialScheduler(createProcessingTask());
-//
-// /* Writing into this tube */
-// private long unfulfilled;
-// private Flow.Subscription subscription;
-//
-// /* Reading from this tube */
-// private final Demand demand = new Demand();
-// private final AtomicBoolean cancelled = new AtomicBoolean();
-// private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-//
-// private EchoTube(int maxBufferSize) {
-// if (maxBufferSize < 1)
-// throw new IllegalArgumentException();
-// this.maxQueueSize = maxBufferSize;
-// }
-//
-// @Override
-// public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
-// this.subscriber = subscriber;
-// System.out.println("EchoTube got subscriber: " + subscriber);
-// this.subscriber.onSubscribe(new InternalSubscription());
-// }
-//
-// @Override
-// public void onSubscribe(Flow.Subscription subscription) {
-// unfulfilled = maxQueueSize;
-// System.out.println("EchoTube request: " + maxQueueSize);
-// (this.subscription = subscription).request(maxQueueSize);
-// }
-//
-// @Override
-// public void onNext(List<ByteBuffer> item) {
-// if (--unfulfilled == (maxQueueSize / 2)) {
-// long req = maxQueueSize - unfulfilled;
-// subscription.request(req);
-// System.out.println("EchoTube request: " + req);
-// unfulfilled = maxQueueSize;
-// }
-// System.out.println("EchoTube add " + Utils.remaining(item));
-// queue.add(item);
-// processingScheduler.deferOrSchedule(executor);
-// }
-//
-// @Override
-// public void onError(Throwable throwable) {
-// System.out.println("EchoTube add " + throwable);
-// queue.add(throwable);
-// processingScheduler.deferOrSchedule(executor);
-// }
-//
-// @Override
-// public void onComplete() {
-// System.out.println("EchoTube add EOF");
-// queue.add(EOF);
-// processingScheduler.deferOrSchedule(executor);
-// }
-//
-// @Override
-// public boolean isFinished() {
-// return false;
-// }
-//
-// private class InternalSubscription implements Flow.Subscription {
-//
-// @Override
-// public void request(long n) {
-// System.out.println("EchoTube got request: " + n);
-// if (n <= 0) {
-// throw new InternalError();
-// }
-// demand.increase(n);
-// processingScheduler.runOrSchedule();
-// }
-//
-// @Override
-// public void cancel() {
-// cancelled.set(true);
-// }
-// }
-//
-// @Override
-// public String toString() {
-// return "EchoTube";
-// }
-//
-// private SequentialScheduler.RestartableTask createProcessingTask() {
-// return new SequentialScheduler.CompleteRestartableTask() {
-//
-// @Override
-// protected void run() {
-// try {
-// while (!cancelled.get()) {
-// Object item = queue.peek();
-// if (item == null)
-// return;
-// try {
-// System.out.println("EchoTube processing item");
-// if (item instanceof List) {
-// if (!demand.tryDecrement()) {
-// System.out.println("EchoTube no demand");
-// return;
-// }
-// @SuppressWarnings("unchecked")
-// List<ByteBuffer> bytes = (List<ByteBuffer>) item;
-// Object removed = queue.remove();
-// assert removed == item;
-// System.out.println("EchoTube processing "
-// + Utils.remaining(bytes));
-// subscriber.onNext(bytes);
-// } else if (item instanceof Throwable) {
-// cancelled.set(true);
-// Object removed = queue.remove();
-// assert removed == item;
-// System.out.println("EchoTube processing " + item);
-// subscriber.onError((Throwable) item);
-// } else if (item == EOF) {
-// cancelled.set(true);
-// Object removed = queue.remove();
-// assert removed == item;
-// System.out.println("EchoTube processing EOF");
-// subscriber.onComplete();
-// } else {
-// throw new InternalError(String.valueOf(item));
-// }
-// } finally {
-// }
-// }
-// } catch(Throwable t) {
-// t.printStackTrace();
-// throw t;
-// }
-// }
-// };
-// }
-// }
+
+ /**
+ * Creates a cross-over FlowTube than can be plugged into a client-side
+ * SSLTube (in place of the SSLLoopbackSubscriber).
+ * Note that the only method that can be called on the return tube
+ * is connectFlows(). Calling any other method will trigger an
+ * InternalError.
+ * @param sslExecutor an executor
+ * @return a cross-over FlowTube connected to an EchoTube.
+ * @throws IOException
+ */
+ FlowTube crossOverEchoServer(Executor sslExecutor) throws IOException {
+ LateBindingTube crossOver = new LateBindingTube();
+ FlowTube server = new SSLTube(createSSLEngine(false),
+ sslExecutor,
+ crossOver);
+ EchoTube echo = new EchoTube(6);
+ server.connectFlows(FlowTube.asTubePublisher(echo), FlowTube.asTubeSubscriber(echo));
+
+ return new CrossOverTube(crossOver);
+ }
+
+ /**
+ * A cross-over FlowTube that makes it possible to reverse the direction
+ * of flows. The typical usage is to connect an two opposite SSLTube,
+ * one encrypting, one decrypting, to e.g. an EchoTube, with the help
+ * of a LateBindingTube:
+ * {@code
+ * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
+ * }
+ * <p>
+ * Note that the only method that can be called on the CrossOverTube is
+ * connectFlows(). Calling any other method will cause an InternalError to
+ * be thrown.
+ * Also connectFlows() can be called only once.
+ */
+ private static final class CrossOverTube implements FlowTube {
+ final LateBindingTube tube;
+ CrossOverTube(LateBindingTube tube) {
+ this.tube = tube;
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ throw newInternalError();
+ }
+
+ @Override
+ public void connectFlows(TubePublisher writePublisher, TubeSubscriber readSubscriber) {
+ tube.start(writePublisher, readSubscriber);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return tube.isFinished();
+ }
+
+ Error newInternalError() {
+ InternalError error = new InternalError();
+ error.printStackTrace(System.out);
+ return error;
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ throw newInternalError();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throw newInternalError();
+ }
+
+ @Override
+ public void onComplete() {
+ throw newInternalError();
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ throw newInternalError();
+ }
+ }
+
+ /**
+ * A late binding tube that makes it possible to create an
+ * SSLTube before the right-hand-side tube has been created.
+ * The typical usage is to make it possible to connect two
+ * opposite SSLTube (one encrypting, one decrypting) through a
+ * CrossOverTube:
+ * {@code
+ * client app => SSLTube => CrossOverTube <= LateBindingTube <= SSLTube <= EchoTube
+ * }
+ * <p>
+ * Note that this class only supports a single call to start(): it cannot be
+ * subscribed more than once from its left-hand-side (the cross over tube side).
+ */
+ private static class LateBindingTube implements FlowTube {
+
+ final CompletableFuture<Flow.Publisher<List<ByteBuffer>>> futurePublisher
+ = new CompletableFuture<>();
+ final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
+ = new ConcurrentLinkedQueue<>();
+ AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
+ SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
+ AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ private volatile boolean finished;
+ private volatile boolean completed;
+
+
+ public void start(Flow.Publisher<List<ByteBuffer>> publisher,
+ Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ subscriberRef.set(subscriber);
+ futurePublisher.complete(publisher);
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ futurePublisher.thenAccept((p) -> p.subscribe(subscriber));
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ queue.add((s) -> s.onSubscribe(subscription));
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ queue.add((s) -> s.onNext(item));
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("LateBindingTube onError");
+ throwable.printStackTrace(System.out);
+ queue.add((s) -> {
+ errorRef.compareAndSet(null, throwable);
+ try {
+ System.out.println("LateBindingTube subscriber onError: " + throwable);
+ s.onError(errorRef.get());
+ } finally {
+ finished = true;
+ System.out.println("LateBindingTube finished");
+ }
+ });
+ scheduler.runOrSchedule();
+ }
+
+ @Override
+ public void onComplete() {
+ System.out.println("LateBindingTube completing");
+ queue.add((s) -> {
+ completed = true;
+ try {
+ System.out.println("LateBindingTube complete subscriber");
+ s.onComplete();
+ } finally {
+ finished = true;
+ System.out.println("LateBindingTube finished");
+ }
+ });
+ scheduler.runOrSchedule();
+ }
+
+ private void loop() {
+ if (finished) {
+ scheduler.stop();
+ return;
+ }
+ Flow.Subscriber<? super List<ByteBuffer>> subscriber = subscriberRef.get();
+ if (subscriber == null) return;
+ try {
+ Consumer<Flow.Subscriber<? super List<ByteBuffer>>> s;
+ while ((s = queue.poll()) != null) {
+ s.accept(subscriber);
+ }
+ } catch (Throwable t) {
+ if (errorRef.compareAndSet(null, t)) {
+ onError(t);
+ }
+ }
+ }
+ }
+
+ /**
+ * An echo tube that just echoes back whatever bytes it receives.
+ * This cannot be plugged to the right-hand-side of an SSLTube
+ * since handshake data cannot be simply echoed back, and
+ * application data most likely also need to be decrypted and
+ * re-encrypted.
+ */
+ private static final class EchoTube implements FlowTube {
+
+ private final static Object EOF = new Object();
+ private final Executor executor = Executors.newSingleThreadExecutor();
+
+ private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+ private final int maxQueueSize;
+ private final SequentialScheduler processingScheduler =
+ new SequentialScheduler(createProcessingTask());
+
+ /* Writing into this tube */
+ private volatile long requested;
+ private Flow.Subscription subscription;
+
+ /* Reading from this tube */
+ private final Demand demand = new Demand();
+ private final AtomicBoolean cancelled = new AtomicBoolean();
+ private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+
+ private EchoTube(int maxBufferSize) {
+ if (maxBufferSize < 1)
+ throw new IllegalArgumentException();
+ this.maxQueueSize = maxBufferSize;
+ }
+
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ this.subscriber = subscriber;
+ System.out.println("EchoTube got subscriber: " + subscriber);
+ this.subscriber.onSubscribe(new InternalSubscription());
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ System.out.println("EchoTube request: " + maxQueueSize);
+ (this.subscription = subscription).request(requested = maxQueueSize);
+ }
+
+ private void requestMore() {
+ Flow.Subscription s = subscription;
+ if (s == null || cancelled.get()) return;
+ long unfulfilled = queue.size() + --requested;
+ if (unfulfilled <= maxQueueSize/2) {
+ long req = maxQueueSize - unfulfilled;
+ requested += req;
+ s.request(req);
+ System.out.printf("EchoTube request: %s [requested:%s, queue:%s, unfulfilled:%s]%n",
+ req, requested-req, queue.size(), unfulfilled );
+ }
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n",
+ Utils.remaining(item), requested, queue.size());
+ queue.add(item);
+ processingScheduler.deferOrSchedule(executor);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ System.out.println("EchoTube add " + throwable);
+ queue.add(throwable);
+ processingScheduler.deferOrSchedule(executor);
+ }
+
+ @Override
+ public void onComplete() {
+ System.out.println("EchoTube add EOF");
+ queue.add(EOF);
+ processingScheduler.deferOrSchedule(executor);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return cancelled.get();
+ }
+
+ private class InternalSubscription implements Flow.Subscription {
+
+ @Override
+ public void request(long n) {
+ System.out.println("EchoTube got request: " + n);
+ if (n <= 0) {
+ throw new InternalError();
+ }
+ if (demand.increase(n)) {
+ processingScheduler.deferOrSchedule(executor);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ cancelled.set(true);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EchoTube";
+ }
+
+ int transmitted = 0;
+ private SequentialScheduler.RestartableTask createProcessingTask() {
+ return new SequentialScheduler.CompleteRestartableTask() {
+
+ @Override
+ protected void run() {
+ try {
+ while (!cancelled.get()) {
+ Object item = queue.peek();
+ if (item == null) {
+ System.out.printf("EchoTube: queue empty, requested=%s, demand=%s, transmitted=%s%n",
+ requested, demand.get(), transmitted);
+ requestMore();
+ return;
+ }
+ try {
+ System.out.printf("EchoTube processing item, requested=%s, demand=%s, transmitted=%s%n",
+ requested, demand.get(), transmitted);
+ if (item instanceof List) {
+ if (!demand.tryDecrement()) {
+ System.out.println("EchoTube no demand");
+ return;
+ }
+ @SuppressWarnings("unchecked")
+ List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+ Object removed = queue.remove();
+ assert removed == item;
+ System.out.println("EchoTube processing "
+ + Utils.remaining(bytes));
+ transmitted++;
+ subscriber.onNext(bytes);
+ requestMore();
+ } else if (item instanceof Throwable) {
+ cancelled.set(true);
+ Object removed = queue.remove();
+ assert removed == item;
+ System.out.println("EchoTube processing " + item);
+ subscriber.onError((Throwable) item);
+ } else if (item == EOF) {
+ cancelled.set(true);
+ Object removed = queue.remove();
+ assert removed == item;
+ System.out.println("EchoTube processing EOF");
+ subscriber.onComplete();
+ } else {
+ throw new InternalError(String.valueOf(item));
+ }
+ } finally {
+ }
+ }
+ } catch(Throwable t) {
+ t.printStackTrace();
+ throw t;
+ }
+ }
+ };
+ }
+ }
/**
* The final subscriber which receives the decrypted looped-back data. Just
@@ -518,13 +764,13 @@
if (--unfulfilled == (REQUEST_WINDOW / 2)) {
long req = REQUEST_WINDOW - unfulfilled;
System.out.println("EndSubscriber request " + req);
+ unfulfilled = REQUEST_WINDOW;
subscription.request(req);
- unfulfilled = REQUEST_WINDOW;
}
long currval = counter.get();
if (currval % 500 == 0) {
- System.out.println("End: " + currval);
+ System.out.println("EndSubscriber: " + currval);
}
System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));