jdk/src/solaris/classes/sun/nio/ch/EPollPort.java
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
equal deleted inserted replaced
2056:115e09b7a004 2057:3acf8e5e2ca0
       
     1 /*
       
     2  * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Sun designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Sun in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
       
    22  * CA 95054 USA or visit www.sun.com if you need additional information or
       
    23  * have any questions.
       
    24  */
       
    25 
       
    26 package sun.nio.ch;
       
    27 
       
    28 import java.nio.channels.spi.AsynchronousChannelProvider;
       
    29 import java.io.IOException;
       
    30 import java.util.concurrent.ArrayBlockingQueue;
       
    31 import java.util.concurrent.RejectedExecutionException;
       
    32 import java.util.concurrent.atomic.AtomicInteger;
       
    33 import static sun.nio.ch.EPoll.*;
       
    34 
       
    35 /**
       
    36  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
       
    37  */
       
    38 
       
    39 final class EPollPort
       
    40     extends Port
       
    41 {
       
    42     // maximum number of events to poll at a time
       
    43     private static final int MAX_EPOLL_EVENTS = 512;
       
    44 
       
    45     // errors
       
    46     private static final int ENOENT     = 2;
       
    47 
       
    48     // epoll file descriptor
       
    49     private final int epfd;
       
    50 
       
    51     // true if epoll closed
       
    52     private boolean closed;
       
    53 
       
    54     // socket pair used for wakeup
       
    55     private final int sp[];
       
    56 
       
    57     // number of wakeups pending
       
    58     private final AtomicInteger wakeupCount = new AtomicInteger();
       
    59 
       
    60     // address of the poll array passed to epoll_wait
       
    61     private final long address;
       
    62 
       
    63     // encapsulates an event for a channel
       
    64     static class Event {
       
    65         final PollableChannel channel;
       
    66         final int events;
       
    67 
       
    68         Event(PollableChannel channel, int events) {
       
    69             this.channel = channel;
       
    70             this.events = events;
       
    71         }
       
    72 
       
    73         PollableChannel channel()   { return channel; }
       
    74         int events()                { return events; }
       
    75     }
       
    76 
       
    77     // queue of events for cases that a polling thread dequeues more than one
       
    78     // event
       
    79     private final ArrayBlockingQueue<Event> queue;
       
    80     private final Event NEED_TO_POLL = new Event(null, 0);
       
    81     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
       
    82 
       
    83     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
       
    84         throws IOException
       
    85     {
       
    86         super(provider, pool);
       
    87 
       
    88         // open epoll
       
    89         this.epfd = epollCreate();
       
    90 
       
    91         // create socket pair for wakeup mechanism
       
    92         int[] sv = new int[2];
       
    93         try {
       
    94             socketpair(sv);
       
    95             // register one end with epoll
       
    96             epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
       
    97         } catch (IOException x) {
       
    98             close0(epfd);
       
    99             throw x;
       
   100         }
       
   101         this.sp = sv;
       
   102 
       
   103         // allocate the poll array
       
   104         this.address = allocatePollArray(MAX_EPOLL_EVENTS);
       
   105 
       
   106         // create the queue and offer the special event to ensure that the first
       
   107         // threads polls
       
   108         this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
       
   109         this.queue.offer(NEED_TO_POLL);
       
   110     }
       
   111 
       
   112     EPollPort start() {
       
   113         startThreads(new EventHandlerTask());
       
   114         return this;
       
   115     }
       
   116 
       
   117     /**
       
   118      * Release all resources
       
   119      */
       
   120     private void implClose() {
       
   121         synchronized (this) {
       
   122             if (closed)
       
   123                 return;
       
   124             closed = true;
       
   125         }
       
   126         freePollArray(address);
       
   127         close0(sp[0]);
       
   128         close0(sp[1]);
       
   129         close0(epfd);
       
   130     }
       
   131 
       
   132     private void wakeup() {
       
   133         if (wakeupCount.incrementAndGet() == 1) {
       
   134             // write byte to socketpair to force wakeup
       
   135             try {
       
   136                 interrupt(sp[1]);
       
   137             } catch (IOException x) {
       
   138                 throw new AssertionError(x);
       
   139             }
       
   140         }
       
   141     }
       
   142 
       
   143     @Override
       
   144     void executeOnHandlerTask(Runnable task) {
       
   145         synchronized (this) {
       
   146             if (closed)
       
   147                 throw new RejectedExecutionException();
       
   148             offerTask(task);
       
   149             wakeup();
       
   150         }
       
   151     }
       
   152 
       
   153     @Override
       
   154     void shutdownHandlerTasks() {
       
   155         /*
       
   156          * If no tasks are running then just release resources; otherwise
       
   157          * write to the one end of the socketpair to wakeup any polling threads.
       
   158          */
       
   159         int nThreads = threadCount();
       
   160         if (nThreads == 0) {
       
   161             implClose();
       
   162         } else {
       
   163             // send interrupt to each thread
       
   164             while (nThreads-- > 0) {
       
   165                 wakeup();
       
   166             }
       
   167         }
       
   168     }
       
   169 
       
   170     // invoke by clients to register a file descriptor
       
   171     @Override
       
   172     void startPoll(int fd, int events) {
       
   173         // update events (or add to epoll on first usage)
       
   174         int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
       
   175         if (err == ENOENT)
       
   176             err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
       
   177         if (err != 0)
       
   178             throw new AssertionError();     // should not happen
       
   179     }
       
   180 
       
   181     /*
       
   182      * Task to process events from epoll and dispatch to the channel's
       
   183      * onEvent handler.
       
   184      *
       
   185      * Events are retreived from epoll in batch and offered to a BlockingQueue
       
   186      * where they are consumed by handler threads. A special "NEED_TO_POLL"
       
   187      * event is used to signal one consumer to re-poll when all events have
       
   188      * been consumed.
       
   189      */
       
   190     private class EventHandlerTask implements Runnable {
       
   191         private Event poll() throws IOException {
       
   192             try {
       
   193                 for (;;) {
       
   194                     int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
       
   195                     /*
       
   196                      * 'n' events have been read. Here we map them to their
       
   197                      * corresponding channel in batch and queue n-1 so that
       
   198                      * they can be handled by other handler threads. The last
       
   199                      * event is handled by this thread (and so is not queued).
       
   200                      */
       
   201                     fdToChannelLock.readLock().lock();
       
   202                     try {
       
   203                         while (n-- > 0) {
       
   204                             long eventAddress = getEvent(address, n);
       
   205                             int fd = getDescriptor(eventAddress);
       
   206 
       
   207                             // wakeup
       
   208                             if (fd == sp[0]) {
       
   209                                 if (wakeupCount.decrementAndGet() == 0) {
       
   210                                     // no more wakeups so drain pipe
       
   211                                     drain1(sp[0]);
       
   212                                 }
       
   213 
       
   214                                 // queue special event if there are more events
       
   215                                 // to handle.
       
   216                                 if (n > 0) {
       
   217                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
       
   218                                     continue;
       
   219                                 }
       
   220                                 return EXECUTE_TASK_OR_SHUTDOWN;
       
   221                             }
       
   222 
       
   223                             PollableChannel channel = fdToChannel.get(fd);
       
   224                             if (channel != null) {
       
   225                                 int events = getEvents(eventAddress);
       
   226                                 Event ev = new Event(channel, events);
       
   227 
       
   228                                 // n-1 events are queued; This thread handles
       
   229                                 // the last one except for the wakeup
       
   230                                 if (n > 0) {
       
   231                                     queue.offer(ev);
       
   232                                 } else {
       
   233                                     return ev;
       
   234                                 }
       
   235                             }
       
   236                         }
       
   237                     } finally {
       
   238                         fdToChannelLock.readLock().unlock();
       
   239                     }
       
   240                 }
       
   241             } finally {
       
   242                 // to ensure that some thread will poll when all events have
       
   243                 // been consumed
       
   244                 queue.offer(NEED_TO_POLL);
       
   245             }
       
   246         }
       
   247 
       
   248         public void run() {
       
   249             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
       
   250                 Invoker.getGroupAndInvokeCount();
       
   251             boolean replaceMe = false;
       
   252             Event ev;
       
   253             try {
       
   254                 for (;;) {
       
   255                     // reset invoke count
       
   256                     if (myGroupAndInvokeCount != null)
       
   257                         myGroupAndInvokeCount.resetInvokeCount();
       
   258 
       
   259                     try {
       
   260                         replaceMe = false;
       
   261                         ev = queue.take();
       
   262 
       
   263                         // no events and this thread has been "selected" to
       
   264                         // poll for more.
       
   265                         if (ev == NEED_TO_POLL) {
       
   266                             try {
       
   267                                 ev = poll();
       
   268                             } catch (IOException x) {
       
   269                                 x.printStackTrace();
       
   270                                 return;
       
   271                             }
       
   272                         }
       
   273                     } catch (InterruptedException x) {
       
   274                         continue;
       
   275                     }
       
   276 
       
   277                     // handle wakeup to execute task or shutdown
       
   278                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
       
   279                         Runnable task = pollTask();
       
   280                         if (task == null) {
       
   281                             // shutdown request
       
   282                             return;
       
   283                         }
       
   284                         // run task (may throw error/exception)
       
   285                         replaceMe = true;
       
   286                         task.run();
       
   287                         continue;
       
   288                     }
       
   289 
       
   290                     // process event
       
   291                     try {
       
   292                         ev.channel().onEvent(ev.events());
       
   293                     } catch (Error x) {
       
   294                         replaceMe = true; throw x;
       
   295                     } catch (RuntimeException x) {
       
   296                         replaceMe = true; throw x;
       
   297                     }
       
   298                 }
       
   299             } finally {
       
   300                 // last handler to exit when shutdown releases resources
       
   301                 int remaining = threadExit(this, replaceMe);
       
   302                 if (remaining == 0 && isShutdown()) {
       
   303                     implClose();
       
   304                 }
       
   305             }
       
   306         }
       
   307     }
       
   308 
       
   309     // -- Native methods --
       
   310 
       
   311     private static native void socketpair(int[] sv) throws IOException;
       
   312 
       
   313     private static native void interrupt(int fd) throws IOException;
       
   314 
       
   315     private static native void drain1(int fd) throws IOException;
       
   316 
       
   317     private static native void close0(int fd);
       
   318 
       
   319     static {
       
   320         Util.load();
       
   321     }
       
   322 }