--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java Sun Feb 15 12:25:54 2009 +0000
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.spi.AsynchronousChannelProvider;
+import java.util.concurrent.RejectedExecutionException;
+import java.io.IOException;
+import sun.misc.Unsafe;
+
+/**
+ * AsynchronousChannelGroup implementation based on the Solaris 10 event port
+ * framework.
+ */
+
+class SolarisEventPort
+ extends Port
+{
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final int addressSize = unsafe.addressSize();
+
+ private static int dependsArch(int value32, int value64) {
+ return (addressSize == 4) ? value32 : value64;
+ }
+
+ /*
+ * typedef struct port_event {
+ * int portev_events;
+ * ushort_t portev_source;
+ * ushort_t portev_pad;
+ * uintptr_t portev_object;
+ * void *portev_user;
+ * } port_event_t;
+ */
+ private static final int SIZEOF_PORT_EVENT = dependsArch(16, 24);
+ private static final int OFFSETOF_EVENTS = 0;
+ private static final int OFFSETOF_SOURCE = 4;
+ private static final int OFFSETOF_OBJECT = 8;
+
+ // port sources
+ private static final short PORT_SOURCE_USER = 3;
+ private static final short PORT_SOURCE_FD = 4;
+
+ // file descriptor to event port.
+ private final int port;
+
+ // true when port is closed
+ private boolean closed;
+
+ SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool)
+ throws IOException
+ {
+ super(provider, pool);
+
+ // create event port
+ this.port = portCreate();
+ }
+
+ SolarisEventPort start() {
+ startThreads(new EventHandlerTask());
+ return this;
+ }
+
+ // releass resources
+ private void implClose() {
+ synchronized (this) {
+ if (closed)
+ return;
+ closed = true;
+ }
+ portClose(port);
+ }
+
+ private void wakeup() {
+ try {
+ portSend(port, 0);
+ } catch (IOException x) {
+ throw new AssertionError(x);
+ }
+ }
+
+ @Override
+ void executeOnHandlerTask(Runnable task) {
+ synchronized (this) {
+ if (closed)
+ throw new RejectedExecutionException();
+ offerTask(task);
+ wakeup();
+ }
+ }
+
+ @Override
+ void shutdownHandlerTasks() {
+ /*
+ * If no tasks are running then just release resources; otherwise
+ * write to the one end of the socketpair to wakeup any polling threads..
+ */
+ int nThreads = threadCount();
+ if (nThreads == 0) {
+ implClose();
+ } else {
+ // send user event to wakeup each thread
+ while (nThreads-- > 0) {
+ try {
+ portSend(port, 0);
+ } catch (IOException x) {
+ throw new AssertionError(x);
+ }
+ }
+ }
+ }
+
+ @Override
+ void startPoll(int fd, int events) {
+ // (re-)associate file descriptor
+ // no need to translate events
+ try {
+ portAssociate(port, PORT_SOURCE_FD, fd, events);
+ } catch (IOException x) {
+ throw new AssertionError(); // should not happen
+ }
+ }
+
+ /*
+ * Task to read a single event from the port and dispatch it to the
+ * channel's onEvent handler.
+ */
+ private class EventHandlerTask implements Runnable {
+ public void run() {
+ Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
+ Invoker.getGroupAndInvokeCount();
+ boolean replaceMe = false;
+ long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
+ try {
+ for (;;) {
+ // reset invoke count
+ if (myGroupAndInvokeCount != null)
+ myGroupAndInvokeCount.resetInvokeCount();
+
+ // wait for I/O completion event
+ // A error here is fatal (thread will not be replaced)
+ replaceMe = false;
+ try {
+ portGet(port, address);
+ } catch (IOException x) {
+ x.printStackTrace();
+ return;
+ }
+
+ // event source
+ short source = unsafe.getShort(address + OFFSETOF_SOURCE);
+ if (source != PORT_SOURCE_FD) {
+ // user event is trigger to invoke task or shutdown
+ if (source == PORT_SOURCE_USER) {
+ Runnable task = pollTask();
+ if (task == null) {
+ // shutdown request
+ return;
+ }
+ // run task (may throw error/exception)
+ replaceMe = true;
+ task.run();
+ }
+ // ignore
+ continue;
+ }
+
+ // pe->portev_object is file descriptor
+ int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT);
+ // pe->portev_events
+ int events = unsafe.getInt(address + OFFSETOF_EVENTS);
+
+ // lookup channel
+ PollableChannel ch;
+ fdToChannelLock.readLock().lock();
+ try {
+ ch = fdToChannel.get(fd);
+ } finally {
+ fdToChannelLock.readLock().unlock();
+ }
+
+ // notify channel
+ if (ch != null) {
+ replaceMe = true;
+ // no need to translate events
+ ch.onEvent(events);
+ }
+ }
+ } finally {
+ // free per-thread resources
+ unsafe.freeMemory(address);
+ // last task to exit when shutdown release resources
+ int remaining = threadExit(this, replaceMe);
+ if (remaining == 0 && isShutdown())
+ implClose();
+ }
+ }
+ }
+
+ // -- Native methods --
+
+ private static native void init();
+
+ private static native int portCreate() throws IOException;
+
+ private static native void portAssociate(int port, int source, long object,
+ int events) throws IOException;
+
+ private static native void portGet(int port, long pe) throws IOException;
+
+ private static native int portGetn(int port, long address, int max)
+ throws IOException;
+
+ private static native void portSend(int port, int events) throws IOException;
+
+ private static native void portClose(int port);
+
+ static {
+ Util.load();
+ init();
+ }
+}