--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/nio/channels/AsyncCloseAndInterrupt.java Tue Sep 12 19:03:39 2017 +0200
@@ -0,0 +1,751 @@
+/*
+ * Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
+ * 8151582 8068693 8153209
+ * @run main/othervm AsyncCloseAndInterrupt
+ * @key intermittent
+ * @summary Comprehensive test of asynchronous closing and interruption
+ * @author Mark Reinhold
+ */
+
+import java.io.*;
+import java.net.*;
+import java.nio.channels.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class AsyncCloseAndInterrupt {
+
+ static PrintStream log = System.err;
+
+ static void sleep(int ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException x) { }
+ }
+
+ // Wildcard address localized to this machine -- Windoze doesn't allow
+ // connecting to a server socket that was previously bound to a true
+ // wildcard, namely new InetSocketAddress((InetAddress)null, 0).
+ //
+ private static InetSocketAddress wildcardAddress;
+
+
+ // Server socket that blindly accepts all connections
+
+ static ServerSocketChannel acceptor;
+
+ private static void initAcceptor() throws IOException {
+ acceptor = ServerSocketChannel.open();
+ acceptor.socket().bind(wildcardAddress);
+
+ Thread th = new Thread("Acceptor") {
+ public void run() {
+ try {
+ for (;;) {
+ SocketChannel sc = acceptor.accept();
+ }
+ } catch (IOException x) {
+ x.printStackTrace();
+ }
+ }
+ };
+
+ th.setDaemon(true);
+ th.start();
+ }
+
+
+ // Server socket that refuses all connections
+
+ static ServerSocketChannel refuser;
+
+ private static void initRefuser() throws IOException {
+ refuser = ServerSocketChannel.open();
+ refuser.bind(wildcardAddress, 1); // use minimum backlog
+ }
+
+ // Dead pipe source and sink
+
+ static Pipe.SourceChannel deadSource;
+ static Pipe.SinkChannel deadSink;
+
+ private static void initPipes() throws IOException {
+ if (deadSource != null)
+ deadSource.close();
+ deadSource = Pipe.open().source();
+ if (deadSink != null)
+ deadSink.close();
+ deadSink = Pipe.open().sink();
+ }
+
+
+ // Files
+
+ private static File fifoFile = null; // File that blocks on reads and writes
+ private static File diskFile = null; // Disk file
+
+ private static void initFile() throws Exception {
+
+ diskFile = File.createTempFile("aci", ".tmp");
+ diskFile.deleteOnExit();
+ FileChannel fc = new FileOutputStream(diskFile).getChannel();
+ buffer.clear();
+ if (fc.write(buffer) != buffer.capacity())
+ throw new RuntimeException("Cannot create disk file");
+ fc.close();
+
+ if (TestUtil.onWindows()) {
+ log.println("WARNING: Cannot completely test FileChannels on Windows");
+ return;
+ }
+ fifoFile = new File("x.fifo");
+ if (fifoFile.exists()) {
+ if (!fifoFile.delete())
+ throw new IOException("Cannot delete existing fifo " + fifoFile);
+ }
+ Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
+ if (p.waitFor() != 0)
+ throw new IOException("Error creating fifo");
+ new RandomAccessFile(fifoFile, "rw").close();
+
+ }
+
+
+ // Channel factories
+
+ static abstract class ChannelFactory {
+ private final String name;
+ ChannelFactory(String name) {
+ this.name = name;
+ }
+ public String toString() {
+ return name;
+ }
+ abstract InterruptibleChannel create() throws IOException;
+ }
+
+ static ChannelFactory socketChannelFactory
+ = new ChannelFactory("SocketChannel") {
+ InterruptibleChannel create() throws IOException {
+ return SocketChannel.open();
+ }
+ };
+
+ static ChannelFactory connectedSocketChannelFactory
+ = new ChannelFactory("SocketChannel") {
+ InterruptibleChannel create() throws IOException {
+ SocketAddress sa = acceptor.socket().getLocalSocketAddress();
+ return SocketChannel.open(sa);
+ }
+ };
+
+ static ChannelFactory serverSocketChannelFactory
+ = new ChannelFactory("ServerSocketChannel") {
+ InterruptibleChannel create() throws IOException {
+ ServerSocketChannel ssc = ServerSocketChannel.open();
+ ssc.socket().bind(wildcardAddress);
+ return ssc;
+ }
+ };
+
+ static ChannelFactory datagramChannelFactory
+ = new ChannelFactory("DatagramChannel") {
+ InterruptibleChannel create() throws IOException {
+ DatagramChannel dc = DatagramChannel.open();
+ InetAddress lb = InetAddress.getByName("127.0.0.1");
+ dc.bind(new InetSocketAddress(lb, 0));
+ dc.connect(new InetSocketAddress(lb, 80));
+ return dc;
+ }
+ };
+
+ static ChannelFactory pipeSourceChannelFactory
+ = new ChannelFactory("Pipe.SourceChannel") {
+ InterruptibleChannel create() throws IOException {
+ // ## arrange to close sink
+ return Pipe.open().source();
+ }
+ };
+
+ static ChannelFactory pipeSinkChannelFactory
+ = new ChannelFactory("Pipe.SinkChannel") {
+ InterruptibleChannel create() throws IOException {
+ // ## arrange to close source
+ return Pipe.open().sink();
+ }
+ };
+
+ static ChannelFactory fifoFileChannelFactory
+ = new ChannelFactory("FileChannel") {
+ InterruptibleChannel create() throws IOException {
+ return new RandomAccessFile(fifoFile, "rw").getChannel();
+ }
+ };
+
+ static ChannelFactory diskFileChannelFactory
+ = new ChannelFactory("FileChannel") {
+ InterruptibleChannel create() throws IOException {
+ return new RandomAccessFile(diskFile, "rw").getChannel();
+ }
+ };
+
+
+ // I/O operations
+
+ static abstract class Op {
+ private final String name;
+ protected Op(String name) {
+ this.name = name;
+ }
+ abstract void doIO(InterruptibleChannel ich) throws IOException;
+ void setup() throws IOException { }
+ public String toString() { return name; }
+ }
+
+ static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
+
+ static ByteBuffer[] buffers = new ByteBuffer[] {
+ ByteBuffer.allocateDirect(1 << 19),
+ ByteBuffer.allocateDirect(1 << 19)
+ };
+
+ static void clearBuffers() {
+ buffers[0].clear();
+ buffers[1].clear();
+ }
+
+ static void show(Channel ch) {
+ log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
+ if (ch.isOpen() && (ch instanceof SocketChannel)) {
+ SocketChannel sc = (SocketChannel)ch;
+ if (sc.socket().isInputShutdown())
+ log.print(", input shutdown");
+ if (sc.socket().isOutputShutdown())
+ log.print(", output shutdown");
+ }
+ log.println();
+ }
+
+ static final Op READ = new Op("read") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ ReadableByteChannel rbc = (ReadableByteChannel)ich;
+ buffer.clear();
+ int n = rbc.read(buffer);
+ log.println("Read returned " + n);
+ show(rbc);
+ if (rbc.isOpen()
+ && (n == -1)
+ && (rbc instanceof SocketChannel)
+ && ((SocketChannel)rbc).socket().isInputShutdown()) {
+ return;
+ }
+ throw new RuntimeException("Read succeeded");
+ }
+ };
+
+ static final Op READV = new Op("readv") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
+ clearBuffers();
+ int n = (int)sbc.read(buffers);
+ log.println("Read returned " + n);
+ show(sbc);
+ if (sbc.isOpen()
+ && (n == -1)
+ && (sbc instanceof SocketChannel)
+ && ((SocketChannel)sbc).socket().isInputShutdown()) {
+ return;
+ }
+ throw new RuntimeException("Read succeeded");
+ }
+ };
+
+ static final Op RECEIVE = new Op("receive") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ DatagramChannel dc = (DatagramChannel)ich;
+ buffer.clear();
+ dc.receive(buffer);
+ show(dc);
+ throw new RuntimeException("Read succeeded");
+ }
+ };
+
+ static final Op WRITE = new Op("write") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+
+ WritableByteChannel wbc = (WritableByteChannel)ich;
+
+ SocketChannel sc = null;
+ if (wbc instanceof SocketChannel)
+ sc = (SocketChannel)wbc;
+
+ int n = 0;
+ for (;;) {
+ buffer.clear();
+ int d = wbc.write(buffer);
+ n += d;
+ if (!wbc.isOpen())
+ break;
+ if ((sc != null) && sc.socket().isOutputShutdown())
+ break;
+ }
+ log.println("Wrote " + n + " bytes");
+ show(wbc);
+ }
+ };
+
+ static final Op WRITEV = new Op("writev") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+
+ GatheringByteChannel gbc = (GatheringByteChannel)ich;
+
+ SocketChannel sc = null;
+ if (gbc instanceof SocketChannel)
+ sc = (SocketChannel)gbc;
+
+ int n = 0;
+ for (;;) {
+ clearBuffers();
+ int d = (int)gbc.write(buffers);
+ n += d;
+ if (!gbc.isOpen())
+ break;
+ if ((sc != null) && sc.socket().isOutputShutdown())
+ break;
+ }
+ log.println("Wrote " + n + " bytes");
+ show(gbc);
+
+ }
+ };
+
+ static final Op CONNECT = new Op("connect") {
+ void setup() {
+ waitPump("connect waiting for pumping refuser ...");
+ }
+ void doIO(InterruptibleChannel ich) throws IOException {
+ SocketChannel sc = (SocketChannel)ich;
+ if (sc.connect(refuser.socket().getLocalSocketAddress()))
+ throw new RuntimeException("Connection succeeded");
+ throw new RuntimeException("Connection did not block");
+ }
+ };
+
+ static final Op FINISH_CONNECT = new Op("finishConnect") {
+ void setup() {
+ waitPump("finishConnect waiting for pumping refuser ...");
+ }
+ void doIO(InterruptibleChannel ich) throws IOException {
+ SocketChannel sc = (SocketChannel)ich;
+ sc.configureBlocking(false);
+ SocketAddress sa = refuser.socket().getLocalSocketAddress();
+ if (sc.connect(sa))
+ throw new RuntimeException("Connection succeeded");
+ sc.configureBlocking(true);
+ if (sc.finishConnect())
+ throw new RuntimeException("Connection succeeded");
+ throw new RuntimeException("Connection did not block");
+ }
+ };
+
+ static final Op ACCEPT = new Op("accept") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ ServerSocketChannel ssc = (ServerSocketChannel)ich;
+ ssc.accept();
+ throw new RuntimeException("Accept succeeded");
+ }
+ };
+
+ // Use only with diskFileChannelFactory
+ static final Op TRANSFER_TO = new Op("transferTo") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ FileChannel fc = (FileChannel)ich;
+ long n = fc.transferTo(0, fc.size(), deadSink);
+ log.println("Transferred " + n + " bytes");
+ show(fc);
+ }
+ };
+
+ // Use only with diskFileChannelFactory
+ static final Op TRANSFER_FROM = new Op("transferFrom") {
+ void doIO(InterruptibleChannel ich) throws IOException {
+ FileChannel fc = (FileChannel)ich;
+ long n = fc.transferFrom(deadSource, 0, 1 << 20);
+ log.println("Transferred " + n + " bytes");
+ show(fc);
+ }
+ };
+
+
+
+ // Test modes
+
+ static final int TEST_PREINTR = 0; // Interrupt thread before I/O
+ static final int TEST_INTR = 1; // Interrupt thread during I/O
+ static final int TEST_CLOSE = 2; // Close channel during I/O
+ static final int TEST_SHUTI = 3; // Shutdown input during I/O
+ static final int TEST_SHUTO = 4; // Shutdown output during I/O
+
+ static final String[] testName = new String[] {
+ "pre-interrupt", "interrupt", "close",
+ "shutdown-input", "shutdown-output"
+ };
+
+
+ static class Tester extends TestThread {
+
+ private InterruptibleChannel ch;
+ private Op op;
+ private int test;
+ volatile boolean ready = false;
+
+ protected Tester(ChannelFactory cf, InterruptibleChannel ch,
+ Op op, int test)
+ {
+ super(cf + "/" + op + "/" + testName[test]);
+ this.ch = ch;
+ this.op = op;
+ this.test = test;
+ }
+
+ @SuppressWarnings("fallthrough")
+ private void caught(Channel ch, IOException x) {
+ String xn = x.getClass().getName();
+ switch (test) {
+
+ case TEST_PREINTR:
+ case TEST_INTR:
+ if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
+ throw new RuntimeException("Wrong exception thrown: " + x);
+ break;
+
+ case TEST_CLOSE:
+ case TEST_SHUTO:
+ if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
+ throw new RuntimeException("Wrong exception thrown: " + x);
+ break;
+
+ case TEST_SHUTI:
+ if (TestUtil.onWindows())
+ break;
+ // FALL THROUGH
+
+ default:
+ throw new Error(x);
+ }
+
+ if (ch.isOpen()) {
+ if (test == TEST_SHUTO) {
+ SocketChannel sc = (SocketChannel)ch;
+ if (!sc.socket().isOutputShutdown())
+ throw new RuntimeException("Output not shutdown");
+ } else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
+ // Let this case pass -- CBIE applies to other channel
+ } else {
+ throw new RuntimeException("Channel still open");
+ }
+ }
+
+ log.println("Thrown as expected: " + x);
+ }
+
+ final void go() throws Exception {
+ if (test == TEST_PREINTR)
+ Thread.currentThread().interrupt();
+ ready = true;
+ try {
+ op.doIO(ch);
+ } catch (ClosedByInterruptException x) {
+ caught(ch, x);
+ } catch (AsynchronousCloseException x) {
+ caught(ch, x);
+ } finally {
+ ch.close();
+ }
+ }
+
+ }
+
+ private static volatile boolean pumpDone = false;
+ private static volatile boolean pumpReady = false;
+
+ private static void waitPump(String msg){
+ log.println(msg);
+ while (!pumpReady){
+ sleep(200);
+ }
+ log.println(msg + " done");
+ }
+
+ // Create a pump thread dedicated to saturate refuser's connection backlog
+ private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
+
+ Callable<Integer> pumpTask = new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws IOException {
+ // Can't reliably saturate connection backlog on Windows Server editions
+ assert !TestUtil.onWindows();
+ log.println("Start pumping refuser ...");
+ List<SocketChannel> refuserClients = new ArrayList<>();
+
+ // Saturate the refuser's connection backlog so that further connection
+ // attempts will be blocked
+ pumpReady = false;
+ while (!pumpDone) {
+ SocketChannel sc = SocketChannel.open();
+ sc.configureBlocking(false);
+ boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
+
+ // Assume that the connection backlog is saturated if a
+ // client cannot connect to the refuser within 50 milliseconds
+ long start = System.currentTimeMillis();
+ while (!pumpReady && !connected
+ && (System.currentTimeMillis() - start < 50)) {
+ connected = sc.finishConnect();
+ }
+
+ if (connected) {
+ // Retain so that finalizer doesn't close
+ refuserClients.add(sc);
+ } else {
+ sc.close();
+ pumpReady = true;
+ }
+ }
+
+ for (SocketChannel sc : refuserClients) {
+ sc.close();
+ }
+ refuser.close();
+
+ log.println("Stop pumping refuser ...");
+ return refuserClients.size();
+ }
+ };
+
+ return pumperExecutor.submit(pumpTask);
+ }
+
+ // Test
+ static void test(ChannelFactory cf, Op op, int test) throws Exception {
+ test(cf, op, test, true);
+ }
+
+ static void test(ChannelFactory cf, Op op, int test, boolean extraSleep)
+ throws Exception
+ {
+ log.println();
+ initPipes();
+ InterruptibleChannel ch = cf.create();
+ Tester t = new Tester(cf, ch, op, test);
+ log.println(t);
+ op.setup();
+ t.start();
+ do {
+ sleep(50);
+ } while (!t.ready);
+
+ if (extraSleep) {
+ sleep(100);
+ }
+
+ switch (test) {
+
+ case TEST_INTR:
+ t.interrupt();
+ break;
+
+ case TEST_CLOSE:
+ ch.close();
+ break;
+
+ case TEST_SHUTI:
+ if (TestUtil.onWindows()) {
+ log.println("WARNING: Asynchronous shutdown not working on Windows");
+ ch.close();
+ } else {
+ ((SocketChannel)ch).socket().shutdownInput();
+ }
+ break;
+
+ case TEST_SHUTO:
+ if (TestUtil.onWindows()) {
+ log.println("WARNING: Asynchronous shutdown not working on Windows");
+ ch.close();
+ } else {
+ ((SocketChannel)ch).socket().shutdownOutput();
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ t.finishAndThrow(10000);
+ }
+
+ static void test(ChannelFactory cf, Op op) throws Exception {
+ test(cf, op, true);
+ }
+
+ static void test(ChannelFactory cf, Op op, boolean extraSleep) throws Exception {
+ // Test INTR cases before PREINTER cases since sometimes
+ // interrupted threads can't load classes
+ test(cf, op, TEST_INTR, extraSleep);
+ test(cf, op, TEST_PREINTR, extraSleep);
+
+ // Bugs, see FileChannelImpl for details
+ if (op == TRANSFER_FROM) {
+ log.println("WARNING: transferFrom/close not tested");
+ return;
+ }
+ if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
+ log.println("WARNING: transferTo/close not tested");
+ return;
+ }
+
+ test(cf, op, TEST_CLOSE, extraSleep);
+ }
+
+ static void test(ChannelFactory cf)
+ throws Exception
+ {
+ InterruptibleChannel ch = cf.create(); // Sample channel
+ ch.close();
+
+ if (ch instanceof ReadableByteChannel) {
+ test(cf, READ);
+ if (ch instanceof SocketChannel)
+ test(cf, READ, TEST_SHUTI);
+ }
+
+ if (ch instanceof ScatteringByteChannel) {
+ test(cf, READV);
+ if (ch instanceof SocketChannel)
+ test(cf, READV, TEST_SHUTI);
+ }
+
+ if (ch instanceof DatagramChannel) {
+ test(cf, RECEIVE);
+
+ // Return here: We can't effectively test writes since, if they
+ // block, they do so only for a fleeting moment unless the network
+ // interface is overloaded.
+ return;
+
+ }
+
+ if (ch instanceof WritableByteChannel) {
+ test(cf, WRITE);
+ if (ch instanceof SocketChannel)
+ test(cf, WRITE, TEST_SHUTO);
+ }
+
+ if (ch instanceof GatheringByteChannel) {
+ test(cf, WRITEV);
+ if (ch instanceof SocketChannel)
+ test(cf, WRITEV, TEST_SHUTO);
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+ initAcceptor();
+ if (!TestUtil.onWindows())
+ initRefuser();
+ initPipes();
+ initFile();
+
+ if (TestUtil.onWindows()) {
+ log.println("WARNING: Cannot test FileChannel transfer operations"
+ + " on Windows");
+ } else {
+ test(diskFileChannelFactory, TRANSFER_TO);
+ test(diskFileChannelFactory, TRANSFER_FROM);
+ }
+ if (fifoFile != null)
+ test(fifoFileChannelFactory);
+
+ // Testing positional file reads and writes is impractical: It requires
+ // access to a large file soft-mounted via NFS, and even then isn't
+ // completely guaranteed to work.
+ //
+ // Testing map is impractical and arguably unnecessary: It's
+ // unclear under what conditions mmap(2) will actually block.
+
+ test(connectedSocketChannelFactory);
+
+ if (TestUtil.onWindows()) {
+ log.println("WARNING Cannot reliably test connect/finishConnect"
+ + " operations on Windows");
+ } else {
+ // Only the following tests need refuser's connection backlog
+ // to be saturated
+ ExecutorService pumperExecutor =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("Pumper");
+ return t;
+ }
+ });
+
+ pumpDone = false;
+ try {
+ Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
+ waitPump("\nWait for initial Pump");
+
+ test(socketChannelFactory, CONNECT, false);
+ test(socketChannelFactory, FINISH_CONNECT, false);
+
+ pumpDone = true;
+ Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
+ log.println("Pump " + newConn + " connections.");
+ } finally {
+ pumperExecutor.shutdown();
+ }
+ }
+
+ test(serverSocketChannelFactory, ACCEPT);
+ test(datagramChannelFactory);
+ test(pipeSourceChannelFactory);
+ test(pipeSinkChannelFactory);
+ }
+}