8231506: Fix some instabilities in a few networking tests
authordfuchs
Tue, 01 Oct 2019 12:10:33 +0100
changeset 58423 54de0c861d32
parent 58422 d7dbabd226ff
child 58424 94ca05133eb2
8231506: Fix some instabilities in a few networking tests Reviewed-by: alanb, chegar, msheppar
test/jdk/java/net/MulticastSocket/UnreferencedMulticastSockets.java
test/jdk/java/net/SocketImpl/SocketImplCombinations.java
test/jdk/java/net/httpclient/DigestEchoServer.java
--- a/test/jdk/java/net/MulticastSocket/UnreferencedMulticastSockets.java	Tue Oct 01 12:27:14 2019 +0200
+++ b/test/jdk/java/net/MulticastSocket/UnreferencedMulticastSockets.java	Tue Oct 01 12:10:33 2019 +0100
@@ -50,6 +50,7 @@
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 
 import jdk.test.lib.net.IPSupport;
@@ -72,11 +73,14 @@
     static class Server implements Runnable {
 
         MulticastSocket ss;
-
+        final int port;
+        final Phaser phaser = new Phaser(2);
         Server() throws IOException {
+            InetAddress loopback = InetAddress.getLoopbackAddress();
             InetSocketAddress serverAddress =
-                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+                new InetSocketAddress(loopback, 0);
             ss = new MulticastSocket(serverAddress);
+            port = ss.getLocalPort();
             System.out.printf("  DatagramServer addr: %s: %d%n",
                     this.getHost(), this.getPort());
             pendingSockets.add(new NamedWeak(ss, pendingQueue, "serverMulticastSocket"));
@@ -89,7 +93,7 @@
         }
 
         int getPort() {
-            return ss.getLocalPort();
+            return port;
         }
 
         // Receive a byte and send back a byte
@@ -98,12 +102,18 @@
                 byte[] buffer = new byte[50];
                 DatagramPacket p = new DatagramPacket(buffer, buffer.length);
                 ss.receive(p);
+                System.out.printf("Server: ping received from: %s%n", p.getSocketAddress());
+                phaser.arriveAndAwaitAdvance(); // await the client...
                 buffer[0] += 1;
+                System.out.printf("Server: sending echo to: %s%n", p.getSocketAddress());
                 ss.send(p);         // send back +1
 
+                System.out.printf("Server: awaiting client%n");
+                phaser.arriveAndAwaitAdvance(); // await the client...
                 // do NOT close but 'forget' the socket reference
+                System.out.printf("Server: forgetting socket...%n");
                 ss = null;
-            } catch (Exception ioe) {
+            } catch (Throwable ioe) {
                 ioe.printStackTrace();
             }
         }
