test/jdk/java/nio/channels/AsynchronousSocketChannel/CompletionHandlerRelease.java
changeset 50933 76b5ee99ffc0
equal deleted inserted replaced
50932:6d03b1ea636b 50933:76b5ee99ffc0
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 /* @test
       
    25  * @bug 8202252
       
    26  * @run testng CompletionHandlerRelease
       
    27  * @summary Verify that reference to CompletionHandler is cleared after use
       
    28  */
       
    29 
       
    30 import java.io.Closeable;
       
    31 import java.io.IOException;
       
    32 import java.lang.ref.Reference;
       
    33 import java.lang.ref.ReferenceQueue;
       
    34 import java.lang.ref.WeakReference;
       
    35 import java.net.InetAddress;
       
    36 import java.net.InetSocketAddress;
       
    37 import static java.net.StandardSocketOptions.*;
       
    38 import java.nio.ByteBuffer;
       
    39 import java.nio.channels.AsynchronousChannelGroup;
       
    40 import java.nio.channels.AsynchronousServerSocketChannel;
       
    41 import java.nio.channels.AsynchronousSocketChannel;
       
    42 import java.nio.channels.CompletionHandler;
       
    43 import java.util.concurrent.CountDownLatch;
       
    44 import java.util.concurrent.Executors;
       
    45 import java.util.concurrent.Future;
       
    46 
       
    47 import org.testng.annotations.AfterTest;
       
    48 import org.testng.annotations.BeforeTest;
       
    49 import org.testng.annotations.Test;
       
    50 import static org.testng.Assert.*;
       
    51 
       
    52 public class CompletionHandlerRelease {
       
    53     @Test
       
    54     public void testConnect() throws Exception {
       
    55         try (Server server = new Server()) {
       
    56             try (AsynchronousSocketChannel ch =
       
    57                  AsynchronousSocketChannel.open(GROUP)) {
       
    58                 CountDownLatch latch = new CountDownLatch(1);
       
    59                 Handler<Void,Object> handler =
       
    60                     new Handler<Void,Object>("connect", latch);
       
    61                 ReferenceQueue queue = new ReferenceQueue<WeakReference>();
       
    62                 WeakReference<Object> ref =
       
    63                     new WeakReference<Object>(handler, queue);
       
    64 
       
    65                 ch.connect(server.address(), null, handler);
       
    66 
       
    67                 try { latch.await(); } catch (InterruptedException ignore) { }
       
    68 
       
    69                 handler = null;
       
    70                 waitForRefToClear(ref, queue);
       
    71 
       
    72                 server.accept().get().close();
       
    73             }
       
    74         }
       
    75     }
       
    76 
       
    77     @Test
       
    78     public void testWrite() throws Exception {
       
    79         try (Server server = new Server();
       
    80              AsynchronousSocketChannel ch =
       
    81                  AsynchronousSocketChannel.open(GROUP)) {
       
    82             ch.connect(server.address()).get();
       
    83 
       
    84             try (AsynchronousSocketChannel sc = server.accept().get()) {
       
    85                 ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
       
    86                 sc.setOption(SO_SNDBUF, src.remaining());
       
    87 
       
    88                 CountDownLatch latch = new CountDownLatch(1);
       
    89                 Handler<Integer,Object> handler =
       
    90                     new Handler<Integer,Object>("write", latch);
       
    91                 ReferenceQueue queue = new ReferenceQueue<WeakReference>();
       
    92                 WeakReference<Object> ref =
       
    93                     new WeakReference<Object>(handler, queue);
       
    94 
       
    95                 sc.write(src, null, handler);
       
    96 
       
    97                 try { latch.await(); } catch (InterruptedException ignore) { }
       
    98 
       
    99                 handler = null;
       
   100                 waitForRefToClear(ref, queue);
       
   101             }
       
   102         }
       
   103     }
       
   104 
       
   105     @Test
       
   106     public void testRead() throws Exception {
       
   107         try (Server server = new Server();
       
   108              AsynchronousSocketChannel ch =
       
   109                  AsynchronousSocketChannel.open(GROUP)) {
       
   110             ch.connect(server.address()).get();
       
   111 
       
   112             try (AsynchronousSocketChannel sc = server.accept().get()) {
       
   113                 ByteBuffer src = ByteBuffer.wrap("hello".getBytes("UTF-8"));
       
   114                 sc.setOption(SO_SNDBUF, src.remaining());
       
   115                 sc.write(src).get();
       
   116 
       
   117                 CountDownLatch latch = new CountDownLatch(1);
       
   118                 Handler<Integer,Object> handler =
       
   119                     new Handler<Integer,Object>("read", latch);
       
   120                 ReferenceQueue queue = new ReferenceQueue<WeakReference>();
       
   121                 WeakReference<Object> ref =
       
   122                     new WeakReference<Object>(handler, queue);
       
   123 
       
   124                 ByteBuffer dst = ByteBuffer.allocate(64);
       
   125                 ch.read(dst, null, handler);
       
   126 
       
   127                 try { latch.await(); } catch (InterruptedException ignore) { }
       
   128 
       
   129                 handler = null;
       
   130                 waitForRefToClear(ref, queue);
       
   131             }
       
   132         }
       
   133     }
       
   134 
       
   135     private AsynchronousChannelGroup GROUP;
       
   136 
       
   137     @BeforeTest
       
   138     void setup() throws IOException {
       
   139         GROUP = AsynchronousChannelGroup.withFixedThreadPool(2,
       
   140             Executors.defaultThreadFactory());
       
   141     }
       
   142 
       
   143     @AfterTest
       
   144     void cleanup() throws IOException {
       
   145         GROUP.shutdownNow();
       
   146     }
       
   147 
       
   148     class Server implements Closeable {
       
   149         private final AsynchronousServerSocketChannel ssc;
       
   150         private final InetSocketAddress address;
       
   151 
       
   152         Server() throws IOException {
       
   153             this(0);
       
   154         }
       
   155 
       
   156         Server(int recvBufSize) throws IOException {
       
   157             ssc = AsynchronousServerSocketChannel.open(GROUP);
       
   158             if (recvBufSize > 0) {
       
   159                 ssc.setOption(SO_RCVBUF, recvBufSize);
       
   160             }
       
   161             ssc.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(),
       
   162                 0));
       
   163             address = (InetSocketAddress)ssc.getLocalAddress();
       
   164         }
       
   165 
       
   166         InetSocketAddress address() {
       
   167             return address;
       
   168         }
       
   169 
       
   170         Future<AsynchronousSocketChannel> accept() throws IOException {
       
   171             return ssc.accept();
       
   172         }
       
   173 
       
   174         public void close() throws IOException {
       
   175             ssc.close();
       
   176         }
       
   177     }
       
   178 
       
   179     static class Handler<V,A> implements CompletionHandler<V,A> {
       
   180         private final String name;
       
   181         private final CountDownLatch latch;
       
   182 
       
   183         Handler(String name, CountDownLatch latch) {
       
   184             this.name = name;
       
   185             this.latch = latch;
       
   186         }
       
   187 
       
   188         public void completed(V result, A attachment) {
       
   189             System.out.format("%s completed(%s, %s)%n",
       
   190                 name, result, attachment);
       
   191             latch.countDown();
       
   192         }
       
   193 
       
   194         public void failed(Throwable exc, A attachment) {
       
   195             System.out.format("%s failed(%s, %s)%n",
       
   196                 name, exc, attachment);
       
   197             exc.printStackTrace();
       
   198             latch.countDown();
       
   199         }
       
   200     }
       
   201 
       
   202     private void waitForRefToClear(Reference ref, ReferenceQueue queue)
       
   203         throws InterruptedException {
       
   204         Reference r;
       
   205         while ((r = queue.remove(20)) == null) {
       
   206             System.gc();
       
   207         }
       
   208         assertEquals(r, ref);
       
   209         assertNull(r.get());
       
   210     }
       
   211 }