test/jdk/jdk/net/RdmaSockets/rsocket/SocketChannel/IOExchanges.java
author bpb
Thu, 07 Feb 2019 11:09:09 -0800
branch rsocket-branch
changeset 57160 c502c299d41e
parent 57156 81e4a12fd1a4
permissions -rw-r--r--
rsocket-branch: correct copyright year

/*
 * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * @test
 * @bug 8195160
 * @summary Tests various combinations of blocking/nonblocking connections
 * @requires (os.family == "linux")
 * @library ../ /test/lib
 * @build RsocketTest
 * @run testng/othervm IOExchanges
 * @author chegar
 */

import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import jdk.net.RdmaSockets;
import org.testng.SkipException;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static java.lang.System.out;
import static java.net.StandardProtocolFamily.INET;
import static java.net.StandardProtocolFamily.INET6;
import static java.nio.channels.SelectionKey.OP_ACCEPT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class IOExchanges {

    // Selects the channel implementation used by the open* helper methods:
    // true opens RDMA channels via jdk.net.RdmaSockets, false opens regular
    // socket channels. The regular-channel setting exists so the test
    // assertions themselves can be validated during development.
    static final boolean useRDMA = true;

    // Local host address, assigned in setup(); the tests bind to it and
    // connect to it.
    static InetAddress addr;
    // Protocol family matching addr (INET6 for an Inet6Address, otherwise
    // INET), assigned in setup().
    static ProtocolFamily family;

    /**
     * Test-wide initialization: skips the tests when RDMA is requested but
     * rsocket is unavailable, then records the local host address and the
     * protocol family that corresponds to it.
     *
     * @throws Exception if the local host address cannot be determined
     */
    @BeforeTest
    public void setup() throws Exception {
        if (useRDMA) {
            if (!RsocketTest.isRsocketAvailable()) {
                throw new SkipException("rsocket is not available");
            }
        }

        InetAddress localHost = InetAddress.getLocalHost();
        addr = localHost;
        if (localHost instanceof Inet6Address) {
            family = INET6;
        } else {
            family = INET;
        }

        out.printf("local address: %s%n", addr);
        out.printf("useRDMA: %b%n", useRDMA);
    }

    /**
     * Opens a client-side channel of the kind selected by {@code useRDMA}.
     *
     * @param family protocol family handed to RdmaSockets; it is not used
     *               when a regular SocketChannel is opened
     * @return the newly opened channel
     * @throws IOException if the underlying open call fails
     */
    static SocketChannel openSocketChannel(ProtocolFamily family)
        throws IOException
    {
        if (useRDMA) {
            return RdmaSockets.openSocketChannel(family);
        }
        return SocketChannel.open();
    }
    /**
     * Opens a server-side channel of the kind selected by {@code useRDMA}.
     *
     * @param family protocol family handed to RdmaSockets; it is not used
     *               when a regular ServerSocketChannel is opened
     * @return the newly opened, unbound server channel
     * @throws IOException if the underlying open call fails
     */
    static ServerSocketChannel openServerSocketChannel(ProtocolFamily family)
        throws IOException
    {
        if (useRDMA) {
            return RdmaSockets.openServerSocketChannel(family);
        }
        return ServerSocketChannel.open();
    }
    /**
     * Opens a selector of the kind selected by {@code useRDMA}.
     *
     * @return the newly opened selector
     * @throws IOException if the underlying open call fails
     */
    static Selector openSelector() throws IOException {
        if (useRDMA) {
            return RdmaSockets.openSelector();
        }
        return Selector.open();
    }

    /*
     The following non-exhaustive set of tests exercises different combinations
     of blocking and non-blocking accept/connect calls along with I/O
     operations that exchange a single byte. The intent is to test a reasonable
     set of blocking and non-blocking scenarios.

      The individual test method names follow their test scenario.
       [BAccep|SELNBAccep|SPINNBAccep] - Accept either:
                        blocking, select-non-blocking, spinning-non-blocking
       [BConn|NBConn] - blocking connect / non-blocking connect
       [BIO|NBIO]     - blocking / non-blocking I/O operations (read/write)
       [WR|RW] - connecting thread writes and accepting thread reads, or vice-versa
       [Id]    - unique test Id

        BAccep_BConn_BIO_WR_1
        BAccep_BConn_BIO_RW_2
        SELNBAccep_BConn_BIO_WR_3
        SELNBAccep_BConn_BIO_RW_4
        SPINNBAccep_BConn_BIO_WR_5
        SPINNBAccep_BConn_BIO_RW_6
        BAccep_NBConn_BIO_WR_7
        BAccep_NBConn_BIO_RW_8
        SELNBAccep_NBConn_BIO_WR_9
        SELNBAccep_NBConn_BIO_RW_10
        SPINNBAccep_NBConn_BIO_WR_11
        SPINNBAccep_NBConn_BIO_RW_12

        BAccep_BConn_NBIO_WR_1a        << Non-Blocking I/O
        BAccep_BConn_NBIO_RW_2a
        SELNBAccep_BConn_NBIO_WR_3a
        SELNBAccep_BConn_NBIO_RW_4a
        SPINNBAccep_BConn_NBIO_WR_5a
        SPINNBAccep_BConn_NBIO_RW_6a
        BAccep_NBConn_NBIO_WR_7a
        BAccep_NBConn_NBIO_RW_8a
        SELNBAccep_NBConn_NBIO_WR_9a
        SELNBAccep_NBConn_NBIO_RW_10a
        SPINBAccep_NBConn_NBIO_WR_11a
        SPINBAccep_NBConn_NBIO_RW_12a
    */


    /**
     * Blocking accept, blocking connect, blocking I/O. The connecting thread
     * writes the byte 0x01 and the accepting thread reads and verifies it;
     * the connecting thread then expects end-of-stream.
     */
    @Test
    public void BAccep_BConn_BIO_WR_1() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family)) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread connector = TestThread.of("t1", () -> {
                try (SocketChannel client = openSocketChannel(family)) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    buf.put((byte)0x01);
                    buf.flip();
                    int written = client.write(buf);
                    assertEquals(written, 1);
                    out.printf("wrote: 0x%x%n", buf.get(0));

                    buf.clear();
                    int eof = client.read(buf);
                    assertEquals(eof, -1);
                }
            });
            connector.start();

            // Accepting side: receive the byte and check its value.
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                int count = peer.read(buf);
                assertEquals(count, 1);
                byte received = buf.get(0);
                out.printf("read:  0x%x%n", received);
                assertEquals(received, 0x01);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Blocking accept, blocking connect, blocking I/O. The accepting thread
     * writes the byte 0x02 and the connecting thread reads and verifies it;
     * the accepting thread then expects end-of-stream.
     */
    @Test
    public void BAccep_BConn_BIO_RW_2() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family)) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread connector = TestThread.of("t2", () -> {
                try (SocketChannel client = openSocketChannel(family)) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    int count = client.read(buf);
                    assertEquals(count, 1);
                    byte received = buf.get(0);
                    out.printf("read:  0x%x%n", received);
                    assertEquals(received, 0x02);
                }
            });
            connector.start();

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                buf.put((byte)0x02);
                buf.flip();
                int written = peer.write(buf);
                assertEquals(written, 1);
                out.printf("wrote: 0x%x%n", buf.get(0));

                buf.clear();
                int eof = peer.read(buf);
                assertEquals(eof, -1);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Selector-driven non-blocking accept, blocking connect, blocking I/O.
     * The connecting thread writes the byte 0x03 and the accepting thread
     * reads and verifies it.
     */
    @Test
    public void SELNBAccep_BConn_BIO_WR_3() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family);
             Selector acceptSelector = openSelector()) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread connector = TestThread.of("t3", () -> {
                try (SocketChannel client = openSocketChannel(family)) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    buf.put((byte)0x03);
                    buf.flip();
                    int written = client.write(buf);
                    assertEquals(written, 1);
                    out.printf("wrote: 0x%x%n", buf.get(0));

                    buf.clear();
                    int eof = client.read(buf);
                    assertEquals(eof, -1);
                }
            });
            connector.start();

            // Wait on the selector until the server channel is ready to accept.
            server.configureBlocking(false);
            server.register(acceptSelector, OP_ACCEPT);
            int ready = acceptSelector.select();
            assertEquals(ready, 1);

            // Accepting side: receive the byte and check its value.
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                int count = peer.read(buf);
                assertEquals(count, 1);
                byte received = buf.get(0);
                out.printf("read:  0x%x%n", received);
                assertEquals(received, 0x03);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Selector-driven non-blocking accept, blocking connect, blocking I/O.
     * The accepting thread writes the byte 0x04 and the connecting thread
     * reads and verifies it.
     */
    @Test
    public void SELNBAccep_BConn_BIO_RW_4() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family);
             Selector acceptSelector = openSelector()) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread connector = TestThread.of("t4", () -> {
                try (SocketChannel client = openSocketChannel(family)) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    int count = client.read(buf);
                    assertEquals(count, 1);
                    byte received = buf.get(0);
                    out.printf("read:  0x%x%n", received);
                    assertEquals(received, 0x04);
                }
            });
            connector.start();

            // Wait on the selector until the server channel is ready to accept.
            server.configureBlocking(false);
            server.register(acceptSelector, OP_ACCEPT);
            int ready = acceptSelector.select();
            assertEquals(ready, 1);

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                buf.put((byte)0x04);
                buf.flip();
                int written = peer.write(buf);
                assertEquals(written, 1);
                out.printf("wrote: 0x%x%n", buf.get(0));

                buf.clear();
                int eof = peer.read(buf);
                assertEquals(eof, -1);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Spinning non-blocking accept, blocking connect, blocking I/O. The
     * connecting thread writes the byte 0x05 and the accepting thread reads
     * and verifies it; the connecting thread then expects end-of-stream.
     */
    @Test
    public void SPINNBAccep_BConn_BIO_WR_5() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread t = TestThread.of("t5", () -> {
                try (SocketChannel sc = openSocketChannel(family)) {
                    assertTrue(sc.connect(new InetSocketAddress(addr, port)));
                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x05).flip();
                    assertEquals(sc.write(bb), 1);
                    out.printf("wrote: 0x%x%n", bb.get(0));
                    assertEquals(sc.read(bb.clear()), -1);
                }
            });
            t.start();

            // Non-blocking mode is what makes accept() return null while no
            // connection is pending; in blocking mode the loop below would
            // just block inside accept() and never spin.
            ssc.configureBlocking(false);

            SocketChannel accepted;
            for (;;) {
                accepted = ssc.accept();
                if (accepted != null) {
                    out.println("accepted new connection");
                    break;
                }
                Thread.onSpinWait();
            }

            // Accepting side: receive the byte and check its value. The
            // accepted channel is used with blocking reads.
            try (SocketChannel sc = accepted) {
                ByteBuffer bb = ByteBuffer.allocate(10);
                assertEquals(sc.read(bb), 1);
                out.printf("read:  0x%x%n", bb.get(0));
                assertEquals(bb.get(0), 0x05);
            }
            t.awaitCompletion();
        }
    }

    /**
     * Spinning non-blocking accept, blocking connect, blocking I/O. The
     * accepting thread writes the byte 0x06 and the connecting thread reads
     * and verifies it; the accepting thread then expects end-of-stream.
     */
    @Test
    public void SPINNBAccep_BConn_BIO_RW_6() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread t = TestThread.of("t6", () -> {
                try (SocketChannel sc = openSocketChannel(family)) {
                    assertTrue(sc.connect(new InetSocketAddress(addr, port)));
                    ByteBuffer bb = ByteBuffer.allocate(10);
                    assertEquals(sc.read(bb), 1);
                    out.printf("read:  0x%x%n", bb.get(0));
                    assertEquals(bb.get(0), 0x06);
                }
            });
            t.start();

            // Non-blocking mode is what makes accept() return null while no
            // connection is pending; in blocking mode the loop below would
            // just block inside accept() and never spin.
            ssc.configureBlocking(false);

            SocketChannel accepted;
            for (;;) {
                accepted = ssc.accept();
                if (accepted != null) {
                    out.println("accepted new connection");
                    break;
                }
                Thread.onSpinWait();
            }

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel sc = accepted) {
                ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x06).flip();
                assertEquals(sc.write(bb), 1);
                out.printf("wrote: 0x%x%n", bb.get(0));
                assertEquals(sc.read(bb.clear()), -1);
            }
            t.awaitCompletion();
        }
    }

    // Similar to the previous six scenarios, but with same-thread
    // non-blocking connect.
/*
    @Test
    public void BAccep_NBConn_BIO_WR_7() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t7", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x07).flip();
                        assertEquals(sc.write(bb), 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        assertEquals(sc.read(bb.clear()), -1);
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    assertEquals(sc2.read(bb), 1);
                    out.printf("read:  0x%x%n", bb.get(0));
                    assertEquals(bb.get(0), 0x07);
                    sc2.shutdownOutput();
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void BAccep_NBConn_BIO_RW_8() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t8", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10);
                        assertEquals(sc.read(bb), 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), 0x08);
                        sc.shutdownOutput();
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x08).flip();
                    assertEquals(sc2.write(bb), 1);
                    out.printf("wrote: 0x%x%n", bb.get(0));
                    assertEquals(sc2.read(bb.clear()), -1);
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SELNBAccep_NBConn_BIO_WR_9() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family);
                 Selector selector = openSelector()) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                ssc.configureBlocking(false).register(selector, OP_ACCEPT);
                assertEquals(selector.select(), 1);

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t9", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x09).flip();
                        assertEquals(sc.write(bb), 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        assertEquals(sc.read(bb.clear()), -1);
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    assertEquals(sc2.read(bb), 1);
                    out.printf("read:  0x%x%n", bb.get(0));
                    assertEquals(bb.get(0), 0x09);
                    sc2.shutdownOutput();
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SELNBAccep_NBConn_BIO_RW_10() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family);
                 Selector selector = openSelector()) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                ssc.configureBlocking(false).register(selector, OP_ACCEPT);
                assertEquals(selector.select(), 1);

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t10", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10);
                        assertEquals(sc.read(bb), 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), 0x10);
                        sc.shutdownOutput();
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x10).flip();
                    assertEquals(sc2.write(bb), 1);
                    out.printf("wrote: 0x%x%n", bb.get(0));
                    assertEquals(sc2.read(bb.clear()), -1);
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SPINNBAccep_NBConn_BIO_WR_11() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                SocketChannel accepted;
                for (;;) {
                    accepted = ssc.accept();
                    if (accepted != null) {
                        out.println("accepted new connection");
                        break;
                    }
                    Thread.onSpinWait();
                }

                try (SocketChannel sc2 = accepted) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t11", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x11).flip();
                        assertEquals(sc.write(bb), 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        assertEquals(sc.read(bb.clear()), -1);
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    assertEquals(sc2.read(bb), 1);
                    out.printf("read:  0x%x%n", bb.get(0));
                    assertEquals(bb.get(0), 0x11);
                    sc2.shutdownOutput();
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SPINNBAccep_NBConn_BIO_RW_12() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                SocketChannel accepted;
                for (;;) {
                    accepted = ssc.accept();
                    if (accepted != null) {
                        out.println("accepted new connection");
                        break;
                    }
                    Thread.onSpinWait();
                }

                try (SocketChannel sc2 = accepted) {
                    assertTrue(sc.finishConnect());
                    sc.configureBlocking(true);
                    TestThread t = TestThread.of("t12", () -> {
                        ByteBuffer bb = ByteBuffer.allocate(10);
                        assertEquals(sc.read(bb), 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), 0x12);
                        sc.shutdownOutput();
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x12).flip();
                    assertEquals(sc2.write(bb), 1);
                    out.printf("wrote: 0x%x%n", bb.get(0));
                    assertEquals(sc2.read(bb.clear()), -1);
                    t.awaitCompletion();
                }
            }
        }
    }
*/
    // ---
    // Similar to the previous twelve scenarios but with non-blocking IO
    // ---

    /**
     * Blocking accept, blocking connect, non-blocking I/O. The connecting
     * thread writes the byte 0x1A and the accepting thread reads and
     * verifies it; both sides switch their channel to non-blocking mode and
     * use a selector before each I/O operation.
     */
    @Test
    public void BAccep_BConn_NBIO_WR_1a() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family)) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread connector = TestThread.of("t1a", () -> {
                try (SocketChannel client = openSocketChannel(family);
                     Selector sel = openSelector()) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    buf.put((byte)0x1A);
                    buf.flip();
                    client.configureBlocking(false);
                    SelectionKey key = client.register(sel, OP_WRITE);
                    sel.select();
                    // Retry until the single byte has been written.
                    int n;
                    do {
                        n = client.write(buf);
                    } while (n < 1);
                    assertEquals(n, 1);
                    out.printf("wrote: 0x%x%n", buf.get(0));

                    key.interestOps(OP_READ);
                    sel.select();
                    buf.clear();
                    // Retry while the read reports no data and no EOF.
                    do {
                        n = client.read(buf);
                    } while (n == 0);
                    assertEquals(n, -1);
                }
            });
            connector.start();

            // Accepting side: receive the byte and check its value.
            try (SocketChannel peer = server.accept();
                 Selector sel = openSelector()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                peer.configureBlocking(false);
                peer.register(sel, OP_READ);
                sel.select();
                int n;
                do {
                    n = peer.read(buf);
                } while (n == 0);
                assertEquals(n, 1);
                byte received = buf.get(0);
                out.printf("read:  0x%x%n", received);
                assertEquals(received, 0x1A);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Blocking accept, blocking connect, non-blocking I/O. The accepting
     * thread writes the byte 0x2A and the connecting thread reads and
     * verifies it; both sides switch their channel to non-blocking mode and
     * use a selector before each I/O operation.
     */
    @Test
    public void BAccep_BConn_NBIO_RW_2a() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family)) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread connector = TestThread.of("t2a", () -> {
                try (SocketChannel client = openSocketChannel(family);
                     Selector sel = openSelector()) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    client.configureBlocking(false);
                    client.register(sel, OP_READ);
                    sel.select();
                    // Retry while the read reports no data and no EOF.
                    int n;
                    do {
                        n = client.read(buf);
                    } while (n == 0);
                    assertEquals(n, 1);
                    byte received = buf.get(0);
                    out.printf("read:  0x%x%n", received);
                    assertEquals(received, 0x2A);
                }
            });
            connector.start();

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel peer = server.accept();
                 Selector sel = openSelector()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                buf.put((byte)0x2A);
                buf.flip();
                peer.configureBlocking(false);
                SelectionKey key = peer.register(sel, OP_WRITE);
                sel.select();
                // Retry until the single byte has been written.
                int n;
                do {
                    n = peer.write(buf);
                } while (n < 1);
                assertEquals(n, 1);
                out.printf("wrote: 0x%x%n", buf.get(0));

                key.interestOps(OP_READ);
                sel.select();
                buf.clear();
                do {
                    n = peer.read(buf);
                } while (n == 0);
                assertEquals(n, -1);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Selector-driven non-blocking accept, blocking connect, non-blocking
     * I/O. The connecting thread writes the byte 0x3A and the accepting
     * thread reads and verifies it.
     */
    @Test
    public void SELNBAccep_BConn_NBIO_WR_3a() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family);
             Selector acceptSelector = openSelector()) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread connector = TestThread.of("t3a", () -> {
                try (SocketChannel client = openSocketChannel(family);
                     Selector sel = openSelector()) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    buf.put((byte)0x3A);
                    buf.flip();
                    client.configureBlocking(false);
                    SelectionKey key = client.register(sel, OP_WRITE);
                    sel.select();
                    // Retry until the single byte has been written.
                    int n;
                    do {
                        n = client.write(buf);
                    } while (n < 1);
                    assertEquals(n, 1);
                    out.printf("wrote: 0x%x%n", buf.get(0));

                    key.interestOps(OP_READ);
                    sel.select();
                    buf.clear();
                    // Retry while the read reports no data and no EOF.
                    do {
                        n = client.read(buf);
                    } while (n == 0);
                    assertEquals(n, -1);
                }
            });
            connector.start();

            // Wait on the selector until the server channel is ready to accept.
            server.configureBlocking(false);
            server.register(acceptSelector, OP_ACCEPT);
            int ready = acceptSelector.select();
            assertEquals(ready, 1);

            // Accepting side: receive the byte and check its value.
            try (SocketChannel peer = server.accept();
                 Selector sel = openSelector()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                peer.configureBlocking(false);
                peer.register(sel, OP_READ);
                sel.select();
                int n;
                do {
                    n = peer.read(buf);
                } while (n == 0);
                assertEquals(n, 1);
                byte received = buf.get(0);
                out.printf("read:  0x%x%n", received);
                assertEquals(received, 0x3A);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Selector-driven non-blocking accept, blocking connect, non-blocking
     * I/O. The accepting thread writes the byte 0x4A and the connecting
     * thread reads and verifies it.
     */
    @Test
    public void SELNBAccep_BConn_NBIO_RW_4a() throws Throwable {
        try (ServerSocketChannel server = openServerSocketChannel(family);
             Selector acceptSelector = openSelector()) {
            server.bind(new InetSocketAddress(addr, 0));
            final int port = server.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread connector = TestThread.of("t4a", () -> {
                try (SocketChannel client = openSocketChannel(family);
                     Selector sel = openSelector()) {
                    boolean connected = client.connect(new InetSocketAddress(addr, port));
                    assertTrue(connected);

                    ByteBuffer buf = ByteBuffer.allocate(10);
                    client.configureBlocking(false);
                    client.register(sel, OP_READ);
                    sel.select();
                    // Retry while the read reports no data and no EOF.
                    int n;
                    do {
                        n = client.read(buf);
                    } while (n == 0);
                    assertEquals(n, 1);
                    byte received = buf.get(0);
                    out.printf("read:  0x%x%n", received);
                    assertEquals(received, 0x4A);
                }
            });
            connector.start();

            // Wait on the selector until the server channel is ready to accept.
            server.configureBlocking(false);
            server.register(acceptSelector, OP_ACCEPT);
            int ready = acceptSelector.select();
            assertEquals(ready, 1);

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel peer = server.accept();
                 Selector sel = openSelector()) {
                ByteBuffer buf = ByteBuffer.allocate(10);
                buf.put((byte)0x4A);
                buf.flip();
                peer.configureBlocking(false);
                SelectionKey key = peer.register(sel, OP_WRITE);
                sel.select();
                // Retry until the single byte has been written.
                int n;
                do {
                    n = peer.write(buf);
                } while (n < 1);
                assertEquals(n, 1);
                out.printf("wrote: 0x%x%n", buf.get(0));

                key.interestOps(OP_READ);
                sel.select();
                buf.clear();
                do {
                    n = peer.read(buf);
                } while (n == 0);
                assertEquals(n, -1);
            }
            connector.awaitCompletion();
        }
    }

    /**
     * Spinning non-blocking accept, blocking connect, non-blocking I/O. The
     * connecting thread writes the byte 0x5A and the accepting thread reads
     * and verifies it; the connecting thread then expects end-of-stream.
     */
    @Test
    public void SPINNBAccep_BConn_NBIO_WR_5a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            // Connecting side: send one byte, then wait for the peer to close.
            TestThread t = TestThread.of("t5a", () -> {
                try (SocketChannel sc = openSocketChannel(family);
                     Selector selector = openSelector()) {
                    assertTrue(sc.connect(new InetSocketAddress(addr, port)));
                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x5A).flip();
                    sc.configureBlocking(false);
                    SelectionKey k = sc.register(selector, OP_WRITE);
                    selector.select();
                    int c;
                    // Retry until the single byte has been written.
                    while ((c = sc.write(bb)) < 1);
                    assertEquals(c, 1);
                    out.printf("wrote: 0x%x%n", bb.get(0));
                    k.interestOps(OP_READ);
                    selector.select();
                    bb.clear();
                    // Retry while the read reports no data and no EOF.
                    while ((c = sc.read(bb)) == 0);
                    assertEquals(c, -1);
                }
            });
            t.start();

            // Non-blocking mode is what makes accept() return null while no
            // connection is pending; in blocking mode the loop below would
            // just block inside accept() and never spin.
            ssc.configureBlocking(false);

            SocketChannel accepted;
            for (;;) {
                accepted = ssc.accept();
                if (accepted != null) {
                    out.println("accepted new connection");
                    break;
                }
                Thread.onSpinWait();
            }

            // Accepting side: receive the byte and check its value.
            try (SocketChannel sc = accepted;
                 Selector selector = openSelector()) {
                ByteBuffer bb = ByteBuffer.allocate(10);
                sc.configureBlocking(false);
                sc.register(selector, OP_READ);
                selector.select();
                int c;
                while ((c = sc.read(bb)) == 0) ;
                assertEquals(c, 1);
                out.printf("read:  0x%x%n", bb.get(0));
                assertEquals(bb.get(0), 0x5A);
            }
            t.awaitCompletion();
        }
    }

    /**
     * Spinning non-blocking accept, blocking connect, non-blocking I/O. The
     * accepting thread writes the byte 0x6A and the connecting thread reads
     * and verifies it; the accepting thread then expects end-of-stream.
     */
    @Test
    public void SPINNBAccep_BConn_NBIO_RW_6a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            // Connecting side: receive one byte and check its value.
            TestThread t = TestThread.of("t6a", () -> {
                try (SocketChannel sc = openSocketChannel(family);
                     Selector selector = openSelector()) {
                    assertTrue(sc.connect(new InetSocketAddress(addr, port)));
                    ByteBuffer bb = ByteBuffer.allocate(10);
                    sc.configureBlocking(false);
                    sc.register(selector, OP_READ);
                    selector.select();
                    int c;
                    // Retry while the read reports no data and no EOF.
                    while ((c = sc.read(bb)) == 0);
                    assertEquals(c, 1);
                    out.printf("read:  0x%x%n", bb.get(0));
                    assertEquals(bb.get(0), 0x6A);
                }
            });
            t.start();

            // Non-blocking mode is what makes accept() return null while no
            // connection is pending; in blocking mode the loop below would
            // just block inside accept() and never spin.
            ssc.configureBlocking(false);

            SocketChannel accepted;
            for (;;) {
                accepted = ssc.accept();
                if (accepted != null) {
                    out.println("accepted new connection");
                    break;
                }
                Thread.onSpinWait();
            }

            // Accepting side: send the byte, then wait for the peer to close.
            try (SocketChannel sc = accepted;
                 Selector selector = openSelector()) {
                ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x6A).flip();
                sc.configureBlocking(false);
                SelectionKey k = sc.register(selector, OP_WRITE);
                selector.select();
                int c;
                // Retry until the single byte has been written.
                while ((c = sc.write(bb)) < 1);
                assertEquals(c, 1);
                out.printf("wrote: 0x%x%n", bb.get(0));
                k.interestOps(OP_READ);
                selector.select();
                bb.clear();
                while ((c = sc.read(bb)) == 0);
                assertEquals(c, -1);
            }
            t.awaitCompletion();
        }
    }

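    // Similar to the previous six scenarios but with same-thread
    // non-blocking connect. NOTE: these scenarios (7a through 12a) are
    // currently disabled; the whole group is wrapped in the block comment
    // that follows.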
/*
    @Test
    public void BAccep_NBConn_NBIO_WR_7a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t7a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x7A).flip();
                            sc.configureBlocking(false);
                            SelectionKey k = sc.register(selector, OP_WRITE);
                            selector.select();
                            int c;
                            while ((c = sc.write(bb)) < 1) ;
                            assertEquals(c, 1);
                            out.printf("wrote: 0x%x%n", bb.get(0));
                            k.interestOps(OP_READ);
                            selector.select();
                            bb.clear();
                            while ((c = sc.read(bb)) == 0) ;
                            assertEquals(c, -1);
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        sc2.register(selector, OP_READ);
                        selector.select();
                        int c;
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), 0x7A);
                        sc2.shutdownOutput();
                    }
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void BAccep_NBConn_NBIO_RW_8a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t8a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10);
                            sc.register(selector, OP_READ);
                            selector.select();
                            int c;
                            while ((c = sc.read(bb)) == 0);
                            assertEquals(c, 1);
                            out.printf("read:  0x%x%n", bb.get(0));
                            assertEquals(bb.get(0), (byte)0x8A);
                            sc.shutdownOutput();
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x8A).flip();
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        SelectionKey k = sc2.register(selector, OP_WRITE);
                        selector.select();
                        int c;
                        while ((c = sc2.write(bb)) < 1) ;
                        assertEquals(c, 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        k.interestOps(OP_READ);
                        selector.select();
                        bb.clear();
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, -1);
                    }
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SELNBAccep_NBConn_NBIO_WR_9a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                Selector aselector = openSelector();
                ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
                assertEquals(aselector.select(), 1);

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t9a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0x9A).flip();
                            sc.configureBlocking(false);
                            SelectionKey k = sc.register(selector, OP_WRITE);
                            selector.select();
                            int c;
                            while ((c = sc.write(bb)) < 1) ;
                            assertEquals(c, 1);
                            out.printf("wrote: 0x%x%n", bb.get(0));
                            k.interestOps(OP_READ);
                            selector.select();
                            bb.clear();
                            while ((c = sc.read(bb)) == 0) ;
                            assertEquals(c, -1);
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        sc2.register(selector, OP_READ);
                        selector.select();
                        int c;
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), (byte)0x9A);
                        sc2.shutdownOutput();
                    }
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SELNBAccep_NBConn_NBIO_RW_10a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                Selector aselector = openSelector();
                ssc.configureBlocking(false).register(aselector, OP_ACCEPT);
                assertEquals(aselector.select(), 1);

                try (SocketChannel sc2 = ssc.accept()) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t10a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10);
                            sc.register(selector, OP_READ);
                            selector.select();
                            int c;
                            while ((c = sc.read(bb)) == 0);
                            assertEquals(c, 1);
                            out.printf("read:  0x%x%n", bb.get(0));
                            assertEquals(bb.get(0), (byte)0xAA);
                            sc.shutdownOutput();
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xAA).flip();
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        SelectionKey k = sc2.register(selector, OP_WRITE);
                        selector.select();
                        int c;
                        while ((c = sc2.write(bb)) < 1) ;
                        assertEquals(c, 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        k.interestOps(OP_READ);
                        selector.select();
                        bb.clear();
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, -1);
                    }
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SPINBAccep_NBConn_NBIO_WR_11a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                SocketChannel accepted;
                for (;;) {
                    accepted = ssc.accept();
                    if (accepted != null) {
                        out.println("accepted new connection");
                        break;
                    }
                    Thread.onSpinWait();
                }

                try (SocketChannel sc2 = accepted) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t11a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xBA).flip();
                            sc.configureBlocking(false);
                            SelectionKey k = sc.register(selector, OP_WRITE);
                            selector.select();
                            int c;
                            while ((c = sc.write(bb)) < 1) ;
                            assertEquals(c, 1);
                            out.printf("wrote: 0x%x%n", bb.get(0));
                            k.interestOps(OP_READ);
                            selector.select();
                            bb.clear();
                            while ((c = sc.read(bb)) == 0) ;
                            assertEquals(c, -1);
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10);
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        sc2.register(selector, OP_READ);
                        selector.select();
                        int c;
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, 1);
                        out.printf("read:  0x%x%n", bb.get(0));
                        assertEquals(bb.get(0), (byte)0xBA);
                        sc2.shutdownOutput();
                    }
                    t.awaitCompletion();
                }
            }
        }
    }

    @Test
    public void SPINBAccep_NBConn_NBIO_RW_12a() throws Throwable {
        try (ServerSocketChannel ssc = openServerSocketChannel(family)) {
            ssc.bind(new InetSocketAddress(addr, 0));
            final int port = ssc.socket().getLocalPort();

            try (SocketChannel sc = openSocketChannel(family)) {
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress(addr, port));

                SocketChannel accepted;
                for (;;) {
                    accepted = ssc.accept();
                    if (accepted != null) {
                        out.println("accepted new connection");
                        break;
                    }
                    Thread.onSpinWait();
                }

                try (SocketChannel sc2 = accepted) {
                    assertTrue(sc.finishConnect());
                    TestThread t = TestThread.of("t12a", () -> {
                        try (Selector selector = openSelector()) {
                            ByteBuffer bb = ByteBuffer.allocate(10);
                            sc.register(selector, OP_READ);
                            selector.select();
                            int c;
                            while ((c = sc.read(bb)) == 0);
                            assertEquals(c, 1);
                            out.printf("read:  0x%x%n", bb.get(0));
                            assertEquals(bb.get(0), (byte)0xCA);
                            sc.shutdownOutput();
                        }
                    });
                    t.start();

                    ByteBuffer bb = ByteBuffer.allocate(10).put((byte)0xCA).flip();
                    sc2.configureBlocking(false);
                    try (Selector selector = openSelector()) {
                        SelectionKey k = sc2.register(selector, OP_WRITE);
                        selector.select();
                        int c;
                        while ((c = sc2.write(bb)) < 1) ;
                        assertEquals(c, 1);
                        out.printf("wrote: 0x%x%n", bb.get(0));
                        k.interestOps(OP_READ);
                        selector.select();
                        bb.clear();
                        while ((c = sc2.read(bb)) == 0) ;
                        assertEquals(c, -1);
                    }
                    t.awaitCompletion();
                }
            }
        }
    }
*/
    // --

    /**
     * A thread that runs a task which is allowed to throw. Anything the task
     * throws is printed and remembered, so that {@link #awaitCompletion()}
     * can rethrow it on the thread that waits for this one.
     */
    static class TestThread extends Thread {

        /** A task whose {@code run} method may throw any {@code Throwable}. */
        interface UncheckedRunnable {
            void run() throws Throwable;
        }

        private final UncheckedRunnable runnable;

        // Set in run() when the task fails; read in awaitCompletion().
        private volatile Throwable throwable;

        /**
         * Creates a thread with the given name that will execute the given task.
         */
        TestThread(UncheckedRunnable task, String threadName) {
            super(threadName);
            runnable = task;
        }

        /**
         * Factory taking the thread name first, then the task to run.
         *
         * @param name     name given to the new thread
         * @param runnable task executed by the new thread
         * @return a new, not yet started, thread
         */
        static TestThread of(String name, UncheckedRunnable runnable) {
            return new TestThread(runnable, name);
        }

        /**
         * Runs the task. A failure does not propagate out of this method; it
         * is printed with the thread name and stored for later rethrow.
         */
        @Override
        public void run() {
            try {
                runnable.run();
            } catch (Throwable caught) {
                out.printf("[%s] caught unexpected: %s%n", getName(), caught);
                throwable = caught;
            }
        }

        /**
         * Waits for this thread to terminate and then rethrows whatever the
         * task threw, if anything.
         *
         * @throws Throwable the failure recorded by {@link #run()}, or an
         *         exception raised by {@code join()} itself
         */
        void awaitCompletion() throws Throwable {
            join();
            final Throwable failure = throwable;
            if (failure != null) {
                throw failure;
            }
        }
    }
}