@@ -112,8 +122,11 @@
     public static void main(String args[]) throws Exception {
         IPSupport.throwSkippedExceptionIfNonOperational();
 
+        InetSocketAddress clientAddress =
+                new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+
         // Create and close a MulticastSocket to warm up the FD count for side effects.
-        try (MulticastSocket s = new MulticastSocket(0)) {
+        try (MulticastSocket s = new MulticastSocket(clientAddress)) {
             // no-op; close immediately
             s.getLocalPort();   // no-op
         }
@@ -126,8 +139,33 @@
         Thread thr = new Thread(svr);
         thr.start();
 
-        MulticastSocket client = new MulticastSocket(0);
-        System.out.printf("  client bound port: %d%n", client.getLocalPort());
+        // It is possible under some circumstances that the client
+        // might get bound to the same port than the server: this
+        // would make the test fail - so if this happen we try to
+        // bind to a specific port by incrementing the server port.
+        MulticastSocket client = null;
+        int serverPort = svr.getPort();
+        int maxtries = 20;
+        for (int i = 0; i < maxtries; i++) {
+            try {
+                System.out.printf("Trying to bind client to: %s%n", clientAddress);
+                client = new MulticastSocket(clientAddress);
+                if (client.getLocalPort() != svr.getPort()) break;
+                client.close();
+            } catch (IOException x) {
+                System.out.printf("Couldn't create client after %d attempts: %s%n", i, x);
+                if (i == maxtries) throw x;
+            }
+            if (i == maxtries) {
+                String msg = String.format("Couldn't create client after %d attempts", i);
+                System.out.println(msg);
+                throw new AssertionError(msg);
+            }
+            clientAddress = new InetSocketAddress(clientAddress.getAddress(), serverPort + i);
+        }
+
+        System.out.printf("  client bound port: %s:%d%n",
+                client.getLocalAddress(), client.getLocalPort());
         client.connect(svr.getHost(), svr.getPort());
         pendingSockets.add(new NamedWeak(client, pendingQueue, "clientMulticastSocket"));
         extractRefs(client, "clientMulticastSocket");
@@ -136,14 +174,17 @@
         msg[0] = 1;
         DatagramPacket p = new DatagramPacket(msg, msg.length, svr.getHost(), svr.getPort());
         client.send(p);
+        System.out.printf("  ping sent to: %s:%d%n", svr.getHost(), svr.getPort());
+        svr.phaser.arriveAndAwaitAdvance(); // wait until the server has received its packet
 
         p = new DatagramPacket(msg, msg.length);
         client.receive(p);
 
-        System.out.printf("echo received from: %s%n", p.getSocketAddress());
+        System.out.printf("  echo received from: %s%n", p.getSocketAddress());
         if (msg[0] != 2) {
             throw new AssertionError("incorrect data received: expected: 2, actual: " + msg[0]);
         }
+        svr.phaser.arriveAndAwaitAdvance(); // let the server null out its socket
 
         // Do NOT close the MulticastSocket; forget it
 
--- a/test/jdk/java/net/SocketImpl/SocketImplCombinations.java	Tue Oct 01 12:27:14 2019 +0200
+++ b/test/jdk/java/net/SocketImpl/SocketImplCombinations.java	Tue Oct 01 12:10:33 2019 +0100
@@ -68,7 +68,7 @@
      * Test creating a connected Socket, it should be created with a platform SocketImpl.
      */
     public void testNewSocket2() throws IOException {
-        try (ServerSocket ss = new ServerSocket(0)) {
+        try (ServerSocket ss = boundServerSocket()) {
             try (Socket s = new Socket(ss.getInetAddress(), ss.getLocalPort())) {
                 SocketImpl si = getSocketImpl(s);
                 assertTrue(isSocksSocketImpl(si));
@@ -127,7 +127,7 @@
         Socket s = new Socket((SocketImpl) null) { };
         try (s) {
             assertTrue(getSocketImpl(s) == null);
-            s.bind(new InetSocketAddress(0));   // force SocketImpl to be created
+            s.bind(loopbackSocketAddress());   // force SocketImpl to be created
             SocketImpl si = getSocketImpl(s);
             assertTrue(isSocksSocketImpl(si));
             SocketImpl delegate = getDelegate(si);
@@ -218,7 +218,7 @@
             Socket s = new Socket((SocketImpl) null) { };
             try (s) {
                 assertTrue(getSocketImpl(s) == null);
-                s.bind(new InetSocketAddress(0));   // force SocketImpl to be created
+                s.bind(loopbackSocketAddress());   // force SocketImpl to be created
                 assertTrue(getSocketImpl(s) instanceof CustomSocketImpl);
             }
         } finally {
@@ -378,7 +378,7 @@
     public void testServerSocketAccept5a() throws IOException {
         SocketImpl serverImpl = new CustomSocketImpl(true);
         try (ServerSocket ss = new ServerSocket(serverImpl) { }) {
-            ss.bind(new InetSocketAddress(0));
+            ss.bind(loopbackSocketAddress());
             expectThrows(IOException.class, ss::accept);
         }
     }
@@ -566,16 +566,36 @@
     }
 
     /**
+     * Returns a new InetSocketAddress with the loopback interface
+     * and port 0.
+     */
+    static InetSocketAddress loopbackSocketAddress() {
+        InetAddress loopback = InetAddress.getLoopbackAddress();
+        return new InetSocketAddress(loopback, 0);
+    }
+
+    /**
+     * Returns a ServerSocket bound to a port on the loopback address
+     */
+    static ServerSocket boundServerSocket() throws IOException {
+        ServerSocket ss = new ServerSocket();
+        ss.bind(loopbackSocketAddress());
+        return ss;
+    }
+
+    /**
      * Creates a ServerSocket that returns the given Socket from accept.
      */
     static ServerSocket serverSocketToAccept(Socket s) throws IOException {
-        return new ServerSocket(0) {
+        ServerSocket ss = new ServerSocket() {
             @Override
             public Socket accept() throws IOException {
                 implAccept(s);
                 return s;
             }
         };
+        ss.bind(loopbackSocketAddress());
+        return ss;
     }
 
     /**
@@ -590,7 +610,7 @@
                 return s;
             }
         };
-        ss.bind(new InetSocketAddress(0));
+        ss.bind(loopbackSocketAddress());
         return ss;
     }
 
--- a/test/jdk/java/net/httpclient/DigestEchoServer.java	Tue Oct 01 12:27:14 2019 +0200
+++ b/test/jdk/java/net/httpclient/DigestEchoServer.java	Tue Oct 01 12:10:33 2019 +0100
@@ -26,6 +26,8 @@
 import com.sun.net.httpserver.HttpsConfigurator;
 import com.sun.net.httpserver.HttpsParameters;
 import com.sun.net.httpserver.HttpsServer;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -1568,8 +1570,8 @@
                 @Override
                 public void run() {
                     try {
+                        int c = 0;
                         try {
-                            int c;
                             while ((c = is.read()) != -1) {
                                 os.write(c);
                                 os.flush();
@@ -1578,11 +1580,13 @@
                                 if (DEBUG) System.out.print(tag);
                             }
                             is.close();
+                        } catch (IOException ex) {
+                            if (DEBUG || !stopped && c >  -1)
+                                ex.printStackTrace(System.out);
+                            end.completeExceptionally(ex);
                         } finally {
-                            os.close();
+                            try {os.close();} catch (Throwable t) {}
                         }
-                    } catch (IOException ex) {
-                        if (DEBUG) ex.printStackTrace(System.out);
                     } finally {
                         end.complete(null);
                     }
@@ -1632,10 +1636,12 @@
         @Override
         public void run() {
             Socket clientConnection = null;
+            Socket targetConnection = null;
             try {
                 while (!stopped) {
                     System.out.println(now() + "Tunnel: Waiting for client");
                     Socket toClose;
+                    targetConnection = clientConnection = null;
                     try {
                         toClose = clientConnection = ss.accept();
                         if (NO_LINGER) {
@@ -1649,7 +1655,6 @@
                     }
                     System.out.println(now() + "Tunnel: Client accepted");
                     StringBuilder headers = new StringBuilder();
-                    Socket targetConnection = null;
                     InputStream  ccis = clientConnection.getInputStream();
                     OutputStream ccos = clientConnection.getOutputStream();
                     Writer w = new OutputStreamWriter(
@@ -1769,28 +1774,44 @@
                             end1 = new CompletableFuture<>());
                     Thread t2 = pipe(targetConnection.getInputStream(), ccos, '-',
                             end2 = new CompletableFuture<>());
-                    end = CompletableFuture.allOf(end1, end2);
+                    var end11 = end1.whenComplete((r, t) -> exceptionally(end2, t));
+                    var end22 = end2.whenComplete((r, t) ->  exceptionally(end1, t));
+                    end = CompletableFuture.allOf(end11, end22);
+                    Socket tc = targetConnection;
                     end.whenComplete(
                             (r,t) -> {
                                 try { toClose.close(); } catch (IOException x) { }
+                                try { tc.close(); } catch (IOException x) { }
                                 finally {connectionCFs.remove(end);}
                             });
                     connectionCFs.add(end);
+                    targetConnection = clientConnection = null;
                     t1.start();
                     t2.start();
                 }
             } catch (Throwable ex) {
-                try {
-                    ss.close();
-                } catch (IOException ex1) {
-                    ex.addSuppressed(ex1);
-                }
+                close(clientConnection, ex);
+                close(targetConnection, ex);
+                close(ss, ex);
                 ex.printStackTrace(System.err);
             } finally {
                 System.out.println(now() + "Tunnel: exiting (stopped=" + stopped + ")");
                 connectionCFs.forEach(cf -> cf.complete(null));
             }
         }
+
+        void exceptionally(CompletableFuture<?> cf, Throwable t) {
+            if (t != null) cf.completeExceptionally(t);
+        }
+
+        void close(Closeable c, Throwable e) {
+            if (c == null) return;
+            try {
+                c.close();
+            } catch (IOException x) {
+                e.addSuppressed(x);
+            }
+        }
     }
 
     /**