http-client-branch: workaround issue with SubmissionPublisher::close in several tests http-client-branch
authordfuchs
Wed, 06 Dec 2017 16:08:15 +0000
branchhttp-client-branch
changeset 55970 261d4d2f77e2
parent 55969 1da220f80a5d
child 55971 3ae165556d48
http-client-branch: workaround issue with SubmissionPublisher::close in several tests
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLEchoTubeTest.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java	Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/AbstractSSLTubeTest.java	Wed Dec 06 16:08:15 2017 +0000
@@ -23,11 +23,8 @@
 
 package jdk.incubator.http;
 
-import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube;
-import jdk.incubator.http.internal.common.SSLFlowDelegate;
 import jdk.incubator.http.internal.common.SSLTube;
-import jdk.incubator.http.internal.common.SequentialScheduler;
 import jdk.incubator.http.internal.common.Utils;
 import org.testng.annotations.Test;
 
@@ -35,17 +32,11 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManagerFactory;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.security.KeyManagementException;
 import java.security.KeyStore;
@@ -54,24 +45,15 @@
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
 import java.util.List;
-import java.util.Queue;
 import java.util.Random;
 import java.util.StringTokenizer;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SubmissionPublisher;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 public class AbstractSSLTubeTest extends AbstractRandomTest {
 
@@ -79,6 +61,15 @@
     public static final int LONGS_PER_BUF = 800;
     public static final long TOTAL_LONGS = COUNTER * LONGS_PER_BUF;
     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
+    // This is a hack to work around an issue with SubmissionPublisher.
+    // SubmissionPublisher will call onComplete immediately without forwarding
+    // remaining pending data if SubmissionPublisher.close() is called when
+    // there is no demand. In other words, it doesn't wait for the subscriber
+    // to pull all the data before calling onComplete.
+    // We use a CountDownLatch to figure out when it is safe to call close().
+    // This may cause the test to hang if data are buffered.
+    protected final CountDownLatch allBytesReceived = new CountDownLatch(1);
+
 
     protected static ByteBuffer getBuffer(long startingAt) {
         ByteBuffer buf = ByteBuffer.allocate(LONGS_PER_BUF * 8);
@@ -89,7 +80,9 @@
         return buf;
     }
 
-    protected void run(FlowTube server, ExecutorService sslExecutor) throws IOException {
+    protected void run(FlowTube server,
+                       ExecutorService sslExecutor,
+                       CountDownLatch allBytesReceived) throws IOException {
         FlowTube client = new SSLTube(createSSLEngine(true),
                                       sslExecutor,
                                       server);
@@ -98,7 +91,7 @@
                                           Integer.MAX_VALUE);
         FlowTube.TubePublisher begin = p::subscribe;
         CompletableFuture<Void> completion = new CompletableFuture<>();
-        EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
+        EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
         client.connectFlows(begin, end);
         /* End of wiring */
 
@@ -111,7 +104,14 @@
             p.submit(List.of(b));
         }
         System.out.println("Finished submission. Waiting for loopback");
+        completion.whenComplete((r,t) -> allBytesReceived.countDown());
+        try {
+            allBytesReceived.await();
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
         p.close();
+        System.out.println("All bytes received: calling publisher.close()");
         try {
             completion.join();
             System.out.println("OK");
@@ -140,12 +140,15 @@
         private final long nbytes;
         private final AtomicLong counter = new AtomicLong();
         private final CompletableFuture<?> completion;
+        private final CountDownLatch allBytesReceived;
         private volatile Flow.Subscription subscription;
         private long unfulfilled;
 
-        EndSubscriber(long nbytes, CompletableFuture<?> completion) {
+        EndSubscriber(long nbytes, CompletableFuture<?> completion,
+                      CountDownLatch allBytesReceived) {
             this.nbytes = nbytes;
             this.completion = completion;
+            this.allBytesReceived = allBytesReceived;
         }
 
         @Override
@@ -184,7 +187,7 @@
             for (ByteBuffer buf : buffers) {
                 while (buf.hasRemaining()) {
                     long n = buf.getLong();
-                    if (currval > (AbstractSSLTubeTest.TOTAL_LONGS - 50)) {
+                    if (currval > (TOTAL_LONGS - 50)) {
                         System.out.println("End: " + currval);
                     }
                     if (n != currval++) {
@@ -197,12 +200,16 @@
             }
 
             counter.set(currval);
+            if (currval >= TOTAL_LONGS) {
+                allBytesReceived.countDown();
+            }
         }
 
         @Override
         public void onError(Throwable throwable) {
             System.out.println("EndSubscriber onError " + throwable);
             completion.completeExceptionally(throwable);
+            allBytesReceived.countDown();
         }
 
         @Override
@@ -215,7 +222,9 @@
                 System.out.println("DONE OK");
                 completion.complete(null);
             }
+            allBytesReceived.countDown();
         }
+        
         @Override
         public String toString() {
             return "EndSubscriber";
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java	Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/FlowTest.java	Wed Dec 06 16:08:15 2017 +0000
@@ -42,6 +42,7 @@
 import java.util.StringTokenizer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
@@ -68,6 +69,15 @@
     public static final ByteBuffer SENTINEL = ByteBuffer.allocate(0);
     static volatile String alpn;
 
+    // This is a hack to work around an issue with SubmissionPublisher.
+    // SubmissionPublisher will call onComplete immediately without forwarding
+    // remaining pending data if SubmissionPublisher.close() is called when
+    // there is no demand. In other words, it doesn't wait for the subscriber
+    // to pull all the data before calling onComplete.
+    // We use a CountDownLatch to figure out when it is safe to call close().
+    // This may cause the test to hang if data are buffered.
+    final CountDownLatch allBytesReceived = new CountDownLatch(1);
+
     private final CompletableFuture<Void> completion;
 
     public FlowTest() throws IOException {
@@ -82,9 +92,9 @@
         engineClient.setSSLParameters(params);
         engineClient.setUseClientMode(true);
         completion = new CompletableFuture<>();
-        SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor);
+        SSLLoopbackSubscriber looper = new SSLLoopbackSubscriber(ctx, executor, allBytesReceived);
         looper.start();
-        EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion);
+        EndSubscriber end = new EndSubscriber(TOTAL_LONGS, completion, allBytesReceived);
         SSLFlowDelegate sslClient = new SSLFlowDelegate(engineClient, executor, end, looper);
         // going to measure how long handshake takes
         final long start = System.currentTimeMillis();
@@ -131,6 +141,14 @@
             srcPublisher.submit(List.of(b));
         }
         System.out.println("Finished submission. Waiting for loopback");
+        // make sure we don't wait for allBytesReceived in case of error.
+        completion.whenComplete((r,t) -> allBytesReceived.countDown());
+        try {
+            allBytesReceived.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        System.out.println("All bytes received: ");
         srcPublisher.close();
         try {
             completion.join();
@@ -172,8 +190,11 @@
         private final Thread thread1, thread2, thread3;
         private volatile Flow.Subscription clientSubscription;
         private final SubmissionPublisher<List<ByteBuffer>> publisher;
+        private final CountDownLatch allBytesReceived;
 
-        SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+        SSLLoopbackSubscriber(SSLContext ctx,
+                              ExecutorService exec,
+                              CountDownLatch allBytesReceived) throws IOException {
             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
             SSLParameters params = serv.getSSLParameters();
@@ -185,6 +206,7 @@
             clientSock = new Socket("127.0.0.1", serverPort);
             serverSock = (SSLSocket) serv.accept();
             this.buffer = new LinkedBlockingQueue<>();
+            this.allBytesReceived = allBytesReceived;
             thread1 = new Thread(this::clientWriter, "clientWriter");
             thread2 = new Thread(this::serverLoopback, "serverLoopback");
             thread3 = new Thread(this::clientReader, "clientReader");
@@ -218,6 +240,10 @@
                     if (n == -1) {
                         System.out.println("clientReader close: read "
                                 + readCount.get() + " bytes");
+                        System.out.println("clientReader: got EOF. "
+                                            + "Waiting signal to close publisher.");
+                        allBytesReceived.await();
+                        System.out.println("clientReader: closing publisher");
                         publisher.close();
                         sleep(2000);
                         Utils.close(is, clientSock);
@@ -361,11 +387,15 @@
         private final AtomicLong counter;
         private volatile Flow.Subscription subscription;
         private final CompletableFuture<Void> completion;
+        private final CountDownLatch allBytesReceived;
 
-        EndSubscriber(long nbytes, CompletableFuture<Void> completion) {
+        EndSubscriber(long nbytes,
+                      CompletableFuture<Void> completion,
+                      CountDownLatch allBytesReceived) {
             counter = new AtomicLong(0);
             this.nbytes = nbytes;
             this.completion = completion;
+            this.allBytesReceived = allBytesReceived;
         }
 
         @Override
@@ -408,10 +438,14 @@
 
             counter.set(currval);
             subscription.request(1);
+            if (currval >= TOTAL_LONGS) {
+                allBytesReceived.countDown();
+            }
         }
 
         @Override
         public void onError(Throwable throwable) {
+            allBytesReceived.countDown();
             completion.completeExceptionally(throwable);
         }
 
@@ -423,6 +457,7 @@
                 completion.completeExceptionally(new RuntimeException("ERROR AT END"));
             } else {
                 System.out.println("DONE OK: counter = " + n);
+                allBytesReceived.countDown();
                 completion.complete(null);
             }
         }
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLEchoTubeTest.java	Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLEchoTubeTest.java	Wed Dec 06 16:08:15 2017 +0000
@@ -25,51 +25,22 @@
 
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube;
-import jdk.incubator.http.internal.common.SSLFlowDelegate;
 import jdk.incubator.http.internal.common.SSLTube;
 import jdk.incubator.http.internal.common.SequentialScheduler;
 import jdk.incubator.http.internal.common.Utils;
 import org.testng.annotations.Test;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
 import java.util.List;
 import java.util.Queue;
-import java.util.Random;
-import java.util.StringTokenizer;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SubmissionPublisher;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -84,7 +55,7 @@
         /* Emulates an echo server */
         FlowTube server = crossOverEchoServer(sslExecutor);
 
-        run(server, sslExecutor);
+        run(server, sslExecutor, allBytesReceived);
     }
 
     /**
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Wed Dec 06 18:47:54 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java	Wed Dec 06 16:08:15 2017 +0000
@@ -41,6 +41,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Flow;
@@ -58,10 +59,12 @@
         /* Start of wiring */
         /* Emulates an echo server */
         SSLLoopbackSubscriber server =
-                new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+                new SSLLoopbackSubscriber((new SimpleSSLContext()).get(),
+                        sslExecutor,
+                        allBytesReceived);
         server.start();
 
-        run(server, sslExecutor);
+        run(server, sslExecutor, allBytesReceived);
     }
 
     /**
@@ -74,8 +77,11 @@
         private final Thread thread1, thread2, thread3;
         private volatile Flow.Subscription clientSubscription;
         private final SubmissionPublisher<List<ByteBuffer>> publisher;
+        private final CountDownLatch allBytesReceived;
 
-        SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+        SSLLoopbackSubscriber(SSLContext ctx,
+                              ExecutorService exec,
+                              CountDownLatch allBytesReceived) throws IOException {
             SSLServerSocketFactory fac = ctx.getServerSocketFactory();
             SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
             SSLParameters params = serv.getSSLParameters();
@@ -87,6 +93,7 @@
             clientSock = new Socket("127.0.0.1", serverPort);
             serverSock = (SSLSocket) serv.accept();
             this.buffer = new LinkedBlockingQueue<>();
+            this.allBytesReceived = allBytesReceived;
             thread1 = new Thread(this::clientWriter, "clientWriter");
             thread2 = new Thread(this::serverLoopback, "serverLoopback");
             thread3 = new Thread(this::clientReader, "clientReader");
@@ -120,6 +127,9 @@
                     if (n == -1) {
                         System.out.println("clientReader close: read "
                                 + readCount.get() + " bytes");
+                        System.out.println("clientReader: waiting signal to close publisher");
+                        allBytesReceived.await();
+                        System.out.println("clientReader: closing publisher");
                         publisher.close();
                         sleep(2000);
                         Utils.close(is, clientSock);