jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java	Sun Feb 15 12:25:54 2009 +0000
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.spi.AsynchronousChannelProvider;
+import java.util.concurrent.RejectedExecutionException;
+import java.io.IOException;
+import sun.misc.Unsafe;
+
+/**
+ * AsynchronousChannelGroup implementation based on the Solaris 10 event port
+ * framework.
+ */
+
+class SolarisEventPort
+    extends Port
+{
+    private static final Unsafe unsafe = Unsafe.getUnsafe();
+    private static final int addressSize = unsafe.addressSize();
+
+    private static int dependsArch(int value32, int value64) {
+        return (addressSize == 4) ? value32 : value64;
+    }
+
+    /*
+     * typedef struct port_event {
+     *     int             portev_events;
+     *     ushort_t        portev_source;
+     *     ushort_t        portev_pad;
+     *     uintptr_t       portev_object;
+     *     void            *portev_user;
+     * } port_event_t;
+     */
+    private static final int SIZEOF_PORT_EVENT  = dependsArch(16, 24);
+    private static final int OFFSETOF_EVENTS    = 0;
+    private static final int OFFSETOF_SOURCE    = 4;
+    private static final int OFFSETOF_OBJECT    = 8;
+
+    // port sources
+    private static final short PORT_SOURCE_USER     = 3;
+    private static final short PORT_SOURCE_FD       = 4;
+
+    // file descriptor to event port.
+    private final int port;
+
+    // true when port is closed
+    private boolean closed;
+
+    SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool)
+        throws IOException
+    {
+        super(provider, pool);
+
+        // create event port
+        this.port = portCreate();
+    }
+
+    SolarisEventPort start() {
+        startThreads(new EventHandlerTask());
+        return this;
+    }
+
+    // releass resources
+    private void implClose() {
+        synchronized (this) {
+            if (closed)
+                return;
+            closed = true;
+        }
+        portClose(port);
+    }
+
+    private void wakeup() {
+        try {
+            portSend(port, 0);
+        } catch (IOException x) {
+            throw new AssertionError(x);
+        }
+    }
+
+    @Override
+    void executeOnHandlerTask(Runnable task) {
+        synchronized (this) {
+            if (closed)
+                throw new RejectedExecutionException();
+            offerTask(task);
+            wakeup();
+        }
+    }
+
+    @Override
+    void shutdownHandlerTasks() {
+       /*
+         * If no tasks are running then just release resources; otherwise
+         * write to the one end of the socketpair to wakeup any polling threads..
+         */
+        int nThreads = threadCount();
+        if (nThreads == 0) {
+            implClose();
+        } else {
+            // send user event to wakeup each thread
+            while (nThreads-- > 0) {
+                try {
+                    portSend(port, 0);
+                } catch (IOException x) {
+                    throw new AssertionError(x);
+                }
+            }
+        }
+    }
+
+    @Override
+    void startPoll(int fd, int events) {
+        // (re-)associate file descriptor
+        // no need to translate events
+        try {
+            portAssociate(port, PORT_SOURCE_FD, fd, events);
+        } catch (IOException x) {
+            throw new AssertionError();     // should not happen
+        }
+    }
+
+    /*
+     * Task to read a single event from the port and dispatch it to the
+     * channel's onEvent handler.
+     */
+    private class EventHandlerTask implements Runnable {
+        public void run() {
+            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
+                Invoker.getGroupAndInvokeCount();
+            boolean replaceMe = false;
+            long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
+            try {
+                for (;;) {
+                    // reset invoke count
+                    if (myGroupAndInvokeCount != null)
+                        myGroupAndInvokeCount.resetInvokeCount();
+
+                    // wait for I/O completion event
+                    // A error here is fatal (thread will not be replaced)
+                    replaceMe = false;
+                    try {
+                        portGet(port, address);
+                    } catch (IOException x) {
+                        x.printStackTrace();
+                        return;
+                    }
+
+                    // event source
+                    short source = unsafe.getShort(address + OFFSETOF_SOURCE);
+                    if (source != PORT_SOURCE_FD) {
+                        // user event is trigger to invoke task or shutdown
+                        if (source == PORT_SOURCE_USER) {
+                            Runnable task = pollTask();
+                            if (task == null) {
+                                // shutdown request
+                                return;
+                            }
+                            // run task (may throw error/exception)
+                            replaceMe = true;
+                            task.run();
+                        }
+                        // ignore
+                        continue;
+                    }
+
+                    // pe->portev_object is file descriptor
+                    int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT);
+                    // pe->portev_events
+                    int events = unsafe.getInt(address + OFFSETOF_EVENTS);
+
+                    // lookup channel
+                    PollableChannel ch;
+                    fdToChannelLock.readLock().lock();
+                    try {
+                        ch = fdToChannel.get(fd);
+                    } finally {
+                        fdToChannelLock.readLock().unlock();
+                    }
+
+                    // notify channel
+                    if (ch != null) {
+                        replaceMe = true;
+                        // no need to translate events
+                        ch.onEvent(events);
+                    }
+                }
+            } finally {
+                // free per-thread resources
+                unsafe.freeMemory(address);
+                // last task to exit when shutdown release resources
+                int remaining = threadExit(this, replaceMe);
+                if (remaining == 0 && isShutdown())
+                    implClose();
+            }
+        }
+    }
+
+    // -- Native methods --
+
+    private static native void init();
+
+    private static native int portCreate() throws IOException;
+
+    private static native void portAssociate(int port, int source, long object,
+        int events) throws IOException;
+
+    private static native void portGet(int port, long pe) throws IOException;
+
+    private static native int portGetn(int port, long address, int max)
+        throws IOException;
+
+    private static native void portSend(int port, int events) throws IOException;
+
+    private static native void portClose(int port);
+
+    static {
+        Util.load();
+        init();
+    }
+}