test/jdk/java/nio/channels/Selector/SelectWithConsumer.java
author phh
Sat, 30 Nov 2019 14:33:05 -0800
changeset 59330 5b96c12f909d
parent 58987 3760767a7231
permissions -rw-r--r--
8234541: C1 emits an empty message when it inlines successfully Summary: Use "inline" as the message when successfull Reviewed-by: thartmann, mdoerr Contributed-by: navy.xliu@gmail.com

/*
 * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/* @test
 * @bug 8199433 8208780
 * @run testng SelectWithConsumer
 * @summary Unit test for Selector select(Consumer), select(Consumer,long) and
 *          selectNow(Consumer)
 */

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.*;

import org.testng.annotations.AfterTest;
import org.testng.annotations.Test;
import static org.testng.Assert.*;

@Test
public class SelectWithConsumer {

    /**
     * Invoke the select methods that take an action and check that the
     * accumulated ready ops notified to the action matches the expected ops.
     */
    void testActionInvoked(SelectionKey key, int expectedOps) throws Exception {
        var callerThread = Thread.currentThread();
        var sel = key.selector();
        var interestOps = key.interestOps();
        var notifiedOps = new AtomicInteger();

        if (expectedOps == 0) {
            // ensure select(Consumer) does not block indefinitely
            sel.wakeup();
        } else {
            // ensure that the channel is ready for all expected operations
            sel.select();
            while ((key.readyOps() & interestOps) != expectedOps) {
                Thread.sleep(100);
                sel.select();
            }
        }

        // select(Consumer)
        notifiedOps.set(0);
        int n = sel.select(k -> {
            assertTrue(Thread.currentThread() == callerThread);
            assertTrue(k == key);
            int readyOps = key.readyOps();
            assertTrue((readyOps & interestOps) != 0);
            assertTrue((readyOps & notifiedOps.get()) == 0);
            notifiedOps.set(notifiedOps.get() | readyOps);
        });
        assertTrue((n == 1) ^ (expectedOps == 0));
        assertTrue(notifiedOps.get() == expectedOps);

        // select(Consumer, timeout)
        notifiedOps.set(0);
        n = sel.select(k -> {
            assertTrue(Thread.currentThread() == callerThread);
            assertTrue(k == key);
            int readyOps = key.readyOps();
            assertTrue((readyOps & interestOps) != 0);
            assertTrue((readyOps & notifiedOps.get()) == 0);
            notifiedOps.set(notifiedOps.get() | readyOps);
        }, 1000);
        assertTrue((n == 1) ^ (expectedOps == 0));
        assertTrue(notifiedOps.get() == expectedOps);

        // selectNow(Consumer)
        notifiedOps.set(0);
        n = sel.selectNow(k -> {
            assertTrue(Thread.currentThread() == callerThread);
            assertTrue(k == key);
            int readyOps = key.readyOps();
            assertTrue((readyOps & interestOps) != 0);
            assertTrue((readyOps & notifiedOps.get()) == 0);
            notifiedOps.set(notifiedOps.get() | readyOps);
        });
        assertTrue((n == 1) ^ (expectedOps == 0));
        assertTrue(notifiedOps.get() == expectedOps);
    }

