8202252: (aio) Closed AsynchronousSocketChannel keeps completion handler alive
authorbpb
Tue, 03 Jul 2018 15:02:46 -0700
changeset 50933 76b5ee99ffc0
parent 50932 6d03b1ea636b
child 50934 ab998c2bd38f
8202252: (aio) Closed AsynchronousSocketChannel keeps completion handler alive Summary: Clear handler instance variable after use Reviewed-by: rriggs, alanb
src/java.base/unix/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java
test/jdk/java/nio/channels/AsynchronousSocketChannel/CompletionHandlerRelease.java
--- a/src/java.base/unix/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java	Tue Jul 03 14:42:13 2018 -0700
+++ b/src/java.base/unix/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java	Tue Jul 03 15:02:46 2018 -0700
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -273,6 +273,7 @@
 
         // invoke handler and set result
         CompletionHandler<Void,Object> handler = connectHandler;
+        connectHandler = null;
         Object att = connectAttachment;
         PendingFuture<Void,Object> future = connectFuture;
         if (handler == null) {
@@ -405,6 +406,7 @@
             this.readBuffer = null;
             this.readBuffers = null;
             this.readAttachment = null;
+            this.readHandler = null;
 
             // allow another read to be initiated
             enableReading();
@@ -600,6 +602,7 @@
             this.writeBuffer = null;
             this.writeBuffers = null;
             this.writeAttachment = null;
+            this.writeHandler = null;
 
             // allow another write to be initiated
             enableWriting();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/nio/channels/AsynchronousSocketChannel/CompletionHandlerRelease.java	Tue Jul 03 15:02:46 2018 -0700
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 8202252
+ * @run testng CompletionHandlerRelease
+ * @summary Verify that reference to CompletionHandler is cleared after use
+ */
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import static java.net.StandardSocketOptions.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+public class CompletionHandlerRelease {
+    @Test
+    public void testConnect() throws Exception {
+        try (Server server = new Server()) {
+            try (AsynchronousSocketChannel ch =
+                 AsynchronousSocketChannel.open(GROUP)) {
+                CountDownLatch latch = new CountDownLatch(1);
+                Handler<Void,Object> handler =
+                    new Handler<Void,Object>("connect", latch);
+                ReferenceQueue queue = new ReferenceQueue<WeakReference>();
+                WeakReference<Object> ref =
+                    new WeakReference<Object>(handler, queue);
+
+                ch.connect(server.address(), null, handler);
+
+                try { latch.await(); } catch (InterruptedException ignore) { }
+
+                handler = null;
+                waitForRefToClear(ref, queue);
+
+                server.accept().get().close();
+            }
+        }
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        try (Server server = new Server();
+             AsynchronousSocketChannel ch =
+                 AsynchronousSocketChannel.open(GROUP)) {
+            ch.connect(server.address()).get();
+
+            try (AsynchronousSocketChannel sc = server.accept().get()) {
+                ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
+                sc.setOption(SO_SNDBUF, src.remaining());
+
+                CountDownLatch latch = new CountDownLatch(1);
+                Handler<Integer,Object> handler =
+                    new Handler<Integer,Object>("write", latch);
+                ReferenceQueue queue = new ReferenceQueue<WeakReference>();
+                WeakReference<Object> ref =
+                    new WeakReference<Object>(handler, queue);
+
+                sc.write(src, null, handler);
+
+                try { latch.await(); } catch (InterruptedException ignore) { }
+
+                handler = null;
+                waitForRefToClear(ref, queue);
+            }
+        }
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        try (Server server = new Server();
+             AsynchronousSocketChannel ch =
+                 AsynchronousSocketChannel.open(GROUP)) {
+            ch.connect(server.address()).get();
+
+            try (AsynchronousSocketChannel sc = server.accept().get()) {
+                ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
+                sc.setOption(SO_SNDBUF, src.remaining());
+                sc.write(src).get();
+
+                CountDownLatch latch = new CountDownLatch(1);
+                Handler<Integer,Object> handler =
+                    new Handler<Integer,Object>("read", latch);
+                ReferenceQueue queue = new ReferenceQueue<WeakReference>();
+                WeakReference<Object> ref =
+                    new WeakReference<Object>(handler, queue);
+
+                ByteBuffer dst = ByteBuffer.allocate(64);
+                ch.read(dst, null, handler);
+
+                try { latch.await(); } catch (InterruptedException ignore) { }
+
+                handler = null;
+                waitForRefToClear(ref, queue);
+            }
+        }
+    }
+
+    private AsynchronousChannelGroup GROUP;
+
+    @BeforeTest
+    void setup() throws IOException {
+        GROUP = AsynchronousChannelGroup.withFixedThreadPool(2,
+            Executors.defaultThreadFactory());
+    }
+
+    @AfterTest
+    void cleanup() throws IOException {
+        GROUP.shutdownNow();
+    }
+
+    class Server implements Closeable {
+        private final AsynchronousServerSocketChannel ssc;
+        private final InetSocketAddress address;
+
+        Server() throws IOException {
+            this(0);
+        }
+
+        Server(int recvBufSize) throws IOException {
+            ssc = AsynchronousServerSocketChannel.open(GROUP);
+            if (recvBufSize > 0) {
+                ssc.setOption(SO_RCVBUF, recvBufSize);
+            }
+            ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(),
+                0));
+            address = (InetSocketAddress)ssc.getLocalAddress();
+        }
+
+        InetSocketAddress address() {
+            return address;
+        }
+
+        Future<AsynchronousSocketChannel> accept() throws IOException {
+            return ssc.accept();
+        }
+
+        public void close() throws IOException {
+            ssc.close();
+        }
+    }
+
+    static class Handler<V,A> implements CompletionHandler<V,A> {
+        private final String name;
+        private final CountDownLatch latch;
+
+        Handler(String name, CountDownLatch latch) {
+            this.name = name;
+            this.latch = latch;
+        }
+
+        public void completed(V result, A attachment) {
+            System.out.format("%s completed(%s, %s)%n",
+                name, result, attachment);
+            latch.countDown();
+        }
+
+        public void failed(Throwable exc, A attachment) {
+            System.out.format("%s failed(%s, %s)%n",
+                name, exc, attachment);
+            exc.printStackTrace();
+            latch.countDown();
+        }
+    }
+
+    private void waitForRefToClear(Reference ref, ReferenceQueue queue)
+        throws InterruptedException {
+        Reference r;
+        while ((r = queue.remove(20)) == null) {
+            System.gc();
+        }
+        assertEquals(r, ref);
+        assertNull(r.get());
+    }
+}