6693490: (se) select throws "File exists" IOException under load (lnx)
authoralanb
Fri, 27 Mar 2009 15:24:37 +0000
changeset 2433 9f0ab7f726b8
parent 2427 f35f516befc3
child 2434 aae0b77424ae
6693490: (se) select throws "File exists" IOException under load (lnx) Reviewed-by: sherman
jdk/src/share/classes/sun/nio/ch/SelChImpl.java
jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java
jdk/test/java/nio/channels/Selector/RegAfterPreClose.java
--- a/jdk/src/share/classes/sun/nio/ch/SelChImpl.java	Tue Mar 24 14:10:38 2009 +0000
+++ b/jdk/src/share/classes/sun/nio/ch/SelChImpl.java	Fri Mar 27 15:24:37 2009 +0000
@@ -25,6 +25,7 @@
 
 package sun.nio.ch;
 
+import java.nio.channels.Channel;
 import java.io.FileDescriptor;
 import java.io.IOException;
 
@@ -35,7 +36,7 @@
  * @since 1.4
  */
 
-interface SelChImpl {
+interface SelChImpl extends Channel {
 
     FileDescriptor getFD();
 
--- a/jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java	Tue Mar 24 14:10:38 2009 +0000
+++ b/jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java	Fri Mar 27 15:24:37 2009 +0000
@@ -78,8 +78,8 @@
     // Base address of the native pollArray
     private final long pollArrayAddress;
 
-    // Set of "idle" file descriptors
-    private final HashSet<Integer> idleSet;
+    // Set of "idle" channels
+    private final HashSet<SelChImpl> idleSet;
 
     EPollArrayWrapper() {
         // creates the epoll file descriptor
@@ -96,19 +96,22 @@
         }
 
         // create idle set
-        idleSet = new HashSet<Integer>();
+        idleSet = new HashSet<SelChImpl>();
     }
 
     // Used to update file description registrations
     private static class Updator {
+        SelChImpl channel;
         int opcode;
-        int fd;
         int events;
-        Updator(int opcode, int fd, int events) {
+        Updator(SelChImpl channel, int opcode, int events) {
+            this.channel = channel;
             this.opcode = opcode;
-            this.fd = fd;
             this.events = events;
         }
+        Updator(SelChImpl channel, int opcode) {
+            this(channel, opcode, 0);
+        }
     }
 
     private LinkedList<Updator> updateList = new LinkedList<Updator>();
@@ -163,60 +166,54 @@
     }
 
     /**
-     * Update the events for a given file descriptor.
+     * Update the events for a given channel.
      */
-    void setInterest(int fd, int mask) {
+    void setInterest(SelChImpl channel, int mask) {
         synchronized (updateList) {
-
-            // if the interest events are 0 then add to idle set, and delete
-            // from epoll if registered (or pending)
-            if (mask == 0) {
-                if (idleSet.add(fd)) {
-                    updateList.add(new Updator(EPOLL_CTL_DEL, fd, 0));
-                }
-                return;
-            }
-
-            // if file descriptor is idle then add to epoll
-            if (!idleSet.isEmpty() && idleSet.remove(fd)) {
-                updateList.add(new Updator(EPOLL_CTL_ADD, fd, mask));
-                return;
-            }
-
             // if the previous pending operation is to add this file descriptor
             // to epoll then update its event set
             if (updateList.size() > 0) {
                 Updator last = updateList.getLast();
-                if (last.fd == fd && last.opcode == EPOLL_CTL_ADD) {
+                if (last.channel == channel && last.opcode == EPOLL_CTL_ADD) {
                     last.events = mask;
                     return;
                 }
             }
 
             // update existing registration
-            updateList.add(new Updator(EPOLL_CTL_MOD, fd, mask));
+            updateList.add(new Updator(channel, EPOLL_CTL_MOD, mask));
+        }
+    }
+
+    /**
+     * Add a channel's file descriptor to epoll
+     */
+    void add(SelChImpl channel) {
+        synchronized (updateList) {
+            updateList.add(new Updator(channel, EPOLL_CTL_ADD));
         }
     }
 
     /**
-     * Add a new file descriptor to epoll
+     * Remove a channel's file descriptor from epoll
      */
-    void add(int fd) {
+    void release(SelChImpl channel) {
         synchronized (updateList) {
-            updateList.add(new Updator(EPOLL_CTL_ADD, fd, 0));
-        }
-    }
+            // flush any pending updates
+            int i = 0;
+            while (i < updateList.size()) {
+                if (updateList.get(i).channel == channel) {
+                    updateList.remove(i);
+                } else {
+                    i++;
+                }
+            }
 
-    /**
-     * Remove a file descriptor from epoll
-     */
-    void release(int fd) {
-        synchronized (updateList) {
-            // if file descriptor is idle then remove from idle set, otherwise
-            // delete from epoll
-            if (!idleSet.remove(fd)) {
-                updateList.add(new Updator(EPOLL_CTL_DEL, fd, 0));
-            }
+            // remove from the idle set (if present)
+            idleSet.remove(channel);
+
+            // remove from epoll (if registered)
+            epollCtl(epfd, EPOLL_CTL_DEL, channel.getFDVal(), 0);
         }
     }
 
@@ -248,7 +245,26 @@
         synchronized (updateList) {
             Updator u = null;
             while ((u = updateList.poll()) != null) {
-                epollCtl(epfd, u.opcode, u.fd, u.events);
+                SelChImpl ch = u.channel;
+                if (!ch.isOpen())
+                    continue;
+
+                // if the events are 0 then file descriptor is put into "idle
+                // set" to prevent it being polled
+                if (u.events == 0) {
+                    boolean added = idleSet.add(u.channel);
+                    // if added to idle set then remove from epoll if registered
+                    if (added && (u.opcode == EPOLL_CTL_MOD))
+                        epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
+                } else {
+                    // events are specified. If file descriptor was in idle set
+                    // it must be re-registered (by converting opcode to ADD)
+                    boolean idle = false;
+                    if (!idleSet.isEmpty())
+                        idle = idleSet.remove(u.channel);
+                    int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
+                    epollCtl(epfd, opcode, ch.getFDVal(), u.events);
+                }
             }
         }
     }
--- a/jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java	Tue Mar 24 14:10:38 2009 +0000
+++ b/jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java	Fri Mar 27 15:24:37 2009 +0000
@@ -139,7 +139,6 @@
         FileDispatcherImpl.closeIntFD(fd0);
         FileDispatcherImpl.closeIntFD(fd1);
 
-        pollWrapper.release(fd0);
         pollWrapper.closeEPollFD();
         // it is possible
         selectedKeys = null;
@@ -162,17 +161,18 @@
     protected void implRegister(SelectionKeyImpl ski) {
         if (closed)
             throw new ClosedSelectorException();
-        int fd = IOUtil.fdVal(ski.channel.getFD());
-        fdToKey.put(Integer.valueOf(fd), ski);
-        pollWrapper.add(fd);
+        SelChImpl ch = ski.channel;
+        fdToKey.put(Integer.valueOf(ch.getFDVal()), ski);
+        pollWrapper.add(ch);
         keys.add(ski);
     }
 
     protected void implDereg(SelectionKeyImpl ski) throws IOException {
         assert (ski.getIndex() >= 0);
-        int fd = ski.channel.getFDVal();
+        SelChImpl ch = ski.channel;
+        int fd = ch.getFDVal();
         fdToKey.remove(Integer.valueOf(fd));
-        pollWrapper.release(fd);
+        pollWrapper.release(ch);
         ski.setIndex(-1);
         keys.remove(ski);
         selectedKeys.remove(ski);
@@ -185,8 +185,7 @@
     void putEventOps(SelectionKeyImpl sk, int ops) {
         if (closed)
             throw new ClosedSelectorException();
-        int fd = IOUtil.fdVal(sk.channel.getFD());
-        pollWrapper.setInterest(fd, ops);
+        pollWrapper.setInterest(sk.channel, ops);
     }
 
     public Selector wakeup() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/nio/channels/Selector/RegAfterPreClose.java	Fri Mar 27 15:24:37 2009 +0000
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+/* @test
+ * @bug 6693490
+ * @summary Pre-close file descriptor may inadvertently get registered with
+ *     epoll during close
+ */
+
+import java.net.*;
+import java.nio.channels.*;
+import java.util.concurrent.*;
+import java.util.*;
+import java.io.IOException;
+
+public class RegAfterPreClose {
+
+    static volatile boolean done;
+
+    /**
+     * A task that continuously connects to a given address and immediately
+     * closes the connection.
+     */
+    static class Connector implements Runnable {
+        private final SocketAddress sa;
+        Connector(int port) throws IOException {
+            InetAddress lh = InetAddress.getLocalHost();
+            this.sa = new InetSocketAddress(lh, port);
+        }
+        public void run() {
+            while (!done) {
+                try {
+                    SocketChannel.open(sa).close();
+                } catch (IOException x) {
+                    // back-off as probably resource related
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException  ignore) { }
+                }
+            }
+        }
+    }
+
+    /**
+     * A task that closes a channel.
+     */
+    static class Closer implements Runnable {
+        private final Channel channel;
+        Closer(Channel sc) {
+            this.channel = sc;
+        }
+        public void run() {
+            try {
+                channel.close();
+            } catch (IOException ignore) { }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        // create listener
+        InetSocketAddress isa = new InetSocketAddress(0);
+        ServerSocketChannel ssc = ServerSocketChannel.open();
+        ssc.socket().bind(isa);
+
+        // register with Selector
+        final Selector sel = Selector.open();
+        ssc.configureBlocking(false);
+        SelectionKey key = ssc.register(sel, SelectionKey.OP_ACCEPT);
+
+        ThreadFactory factory = new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(r);
+                t.setDaemon(true);
+                return t;
+            }
+        };
+
+        // schedule test to run for 1 minute
+        Executors.newScheduledThreadPool(1, factory).schedule(new Runnable() {
+            public void run() {
+                done = true;
+                sel.wakeup();
+            }}, 1, TimeUnit.MINUTES);
+
+        // create Executor that handles tasks that closes channels
+        // "asynchronously" - this creates the conditions to provoke the bug.
+        Executor executor = Executors.newFixedThreadPool(2, factory);
+
+        // submit task that connects to listener
+        executor.execute(new Connector(ssc.socket().getLocalPort()));
+
+        // loop accepting connections until done (or an IOException is thrown)
+        while (!done) {
+            sel.select();
+            if (key.isAcceptable()) {
+                SocketChannel sc = ssc.accept();
+                if (sc != null) {
+                    sc.configureBlocking(false);
+                    sc.register(sel, SelectionKey.OP_READ);
+                    executor.execute(new Closer(sc));
+                }
+            }
+            sel.selectedKeys().clear();
+        }
+    }
+}