    /**
     * Test that an action is performed when a channel is ready for reading.
     */
    public void testReadable() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SinkChannel sink = p.sink();
            Pipe.SourceChannel source = p.source();
            source.configureBlocking(false);
            SelectionKey key = source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure source is readable
            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);

            // test that action is invoked
            testActionInvoked(key, SelectionKey.OP_READ);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test that an action is performed when a channel is ready for writing.
     */
    public void testWritable() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            sink.configureBlocking(false);
            SelectionKey key = sink.register(sel, SelectionKey.OP_WRITE);

            // test that action is invoked
            testActionInvoked(key, SelectionKey.OP_WRITE);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test that an action is performed when a channel is ready for both
     * reading and writing.
     */
    public void testReadableAndWriteable() throws Exception {
        ServerSocketChannel ssc = null;
        SocketChannel sc = null;
        SocketChannel peer = null;
        try (Selector sel = Selector.open()) {
            ssc = ServerSocketChannel.open().bind(new InetSocketAddress(0));
            sc = SocketChannel.open(ssc.getLocalAddress());
            sc.configureBlocking(false);
            SelectionKey key = sc.register(sel, (SelectionKey.OP_READ |
                                                 SelectionKey.OP_WRITE));

            // accept connection and write data so the source is readable
            peer = ssc.accept();
            peer.write(messageBuffer());

            // test that action is invoked
            testActionInvoked(key, (SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        } finally {
            if (ssc != null) ssc.close();
            if (sc != null) sc.close();
            if (peer != null) peer.close();
        }
    }

    /**
     * Test that the action is called for two selected channels
     */
    public void testTwoChannels() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            sink.configureBlocking(false);
            SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
            SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            // wait for key1 to be readable
            sel.select();
            assertTrue(key2.isWritable());
            while (!key1.isReadable()) {
                Thread.sleep(20);
                sel.select();
            }

            var counter = new AtomicInteger();

            // select(Consumer)
            counter.set(0);
            int n = sel.select(k -> {
                assertTrue(k == key1 || k == key2);
                counter.incrementAndGet();
            });
            assertTrue(n == 2);
            assertTrue(counter.get() == 2);

            // select(Consumer, timeout)
            counter.set(0);
            n = sel.select(k -> {
                assertTrue(k == key1 || k == key2);
                counter.incrementAndGet();
            }, 1000);
            assertTrue(n == 2);
            assertTrue(counter.get() == 2);

            // selectNow(Consumer)
            counter.set(0);
            n = sel.selectNow(k -> {
                assertTrue(k == key1 || k == key2);
                counter.incrementAndGet();
            });
            assertTrue(n == 2);
            assertTrue(counter.get() == 2);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test calling select twice, the action should be invoked each time
     */
    public void testRepeatedSelect1() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            SelectionKey key = source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            // test that action is invoked
            testActionInvoked(key, SelectionKey.OP_READ);
            testActionInvoked(key, SelectionKey.OP_READ);

        } finally {
            closePipe(p);
        }
    }

    /**
     * Test calling select twice. An I/O operation is performed after the
     * first select so the channel will not be selected by the second select.
     */
    public void testRepeatedSelect2() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            SelectionKey key = source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            // test that action is invoked
            testActionInvoked(key, SelectionKey.OP_READ);

            // read all bytes
            int n;
            ByteBuffer bb = ByteBuffer.allocate(100);
            do {
                n = source.read(bb);
                bb.clear();
            } while (n > 0);

            // test that action is not invoked
            testActionInvoked(key, 0);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test timeout
     */
    public void testTimeout() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            source.register(sel, SelectionKey.OP_READ);
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 1000L);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(duration > 500, "select took " + duration + " ms");
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test wakeup prior to select
     */
    public void testWakeupBeforeSelect() throws Exception {
        // select(Consumer)
        try (Selector sel = Selector.open()) {
            sel.wakeup();
            int n = sel.select(k -> assertTrue(false));
            assertTrue(n == 0);
        }

        // select(Consumer, timeout)
        try (Selector sel = Selector.open()) {
            sel.wakeup();
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 60*1000);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(duration < 5000, "select took " + duration + " ms");
        }
    }

    /**
     * Test wakeup during select
     */
    public void testWakeupDuringSelect() throws Exception {
        // select(Consumer)
        try (Selector sel = Selector.open()) {
            scheduleWakeup(sel, 1, SECONDS);
            int n = sel.select(k -> assertTrue(false));
            assertTrue(n == 0);
        }

        // select(Consumer, timeout)
        try (Selector sel = Selector.open()) {
            scheduleWakeup(sel, 1, SECONDS);
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 60*1000);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(duration > 500 && duration < 10*1000,
                    "select took " + duration + " ms");
        }
    }

    /**
     * Test invoking select with interrupt status set
     */
    public void testInterruptBeforeSelect() throws Exception {
        // select(Consumer)
        try (Selector sel = Selector.open()) {
            Thread.currentThread().interrupt();
            int n = sel.select(k -> assertTrue(false));
            assertTrue(n == 0);
            assertTrue(Thread.currentThread().isInterrupted());
            assertTrue(sel.isOpen());
        } finally {
            Thread.currentThread().interrupted();  // clear interrupt status
        }

        // select(Consumer, timeout)
        try (Selector sel = Selector.open()) {
            Thread.currentThread().interrupt();
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 60*1000);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(duration < 5000, "select took " + duration + " ms");
            assertTrue(Thread.currentThread().isInterrupted());
            assertTrue(sel.isOpen());
        } finally {
            Thread.currentThread().interrupted();  // clear interrupt status
        }
    }

    /**
     * Test interrupt thread during select
     */
    public void testInterruptDuringSelect() throws Exception {
        // select(Consumer)
        try (Selector sel = Selector.open()) {
            scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
            int n = sel.select(k -> assertTrue(false));
            assertTrue(n == 0);
            assertTrue(Thread.currentThread().isInterrupted());
            assertTrue(sel.isOpen());
        } finally {
            Thread.currentThread().interrupted();  // clear interrupt status
        }

        // select(Consumer, timeout)
        try (Selector sel = Selector.open()) {
            scheduleInterrupt(Thread.currentThread(), 1, SECONDS);
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 60*1000);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(Thread.currentThread().isInterrupted());
            assertTrue(sel.isOpen());
        } finally {
            Thread.currentThread().interrupted();  // clear interrupt status
        }
    }

    /**
     * Test invoking select on a closed selector
     */
    @Test(expectedExceptions = ClosedSelectorException.class)
    public void testClosedSelector1() throws Exception {
        Selector sel = Selector.open();
        sel.close();
        sel.select(k -> assertTrue(false));
    }
    @Test(expectedExceptions = ClosedSelectorException.class)
    public void testClosedSelector2() throws Exception {
        Selector sel = Selector.open();
        sel.close();
        sel.select(k -> assertTrue(false), 1000);
    }
    @Test(expectedExceptions = ClosedSelectorException.class)
    public void testClosedSelector3() throws Exception {
        Selector sel = Selector.open();
        sel.close();
        sel.selectNow(k -> assertTrue(false));
    }

    /**
     * Test closing selector while in a selection operation
     */
    public void testCloseDuringSelect() throws Exception {
        // select(Consumer)
        try (Selector sel = Selector.open()) {
            scheduleClose(sel, 3, SECONDS);
            int n = sel.select(k -> assertTrue(false));
            assertTrue(n == 0);
            assertFalse(sel.isOpen());
        }

        // select(Consumer, timeout)
        try (Selector sel = Selector.open()) {
            scheduleClose(sel, 3, SECONDS);
            long start = System.currentTimeMillis();
            int n = sel.select(k -> assertTrue(false), 60*1000);
            long duration = System.currentTimeMillis() - start;
            assertTrue(n == 0);
            assertTrue(duration > 2000 && duration < 10*1000,
                    "select took " + duration + " ms");
            assertFalse(sel.isOpen());
        }
    }

    /**
     * Test action closing selector
     */
    @Test(expectedExceptions = ClosedSelectorException.class)
    public void testActionClosingSelector() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            SelectionKey key = source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            // should relay ClosedSelectorException
            sel.select(k -> {
                assertTrue(k == key);
                try {
                    sel.close();
                } catch (IOException ioe) { }
            });
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test that the action is invoked while synchronized on the selector and
     * its selected-key set.
     */
    public void testLocks() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SourceChannel source = p.source();
            Pipe.SinkChannel sink = p.sink();
            source.configureBlocking(false);
            SelectionKey key = source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            // select(Consumer)
            sel.select(k -> {
                assertTrue(k == key);
                assertTrue(Thread.holdsLock(sel));
                assertFalse(Thread.holdsLock(sel.keys()));
                assertTrue(Thread.holdsLock(sel.selectedKeys()));
            });

            // select(Consumer, timeout)
            sel.select(k -> {
                assertTrue(k == key);
                assertTrue(Thread.holdsLock(sel));
                assertFalse(Thread.holdsLock(sel.keys()));
                assertTrue(Thread.holdsLock(sel.selectedKeys()));
            }, 1000L);

            // selectNow(Consumer)
            sel.selectNow(k -> {
                assertTrue(k == key);
                assertTrue(Thread.holdsLock(sel));
                assertFalse(Thread.holdsLock(sel.keys()));
                assertTrue(Thread.holdsLock(sel.selectedKeys()));
            });
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test that selection operations remove cancelled keys from the selector's
     * key and selected-key sets.
     */
    public void testCancel() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SinkChannel sink = p.sink();
            Pipe.SourceChannel source = p.source();

            // write to sink to ensure that the source is readable
            sink.write(messageBuffer());

            source.configureBlocking(false);
            SelectionKey key1 = source.register(sel, SelectionKey.OP_READ);
            // make sure pipe source is readable before we do following checks.
            // this is sometime necessary on windows where pipe is implemented
            // as a pair of connected socket, so there is no guarantee that written
            // bytes on sink side is immediately available on source side.
            sel.select();

            sink.configureBlocking(false);
            SelectionKey key2 = sink.register(sel, SelectionKey.OP_WRITE);
            sel.selectNow();

            assertTrue(sel.keys().contains(key1));
            assertTrue(sel.keys().contains(key2));
            assertTrue(sel.selectedKeys().contains(key1));
            assertTrue(sel.selectedKeys().contains(key2));

            // cancel key1
            key1.cancel();
            int n = sel.selectNow(k -> assertTrue(k == key2));
            assertTrue(n == 1);
            assertFalse(sel.keys().contains(key1));
            assertTrue(sel.keys().contains(key2));
            assertFalse(sel.selectedKeys().contains(key1));
            assertTrue(sel.selectedKeys().contains(key2));

            // cancel key2
            key2.cancel();
            n = sel.selectNow(k -> assertTrue(false));
            assertTrue(n == 0);
            assertFalse(sel.keys().contains(key1));
            assertFalse(sel.keys().contains(key2));
            assertFalse(sel.selectedKeys().contains(key1));
            assertFalse(sel.selectedKeys().contains(key2));
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test an action invoking select()
     */
    public void testReentrantSelect1() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SinkChannel sink = p.sink();
            Pipe.SourceChannel source = p.source();
            source.configureBlocking(false);
            source.register(sel, SelectionKey.OP_READ);

            // write to sink to ensure that the source is readable
            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);

            int n = sel.select(k -> {
                try {
                    sel.select();
                    assertTrue(false);
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                } catch (IllegalStateException expected) {
                }
            });
            assertTrue(n == 1);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test an action invoking selectNow()
     */
    public void testReentrantSelect2() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SinkChannel sink = p.sink();
            Pipe.SourceChannel source = p.source();

            // write to sink to ensure that the source is readable
            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);

            source.configureBlocking(false);
            source.register(sel, SelectionKey.OP_READ);
            int n = sel.select(k -> {
                try {
                    sel.selectNow();
                    assertTrue(false);
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                } catch (IllegalStateException expected) {
                }
            });
            assertTrue(n == 1);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Test an action invoking select(Consumer)
     */
    public void testReentrantSelect3() throws Exception {
        Pipe p = Pipe.open();
        try (Selector sel = Selector.open()) {
            Pipe.SinkChannel sink = p.sink();
            Pipe.SourceChannel source = p.source();

            // write to sink to ensure that the source is readable
            scheduleWrite(sink, messageBuffer(), 100, MILLISECONDS);

            source.configureBlocking(false);
            source.register(sel, SelectionKey.OP_READ);
            int n = sel.select(k -> {
                try {
                    sel.select(x -> assertTrue(false));
                    assertTrue(false);
                } catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                } catch (IllegalStateException expected) {
                }
            });
            assertTrue(n == 1);
        } finally {
            closePipe(p);
        }
    }

    /**
     * Negative timeout
     */
    @Test(expectedExceptions = IllegalArgumentException.class)
    public void testNegativeTimeout() throws Exception {
        try (Selector sel = Selector.open()) {
            sel.select(k -> { }, -1L);
        }
    }

    /**
     * Null action
     */
    @Test(expectedExceptions = NullPointerException.class)
    public void testNull1() throws Exception {
        try (Selector sel = Selector.open()) {
            sel.select(null);
        }
    }
    @Test(expectedExceptions = NullPointerException.class)
    public void testNull2() throws Exception {
        try (Selector sel = Selector.open()) {
            sel.select(null, 1000);
        }
    }
    @Test(expectedExceptions = NullPointerException.class)
    public void testNull3() throws Exception {
        try (Selector sel = Selector.open()) {
            sel.selectNow(null);
        }
    }


    // -- support methods ---

    private final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(1);

    @AfterTest
    void shutdownThreadPool() {
        POOL.shutdown();
    }

    void scheduleWakeup(Selector sel, long delay, TimeUnit unit) {
        POOL.schedule(() -> sel.wakeup(), delay, unit);
    }

    void scheduleInterrupt(Thread t, long delay, TimeUnit unit) {
        POOL.schedule(() -> t.interrupt(), delay, unit);
    }

    void scheduleClose(Closeable c, long delay, TimeUnit unit) {
        POOL.schedule(() -> {
            try {
                c.close();
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }, delay, unit);
    }

    void scheduleWrite(WritableByteChannel sink, ByteBuffer buf, long delay, TimeUnit unit) {
        POOL.schedule(() -> {
            try {
                sink.write(buf);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
        }, delay, unit);
    }

    static void closePipe(Pipe p) {
        try { p.sink().close(); } catch (IOException ignore) { }
        try { p.source().close(); } catch (IOException ignore) { }
    }

    static ByteBuffer messageBuffer() {
        try {
            return ByteBuffer.wrap("message".getBytes("UTF-8"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}