http-client-branch: workaround issue with SubmissionPublisher::close in several tests
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java Wed Dec 06 16:08:15 2017 +0000
@@ -23,11 +23,8 @@
package jdk.incubator.http;
-import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
-import jdk.incubator.http.internal.common.SSLFlowDelegate;
import jdk.incubator.http.internal.common.SSLTube;
-import jdk.incubator.http.internal.common.SequentialScheduler;
import jdk.incubator.http.internal.common.Utils;
import org.testng.annotations.Test;
@@ -35,17 +32,11 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.KeyStore;
@@ -54,24 +45,15 @@
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.List;
-import java.util.Queue;
import java.util.Random;
import java.util.StringTokenizer;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
public class AbstractSSLTubeTest extends AbstractRandomTest {
@@ -79,6 +61,15 @@
public static final int LONGS_PER_BUF = 800;
public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
+ // This is a hack to work around an issue with SubmissionPublisher.
+ // SubmissionPublisher will call onComplete immediately without forwarding
+ // remaining pending data if SubmissionPublisher.close() is called when
+ // there is no demand. In other words, it doesn't wait for the subscriber
+ // to pull all the data before calling onComplete.
+ // We use a CountDownLatch to figure out when it is safe to call close().
+ // This may cause the test to hang if data are buffered.
+ protected final CountDownLatch allBytesReceived = new CountDownLatch(1);
+
protected static ByteBuffer getBuffer(long startingAt) {
ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
@@ -89,7 +80,9 @@
return buf;
}
- protected void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
+ protected void run(FlowTube server,
+ ExecutorService sslExecutor,
+ CountDownLatch allBytesReceived) throws IOException {
FlowTube client = new SSLTube(createSSLEngine(true),
sslExecutor,
server);
@@ -98,7 +91,7 @@
Integer.MAX_VALUE);
FlowTube.TubePublisher begin = p::subscribe;
CompletableFuture<Void> completion = new CompletableFuture<>();
- EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
+ EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
client.connectFlows(begin, end);
/* End of wiring */
@@ -111,7 +104,14 @@
p.submit(List.of(b));
}
System.out.println("Finished submission. Waiting for loopback");
+ completion.whenComplete((r,t) -> allBytesReceived.countDown());
+ try {
+ allBytesReceived.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
p.close();
+ System.out.println("All bytes received: calling publisher.close()");
try {
completion.join();
System.out.println("OK");
@@ -140,12 +140,15 @@
private final long nbytes;
private final AtomicLong counter = new AtomicLong();
private final CompletableFuture<?> completion;
+ private final CountDownLatch allBytesReceived;
private volatile Flow.Subscription subscription;
private long unfulfilled;
- EndSubscriber(long nbytes, CompletableFuture<?> completion) {
+ EndSubscriber(long nbytes, CompletableFuture<?> completion,
+ CountDownLatch allBytesReceived) {
this.nbytes = nbytes;
this.completion = completion;
+ this.allBytesReceived = allBytesReceived;
}
@Override
@@ -184,7 +187,7 @@
for (ByteBuffer buf : buffers) {
while (buf.hasRemaining()) {
long n = buf.getLong();
- if (currval > (AbstractSSLTubeTest.TOTAL_LONGS - 50)) {
+ if (currval > (TOTAL_LONGS - 50)) {
System.out.println("End: " + currval);
}
if (n != currval++) {
@@ -197,12 +200,16 @@
}
counter.set(currval);
+ if (currval >= TOTAL_LONGS) {
+ allBytesReceived.countDown();
+ }
}
@Override
public void onError(Throwable throwable) {
System.out.println("EndSubscriber onError " + throwable);
completion.completeExceptionally(throwable);
+ allBytesReceived.countDown();
}
@Override
@@ -215,7 +222,9 @@
System.out.println("DONE OK");
completion.complete(null);
}
+ allBytesReceived.countDown();
}
+
@Override
public String toString() {
return "EndSubscriber";
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java Wed Dec 06 16:08:15 2017 +0000
@@ -42,6 +42,7 @@
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
@@ -68,6 +69,15 @@
public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
static volatile String alpn;
+ // This is a hack to work around an issue with SubmissionPublisher.
+ // SubmissionPublisher will call onComplete immediately without forwarding
+ // remaining pending data if SubmissionPublisher.close() is called when
+ // there is no demand. In other words, it doesn't wait for the subscriber
+ // to pull all the data before calling onComplete.
+ // We use a CountDownLatch to figure out when it is safe to call close().
+ // This may cause the test to hang if data are buffered.
+ final CountDownLatch allBytesReceived = new CountDownLatch(1);
+
private final CompletableFuture<Void> completion;
public FlowTest() throws IOException {
@@ -82,9 +92,9 @@
engineClient.setSSLParameters(params);
engineClient.setUseClientMode(true);
completion = new CompletableFuture<>();
- SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor);
+ SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor, allBytesReceived);
looper.start();
- EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
+ EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
// going to measure how long handshake takes
final long start = System.currentTimeMillis();
@@ -131,6 +141,14 @@
srcPublisher.submit(List.of(b));
}
System.out.println("Finished submission. Waiting for loopback");
+ // make sure we don't wait for allBytesReceived in case of error.
+ completion.whenComplete((r,t) -> allBytesReceived.countDown());
+ try {
+ allBytesReceived.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ System.out.println("All bytes received: ");
srcPublisher.close();
try {
completion.join();
@@ -172,8 +190,11 @@
private final Thread thread1, thread2, thread3;
private volatile Flow.Subscription clientSubscription;
private final SubmissionPublisher<List<ByteBuffer>> publisher;
+ private final CountDownLatch allBytesReceived;
- SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+ SSLLoopbackSubscriber(SSLContext ctx,
+ ExecutorService exec,
+ CountDownLatch allBytesReceived) throws IOException {
SSLServerSocketFactory fac = ctx.getServerSocketFactory();
SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
SSLParameters params = serv.getSSLParameters();
@@ -185,6 +206,7 @@
clientSock = new Socket("127.0.0.1", serverPort);
serverSock = (SSLSocket) serv.accept();
this.buffer = new LinkedBlockingQueue<>();
+ this.allBytesReceived = allBytesReceived;
thread1 = new Thread(this::clientWriter, "clientWriter");
thread2 = new Thread(this::serverLoopback, "serverLoopback");
thread3 = new Thread(this::clientReader, "clientReader");
@@ -218,6 +240,10 @@
if (n == -1) {
System.out.println("clientReader close: read "
+ readCount.get() + " bytes");
+ System.out.println("clientReader: got EOF. "
+ + "Waiting signal to close publisher.");
+ allBytesReceived.await();
+ System.out.println("clientReader: closing publisher");
publisher.close();
sleep(2000);
Utils.close(is, clientSock);
@@ -361,11 +387,15 @@
private final AtomicLong counter;
private volatile Flow.Subscription subscription;
private final CompletableFuture<Void> completion;
+ private final CountDownLatch allBytesReceived;
- EndSubscriber(long nbytes, CompletableFuture<Void> completion) {
+ EndSubscriber(long nbytes,
+ CompletableFuture<Void> completion,
+ CountDownLatch allBytesReceived) {
counter = new AtomicLong(0);
this.nbytes = nbytes;
this.completion = completion;
+ this.allBytesReceived = allBytesReceived;
}
@Override
@@ -408,10 +438,14 @@
counter.set(currval);
subscription.request(1);
+ if (currval >= TOTAL_LONGS) {
+ allBytesReceived.countDown();
+ }
}
@Override
public void onError(Throwable throwable) {
+ allBytesReceived.countDown();
completion.completeExceptionally(throwable);
}
@@ -423,6 +457,7 @@
completion.completeExceptionally(new RuntimeException("ERROR AT END"));
} else {
System.out.println("DONE OK: counter = " + n);
+ allBytesReceived.countDown();
completion.complete(null);
}
}
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLEchoTubeTest.java Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLEchoTubeTest.java Wed Dec 06 16:08:15 2017 +0000
@@ -25,51 +25,22 @@
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
-import jdk.incubator.http.internal.common.SSLFlowDelegate;
import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
import jdk.incubator.http.internal.common.Utils;
import org.testng.annotations.Test;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
import java.nio.ByteBuffer;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
import java.util.List;
import java.util.Queue;
-import java.util.Random;
-import java.util.StringTokenizer;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -84,7 +55,7 @@
/* Emulates an echo server */
FlowTube server = crossOverEchoServer(sslExecutor);
- run(server, sslExecutor);
+ run(server, sslExecutor, allBytesReceived);
}
/**
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Wed Dec 06 16:08:15 2017 +0000
@@ -41,6 +41,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
@@ -58,10 +59,12 @@
/* Start of wiring */
/* Emulates an echo server */
SSLLoopbackSubscriber server =
- new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+ new SSLLoopbackSubscriber((new SimpleSSLContext()).get(),
+ sslExecutor,
+ allBytesReceived);
server.start();
- run(server, sslExecutor);
+ run(server, sslExecutor, allBytesReceived);
}
/**
@@ -74,8 +77,11 @@
private final Thread thread1, thread2, thread3;
private volatile Flow.Subscription clientSubscription;
private final SubmissionPublisher<List<ByteBuffer>> publisher;
+ private final CountDownLatch allBytesReceived;
- SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+ SSLLoopbackSubscriber(SSLContext ctx,
+ ExecutorService exec,
+ CountDownLatch allBytesReceived) throws IOException {
SSLServerSocketFactory fac = ctx.getServerSocketFactory();
SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
SSLParameters params = serv.getSSLParameters();
@@ -87,6 +93,7 @@
clientSock = new Socket("127.0.0.1", serverPort);
serverSock = (SSLSocket) serv.accept();
this.buffer = new LinkedBlockingQueue<>();
+ this.allBytesReceived = allBytesReceived;
thread1 = new Thread(this::clientWriter, "clientWriter");
thread2 = new Thread(this::serverLoopback, "serverLoopback");
thread3 = new Thread(this::clientReader, "clientReader");
@@ -120,6 +127,9 @@
if (n == -1) {
System.out.println("clientReader close: read "
+ readCount.get() + " bytes");
+ System.out.println("clientReader: waiting signal to close publisher");
+ allBytesReceived.await();
+ System.out.println("clientReader: closing publisher");
publisher.close();
sleep(2000);
Utils.close(is, clientSock);