jdk/src/solaris/classes/sun/nio/ch/SolarisEventPort.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
equal deleted inserted replaced
2056:115e09b7a004 2057:3acf8e5e2ca0
       
     1 /*
       
     2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package sun.nio.ch;
       
    27 
       
    28 import java.nio.channels.spi.AsynchronousChannelProvider;
       
    29 import java.util.concurrent.RejectedExecutionException;
       
    30 import java.io.IOException;
       
    31 import sun.misc.Unsafe;
       
    32 
       
    33 /**
       
    34  * AsynchronousChannelGroup implementation based on the Solaris 10 event port
       
    35  * framework.
       
    36  */
       
    37 
       
    38 class SolarisEventPort
       
    39     extends Port
       
    40 {
       
    41     private static final Unsafe unsafe = Unsafe.getUnsafe();
       
    42     private static final int addressSize = unsafe.addressSize();
       
    43 
       
    44     private static int dependsArch(int value32, int value64) {
       
    45         return (addressSize == 4) ? value32 : value64;
       
    46     }
       
    47 
       
    48     /*
       
    49      * typedef struct port_event {
       
    50      *     int             portev_events;
       
    51      *     ushort_t        portev_source;
       
    52      *     ushort_t        portev_pad;
       
    53      *     uintptr_t       portev_object;
       
    54      *     void            *portev_user;
       
    55      * } port_event_t;
       
    56      */
       
    57     private static final int SIZEOF_PORT_EVENT  = dependsArch(16, 24);
       
    58     private static final int OFFSETOF_EVENTS    = 0;
       
    59     private static final int OFFSETOF_SOURCE    = 4;
       
    60     private static final int OFFSETOF_OBJECT    = 8;
       
    61 
       
    62     // port sources
       
    63     private static final short PORT_SOURCE_USER     = 3;
       
    64     private static final short PORT_SOURCE_FD       = 4;
       
    65 
       
    66     // file descriptor to event port.
       
    67     private final int port;
       
    68 
       
    69     // true when port is closed
       
    70     private boolean closed;
       
    71 
       
    72     SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool)
       
    73         throws IOException
       
    74     {
       
    75         super(provider, pool);
       
    76 
       
    77         // create event port
       
    78         this.port = portCreate();
       
    79     }
       
    80 
       
    81     SolarisEventPort start() {
       
    82         startThreads(new EventHandlerTask());
       
    83         return this;
       
    84     }
       
    85 
       
    86     // releass resources
       
    87     private void implClose() {
       
    88         synchronized (this) {
       
    89             if (closed)
       
    90                 return;
       
    91             closed = true;
       
    92         }
       
    93         portClose(port);
       
    94     }
       
    95 
       
    96     private void wakeup() {
       
    97         try {
       
    98             portSend(port, 0);
       
    99         } catch (IOException x) {
       
   100             throw new AssertionError(x);
       
   101         }
       
   102     }
       
   103 
       
   104     @Override
       
   105     void executeOnHandlerTask(Runnable task) {
       
   106         synchronized (this) {
       
   107             if (closed)
       
   108                 throw new RejectedExecutionException();
       
   109             offerTask(task);
       
   110             wakeup();
       
   111         }
       
   112     }
       
   113 
       
   114     @Override
       
   115     void shutdownHandlerTasks() {
       
   116        /*
       
   117          * If no tasks are running then just release resources; otherwise
       
   118          * write to the one end of the socketpair to wakeup any polling threads..
       
   119          */
       
   120         int nThreads = threadCount();
       
   121         if (nThreads == 0) {
       
   122             implClose();
       
   123         } else {
       
   124             // send user event to wakeup each thread
       
   125             while (nThreads-- > 0) {
       
   126                 try {
       
   127                     portSend(port, 0);
       
   128                 } catch (IOException x) {
       
   129                     throw new AssertionError(x);
       
   130                 }
       
   131             }
       
   132         }
       
   133     }
       
   134 
       
   135     @Override
       
   136     void startPoll(int fd, int events) {
       
   137         // (re-)associate file descriptor
       
   138         // no need to translate events
       
   139         try {
       
   140             portAssociate(port, PORT_SOURCE_FD, fd, events);
       
   141         } catch (IOException x) {
       
   142             throw new AssertionError();     // should not happen
       
   143         }
       
   144     }
       
   145 
       
   146     /*
       
   147      * Task to read a single event from the port and dispatch it to the
       
   148      * channel's onEvent handler.
       
   149      */
       
   150     private class EventHandlerTask implements Runnable {
       
   151         public void run() {
       
   152             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
       
   153                 Invoker.getGroupAndInvokeCount();
       
   154             boolean replaceMe = false;
       
   155             long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
       
   156             try {
       
   157                 for (;;) {
       
   158                     // reset invoke count
       
   159                     if (myGroupAndInvokeCount != null)
       
   160                         myGroupAndInvokeCount.resetInvokeCount();
       
   161 
       
   162                     // wait for I/O completion event
       
   163                     // A error here is fatal (thread will not be replaced)
       
   164                     replaceMe = false;
       
   165                     try {
       
   166                         portGet(port, address);
       
   167                     } catch (IOException x) {
       
   168                         x.printStackTrace();
       
   169                         return;
       
   170                     }
       
   171 
       
   172                     // event source
       
   173                     short source = unsafe.getShort(address + OFFSETOF_SOURCE);
       
   174                     if (source != PORT_SOURCE_FD) {
       
   175                         // user event is trigger to invoke task or shutdown
       
   176                         if (source == PORT_SOURCE_USER) {
       
   177                             Runnable task = pollTask();
       
   178                             if (task == null) {
       
   179                                 // shutdown request
       
   180                                 return;
       
   181                             }
       
   182                             // run task (may throw error/exception)
       
   183                             replaceMe = true;
       
   184                             task.run();
       
   185                         }
       
   186                         // ignore
       
   187                         continue;
       
   188                     }
       
   189 
       
   190                     // pe->portev_object is file descriptor
       
   191                     int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT);
       
   192                     // pe->portev_events
       
   193                     int events = unsafe.getInt(address + OFFSETOF_EVENTS);
       
   194 
       
   195                     // lookup channel
       
   196                     PollableChannel ch;
       
   197                     fdToChannelLock.readLock().lock();
       
   198                     try {
       
   199                         ch = fdToChannel.get(fd);
       
   200                     } finally {
       
   201                         fdToChannelLock.readLock().unlock();
       
   202                     }
       
   203 
       
   204                     // notify channel
       
   205                     if (ch != null) {
       
   206                         replaceMe = true;
       
   207                         // no need to translate events
       
   208                         ch.onEvent(events);
       
   209                     }
       
   210                 }
       
   211             } finally {
       
   212                 // free per-thread resources
       
   213                 unsafe.freeMemory(address);
       
   214                 // last task to exit when shutdown release resources
       
   215                 int remaining = threadExit(this, replaceMe);
       
   216                 if (remaining == 0 && isShutdown())
       
   217                     implClose();
       
   218             }
       
   219         }
       
   220     }
       
   221 
       
   222     // -- Native methods --
       
   223 
       
   224     private static native void init();
       
   225 
       
   226     private static native int portCreate() throws IOException;
       
   227 
       
   228     private static native void portAssociate(int port, int source, long object,
       
   229         int events) throws IOException;
       
   230 
       
   231     private static native void portGet(int port, long pe) throws IOException;
       
   232 
       
   233     private static native int portGetn(int port, long address, int max)
       
   234         throws IOException;
       
   235 
       
   236     private static native void portSend(int port, int events) throws IOException;
       
   237 
       
   238     private static native void portClose(int port);
       
   239 
       
   240     static {
       
   241         Util.load();
       
   242         init();
       
   243     }
       
   244 }