jdk/src/aix/classes/sun/nio/ch/AixPollPort.java
changeset 22597 7515a991bb37
child 22604 9b394795e216
equal deleted inserted replaced
22596:62542b8be764 22597:7515a991bb37
       
     1 /*
       
     2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
       
     3  * Copyright 2012 SAP AG. All rights reserved.
       
     4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     5  *
       
     6  * This code is free software; you can redistribute it and/or modify it
       
     7  * under the terms of the GNU General Public License version 2 only, as
       
     8  * published by the Free Software Foundation.  Oracle designates this
       
     9  * particular file as subject to the "Classpath" exception as provided
       
    10  * by Oracle in the LICENSE file that accompanied this code.
       
    11  *
       
    12  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    14  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    15  * version 2 for more details (a copy is included in the LICENSE file that
       
    16  * accompanied this code).
       
    17  *
       
    18  * You should have received a copy of the GNU General Public License version
       
    19  * 2 along with this work; if not, write to the Free Software Foundation,
       
    20  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    21  *
       
    22  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    23  * or visit www.oracle.com if you need additional information or have any
       
    24  * questions.
       
    25  */
       
    26 
       
    27 package sun.nio.ch;
       
    28 
       
    29 import java.nio.channels.spi.AsynchronousChannelProvider;
       
    30 import java.io.IOException;
       
    31 import java.util.HashSet;
       
    32 import java.util.Iterator;
       
    33 import java.util.concurrent.ArrayBlockingQueue;
       
    34 import java.util.concurrent.RejectedExecutionException;
       
    35 import java.util.concurrent.atomic.AtomicInteger;
       
    36 import java.util.concurrent.locks.ReentrantLock;
       
    37 import sun.misc.Unsafe;
       
    38 
       
    39 /**
       
    40  * AsynchronousChannelGroup implementation based on the AIX pollset framework.
       
    41  */
       
    42 final class AixPollPort
       
    43     extends Port
       
    44 {
       
    // Used to allocate, read and free the native poll array.
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    // The layout constants declared below are initialized by calling native
    // methods, so this initializer has to stay ahead of them in the file.
    static {
        IOUtil.load();
        init();
    }

    /**
     * Layout of the native event structure, as reported by the native code:
     *
     * struct pollfd {
     *     int fd;
     *     short events;
     *     short revents;
     * }
     */
    private static final int SIZEOF_POLLFD = eventSize();
    private static final int OFFSETOF_EVENTS = eventsOffset();
    private static final int OFFSETOF_REVENTS = reventsOffset();
    private static final int OFFSETOF_FD = fdOffset();

    // opcodes passed to pollsetCtl
    private static final int PS_ADD = 0x0;
    private static final int PS_MOD = 0x1;
    private static final int PS_DELETE = 0x2;

    // upper bound on the number of events fetched by a single poll
    private static final int MAX_POLL_EVENTS = 512;

    // ID of the pollset backing this port
    private final int pollset;

    // set to true once the port has been closed
    private boolean closed;

    // socket pair used for wakeup
    private final int[] sp;

    // socket pair used to indicate pending pollsetCtl calls
    // Background info: pollsetCtl blocks when another thread is in a
    // pollsetPoll call.
    private final int[] ctlSp;

    // number of wakeups that are still pending
    private final AtomicInteger wakeupCount = new AtomicInteger();

    // address of the poll array handed to pollsetPoll
    private final long address;
       
    90 
       
    91     // encapsulates an event for a channel
       
    92     static class Event {
       
    93         final PollableChannel channel;
       
    94         final int events;
       
    95 
       
    96         Event(PollableChannel channel, int events) {
       
    97             this.channel = channel;
       
    98             this.events = events;
       
    99         }
       
   100 
       
   101         PollableChannel channel()   { return channel; }
       
   102         int events()                { return events; }
       
   103     }
       
   104 
       
   105     // queue of events for cases that a polling thread dequeues more than one
       
   106     // event
       
   107     private final ArrayBlockingQueue<Event> queue;
       
   108     private final Event NEED_TO_POLL = new Event(null, 0);
       
   109     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
       
   110 
       
   111     // encapsulates a pollset control event for a file descriptor
       
   112     static class ControlEvent {
       
   113         final int fd;
       
   114         final int events;
       
   115         final boolean removeOnly;
       
   116         int error = 0;
       
   117 
       
   118         ControlEvent(int fd, int events, boolean removeOnly) {
       
   119             this.fd = fd;
       
   120             this.events = events;
       
   121             this.removeOnly = removeOnly;
       
   122         }
       
   123 
       
   124         int fd()                 { return fd; }
       
   125         int events()             { return events; }
       
   126         boolean removeOnly()     { return removeOnly; }
       
   127         int error()              { return error; }
       
   128         void setError(int error) { this.error = error; }
       
   129     }
       
   130 
       
   131     // queue of control events that need to be processed
       
   132     // (this object is also used for synchronization)
       
   133     private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();
       
   134 
       
   135     // lock used to check whether a poll operation is ongoing
       
   136     private final ReentrantLock controlLock = new ReentrantLock();
       
   137 
       
   138     AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
       
   139         throws IOException
       
   140     {
       
   141         super(provider, pool);
       
   142 
       
   143         // open pollset
       
   144         this.pollset = pollsetCreate();
       
   145 
       
   146         // create socket pair for wakeup mechanism
       
   147         int[] sv = new int[2];
       
   148         try {
       
   149             socketpair(sv);
       
   150             // register one end with pollset
       
   151             pollsetCtl(pollset, PS_ADD, sv[0], POLLIN);
       
   152         } catch (IOException x) {
       
   153             pollsetDestroy(pollset);
       
   154             throw x;
       
   155         }
       
   156         this.sp = sv;
       
   157 
       
   158         // create socket pair for pollset control mechanism
       
   159         sv = new int[2];
       
   160         try {
       
   161             socketpair(sv);
       
   162             // register one end with pollset
       
   163             pollsetCtl(pollset, PS_ADD, sv[0], POLLIN);
       
   164         } catch (IOException x) {
       
   165             pollsetDestroy(pollset);
       
   166             throw x;
       
   167         }
       
   168         this.ctlSp = sv;
       
   169 
       
   170         // allocate the poll array
       
   171         this.address = allocatePollArray(MAX_POLL_EVENTS);
       
   172 
       
   173         // create the queue and offer the special event to ensure that the first
       
   174         // threads polls
       
   175         this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
       
   176         this.queue.offer(NEED_TO_POLL);
       
   177     }
       
   178 
       
   179     AixPollPort start() {
       
   180         startThreads(new EventHandlerTask());
       
   181         return this;
       
   182     }
       
   183 
       
   184     /**
       
   185      * Release all resources
       
   186      */
       
   187     private void implClose() {
       
   188         synchronized (this) {
       
   189             if (closed)
       
   190                 return;
       
   191             closed = true;
       
   192         }
       
   193         freePollArray(address);
       
   194         close0(sp[0]);
       
   195         close0(sp[1]);
       
   196         close0(ctlSp[0]);
       
   197         close0(ctlSp[1]);
       
   198         pollsetDestroy(pollset);
       
   199     }
       
   200 
       
   201     private void wakeup() {
       
   202         if (wakeupCount.incrementAndGet() == 1) {
       
   203             // write byte to socketpair to force wakeup
       
   204             try {
       
   205                 interrupt(sp[1]);
       
   206             } catch (IOException x) {
       
   207                 throw new AssertionError(x);
       
   208             }
       
   209         }
       
   210     }
       
   211 
       
   212     @Override
       
   213     void executeOnHandlerTask(Runnable task) {
       
   214         synchronized (this) {
       
   215             if (closed)
       
   216                 throw new RejectedExecutionException();
       
   217             offerTask(task);
       
   218             wakeup();
       
   219         }
       
   220     }
       
   221 
       
   222     @Override
       
   223     void shutdownHandlerTasks() {
       
   224         /*
       
   225          * If no tasks are running then just release resources; otherwise
       
   226          * write to the one end of the socketpair to wakeup any polling threads.
       
   227          */
       
   228         int nThreads = threadCount();
       
   229         if (nThreads == 0) {
       
   230             implClose();
       
   231         } else {
       
   232             // send interrupt to each thread
       
   233             while (nThreads-- > 0) {
       
   234                 wakeup();
       
   235             }
       
   236         }
       
   237     }
       
   238 
       
   239     // invoke by clients to register a file descriptor
       
   240     @Override
       
   241     void startPoll(int fd, int events) {
       
   242         queueControlEvent(new ControlEvent(fd, events, false));
       
   243     }
       
   244 
       
   245     // Callback method for implementations that need special handling when fd is removed
       
   246     @Override
       
   247     protected void preUnregister(int fd) {
       
   248         queueControlEvent(new ControlEvent(fd, 0, true));
       
   249     }
       
   250 
       
   251     // Add control event into queue and wait for completion.
       
   252     // In case the control lock is free, this method also tries to apply the control change directly.
       
   253     private void queueControlEvent(ControlEvent ev) {
       
   254         // pollsetCtl blocks when a poll call is ongoing. This is very probable.
       
   255         // Therefore we let the polling thread do the pollsetCtl call.
       
   256         synchronized (controlQueue) {
       
   257             controlQueue.add(ev);
       
   258             // write byte to socketpair to force wakeup
       
   259             try {
       
   260                 interrupt(ctlSp[1]);
       
   261             } catch (IOException x) {
       
   262                 throw new AssertionError(x);
       
   263             }
       
   264             do {
       
   265                 // Directly empty queue if no poll call is ongoing.
       
   266                 if (controlLock.tryLock()) {
       
   267                     try {
       
   268                         processControlQueue();
       
   269                     } finally {
       
   270                         controlLock.unlock();
       
   271                     }
       
   272                 } else {
       
   273                     try {
       
   274                         // Do not starve in case the polling thread returned before
       
   275                         // we could write to ctlSp[1] but the polling thread did not
       
   276                         // release the control lock until we checked. Therefore, use
       
   277                         // a timed wait for the time being.
       
   278                         controlQueue.wait(100);
       
   279                     } catch (InterruptedException e) {
       
   280                         // ignore exception and try again
       
   281                     }
       
   282                 }
       
   283             } while (controlQueue.contains(ev));
       
   284         }
       
   285         if (ev.error() != 0) {
       
   286             throw new AssertionError();
       
   287         }
       
   288     }
       
   289 
       
   290     // Process all events currently stored in the control queue.
       
   291     private void processControlQueue() {
       
   292         synchronized (controlQueue) {
       
   293             // On Aix it is only possible to set the event
       
   294             // bits on the first call of pollsetCtl. Later
       
   295             // calls only add bits, but cannot remove them.
       
   296             // Therefore, we always remove the file
       
   297             // descriptor ignoring the error and then add it.
       
   298             Iterator<ControlEvent> iter = controlQueue.iterator();
       
   299             while (iter.hasNext()) {
       
   300                 ControlEvent ev = iter.next();
       
   301                 pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
       
   302                 if (!ev.removeOnly()) {
       
   303                     ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
       
   304                 }
       
   305                 iter.remove();
       
   306             }
       
   307             controlQueue.notifyAll();
       
   308         }
       
   309     }
       
   310 
       
   311     /*
       
   312      * Task to process events from pollset and dispatch to the channel's
       
   313      * onEvent handler.
       
   314      *
       
   315      * Events are retreived from pollset in batch and offered to a BlockingQueue
       
   316      * where they are consumed by handler threads. A special "NEED_TO_POLL"
       
   317      * event is used to signal one consumer to re-poll when all events have
       
   318      * been consumed.
       
   319      */
       
   320     private class EventHandlerTask implements Runnable {
       
   321         private Event poll() throws IOException {
       
   322             try {
       
   323                 for (;;) {
       
   324                     int n;
       
   325                     controlLock.lock();
       
   326                     try {
       
   327                         n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
       
   328                     } finally {
       
   329                         controlLock.unlock();
       
   330                     }
       
   331                     /*
       
   332                      * 'n' events have been read. Here we map them to their
       
   333                      * corresponding channel in batch and queue n-1 so that
       
   334                      * they can be handled by other handler threads. The last
       
   335                      * event is handled by this thread (and so is not queued).
       
   336                      */
       
   337                     fdToChannelLock.readLock().lock();
       
   338                     try {
       
   339                         while (n-- > 0) {
       
   340                             long eventAddress = getEvent(address, n);
       
   341                             int fd = getDescriptor(eventAddress);
       
   342 
       
   343                             // To emulate one shot semantic we need to remove
       
   344                             // the file descriptor here.
       
   345                             pollsetCtl(pollset, PS_DELETE, fd, 0);
       
   346 
       
   347                             // wakeup
       
   348                             if (fd == sp[0]) {
       
   349                                 if (wakeupCount.decrementAndGet() == 0) {
       
   350                                     // no more wakeups so drain pipe
       
   351                                     drain1(sp[0]);
       
   352                                 }
       
   353 
       
   354                                 // This is the only file descriptor without
       
   355                                 // one shot semantic => register it again.
       
   356                                 pollsetCtl(pollset, PS_ADD, sp[0], POLLIN);
       
   357 
       
   358                                 // queue special event if there are more events
       
   359                                 // to handle.
       
   360                                 if (n > 0) {
       
   361                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
       
   362                                     continue;
       
   363                                 }
       
   364                                 return EXECUTE_TASK_OR_SHUTDOWN;
       
   365                             }
       
   366 
       
   367                             // wakeup to process control event
       
   368                             if (fd == ctlSp[0]) {
       
   369                                 synchronized (controlQueue) {
       
   370                                     drain1(ctlSp[0]);
       
   371                                     // This file descriptor does not have
       
   372                                     // one shot semantic => register it again.
       
   373                                     pollsetCtl(pollset, PS_ADD, ctlSp[0], POLLIN);
       
   374                                     processControlQueue();
       
   375                                 }
       
   376                                 continue;
       
   377                             }
       
   378 
       
   379                             PollableChannel channel = fdToChannel.get(fd);
       
   380                             if (channel != null) {
       
   381                                 int events = getRevents(eventAddress);
       
   382                                 Event ev = new Event(channel, events);
       
   383 
       
   384                                 // n-1 events are queued; This thread handles
       
   385                                 // the last one except for the wakeup
       
   386                                 if (n > 0) {
       
   387                                     queue.offer(ev);
       
   388                                 } else {
       
   389                                     return ev;
       
   390                                 }
       
   391                             }
       
   392                         }
       
   393                     } finally {
       
   394                         fdToChannelLock.readLock().unlock();
       
   395                     }
       
   396                 }
       
   397             } finally {
       
   398                 // to ensure that some thread will poll when all events have
       
   399                 // been consumed
       
   400                 queue.offer(NEED_TO_POLL);
       
   401             }
       
   402         }
       
   403 
       
   404         public void run() {
       
   405             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
       
   406                 Invoker.getGroupAndInvokeCount();
       
   407             final boolean isPooledThread = (myGroupAndInvokeCount != null);
       
   408             boolean replaceMe = false;
       
   409             Event ev;
       
   410             try {
       
   411                 for (;;) {
       
   412                     // reset invoke count
       
   413                     if (isPooledThread)
       
   414                         myGroupAndInvokeCount.resetInvokeCount();
       
   415 
       
   416                     try {
       
   417                         replaceMe = false;
       
   418                         ev = queue.take();
       
   419 
       
   420                         // no events and this thread has been "selected" to
       
   421                         // poll for more.
       
   422                         if (ev == NEED_TO_POLL) {
       
   423                             try {
       
   424                                 ev = poll();
       
   425                             } catch (IOException x) {
       
   426                                 x.printStackTrace();
       
   427                                 return;
       
   428                             }
       
   429                         }
       
   430                     } catch (InterruptedException x) {
       
   431                         continue;
       
   432                     }
       
   433 
       
   434                     // handle wakeup to execute task or shutdown
       
   435                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
       
   436                         Runnable task = pollTask();
       
   437                         if (task == null) {
       
   438                             // shutdown request
       
   439                             return;
       
   440                         }
       
   441                         // run task (may throw error/exception)
       
   442                         replaceMe = true;
       
   443                         task.run();
       
   444                         continue;
       
   445                     }
       
   446 
       
   447                     // process event
       
   448                     try {
       
   449                         ev.channel().onEvent(ev.events(), isPooledThread);
       
   450                     } catch (Error x) {
       
   451                         replaceMe = true; throw x;
       
   452                     } catch (RuntimeException x) {
       
   453                         replaceMe = true; throw x;
       
   454                     }
       
   455                 }
       
   456             } finally {
       
   457                 // last handler to exit when shutdown releases resources
       
   458                 int remaining = threadExit(this, replaceMe);
       
   459                 if (remaining == 0 && isShutdown()) {
       
   460                     implClose();
       
   461                 }
       
   462             }
       
   463         }
       
   464     }
       
   465 
       
   466     /**
       
   467      * Allocates a poll array to handle up to {@code count} events.
       
   468      */
       
   469     private static long allocatePollArray(int count) {
       
   470         return unsafe.allocateMemory(count * SIZEOF_POLLFD);
       
   471     }
       
   472 
       
   473     /**
       
   474      * Free a poll array
       
   475      */
       
   476     private static void freePollArray(long address) {
       
   477         unsafe.freeMemory(address);
       
   478     }
       
   479 
       
   480     /**
       
   481      * Returns event[i];
       
   482      */
       
   483     private static long getEvent(long address, int i) {
       
   484         return address + (SIZEOF_POLLFD*i);
       
   485     }
       
   486 
       
   487     /**
       
   488      * Returns event->fd
       
   489      */
       
   490     private static int getDescriptor(long eventAddress) {
       
   491         return unsafe.getInt(eventAddress + OFFSETOF_FD);
       
   492     }
       
   493 
       
   494     /**
       
   495      * Returns event->events
       
   496      */
       
   497     private static int getEvents(long eventAddress) {
       
   498         return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
       
   499     }
       
   500 
       
   501     /**
       
   502      * Returns event->revents
       
   503      */
       
   504     private static int getRevents(long eventAddress) {
       
   505         return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
       
   506     }
       
   507 
       
   508     // -- Native methods --
       
   509 
       
   510     private static native void init();
       
   511 
       
   512     private static native int eventSize();
       
   513 
       
   514     private static native int eventsOffset();
       
   515 
       
   516     private static native int reventsOffset();
       
   517 
       
   518     private static native int fdOffset();
       
   519 
       
   520     private static native int pollsetCreate() throws IOException;
       
   521 
       
   522     private static native int pollsetCtl(int pollset, int opcode, int fd, int events);
       
   523 
       
   524     private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
       
   525         throws IOException;
       
   526 
       
   527     private static native void pollsetDestroy(int pollset);
       
   528 
       
   529     private static native void socketpair(int[] sv) throws IOException;
       
   530 
       
   531     private static native void interrupt(int fd) throws IOException;
       
   532 
       
   533     private static native void drain1(int fd) throws IOException;
       
   534 
       
   535     private static native void close0(int fd);
       
   536 }