--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.base/aix/classes/sun/nio/ch/AixPollPort.java Tue Sep 12 19:03:39 2017 +0200
@@ -0,0 +1,542 @@
+/*
+ * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012 SAP SE. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.spi.AsynchronousChannelProvider;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import jdk.internal.misc.Unsafe;
+
+/**
+ * AsynchronousChannelGroup implementation based on the AIX pollset framework.
+ */
+final class AixPollPort
+ extends Port
+{
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+
+ static {
+ IOUtil.load();
+ init();
+ }
+
+ /**
+ * struct pollfd {
+ * int fd;
+ * short events;
+ * short revents;
+ * }
+ */
+ private static final int SIZEOF_POLLFD = eventSize();
+ private static final int OFFSETOF_EVENTS = eventsOffset();
+ private static final int OFFSETOF_REVENTS = reventsOffset();
+ private static final int OFFSETOF_FD = fdOffset();
+
+ // opcodes
+ private static final int PS_ADD = 0x0;
+ private static final int PS_MOD = 0x1;
+ private static final int PS_DELETE = 0x2;
+
+ // maximum number of events to poll at a time
+ private static final int MAX_POLL_EVENTS = 512;
+
+ // pollset ID
+ private final int pollset;
+
+ // true if port is closed
+ private boolean closed;
+
+ // socket pair used for wakeup
+ private final int sp[];
+
+ // socket pair used to indicate pending pollsetCtl calls
+ // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.
+ private final int ctlSp[];
+
+ // number of wakeups pending
+ private final AtomicInteger wakeupCount = new AtomicInteger();
+
+ // address of the poll array passed to pollset_poll
+ private final long address;
+
+ // encapsulates an event for a channel
+ static class Event {
+ final PollableChannel channel;
+ final int events;
+
+ Event(PollableChannel channel, int events) {
+ this.channel = channel;
+ this.events = events;
+ }
+
+ PollableChannel channel() { return channel; }
+ int events() { return events; }
+ }
+
+ // queue of events for cases that a polling thread dequeues more than one
+ // event
+ private final ArrayBlockingQueue<Event> queue;
+ private final Event NEED_TO_POLL = new Event(null, 0);
+ private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
+ private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0);
+
+ // encapsulates a pollset control event for a file descriptor
+ static class ControlEvent {
+ final int fd;
+ final int events;
+ final boolean removeOnly;
+ int error = 0;
+
+ ControlEvent(int fd, int events, boolean removeOnly) {
+ this.fd = fd;
+ this.events = events;
+ this.removeOnly = removeOnly;
+ }
+
+ int fd() { return fd; }
+ int events() { return events; }
+ boolean removeOnly() { return removeOnly; }
+ int error() { return error; }
+ void setError(int error) { this.error = error; }
+ }
+
+ // queue of control events that need to be processed
+ // (this object is also used for synchronization)
+ private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();
+
+ // lock used to check whether a poll operation is ongoing
+ private final ReentrantLock controlLock = new ReentrantLock();
+
+ AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
+ throws IOException
+ {
+ super(provider, pool);
+
+ // open pollset
+ this.pollset = pollsetCreate();
+
+ // create socket pair for wakeup mechanism
+ int[] sv = new int[2];
+ try {
+ socketpair(sv);
+ // register one end with pollset
+ pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
+ } catch (IOException x) {
+ pollsetDestroy(pollset);
+ throw x;
+ }
+ this.sp = sv;
+
+ // create socket pair for pollset control mechanism
+ sv = new int[2];
+ try {
+ socketpair(sv);
+ // register one end with pollset
+ pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
+ } catch (IOException x) {
+ pollsetDestroy(pollset);
+ throw x;
+ }
+ this.ctlSp = sv;
+
+ // allocate the poll array
+ this.address = allocatePollArray(MAX_POLL_EVENTS);
+
+ // create the queue and offer the special event to ensure that the first
+ // threads polls
+ this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
+ this.queue.offer(NEED_TO_POLL);
+ }
+
+ AixPollPort start() {
+ startThreads(new EventHandlerTask());
+ return this;
+ }
+
+ /**
+ * Release all resources
+ */
+ private void implClose() {
+ synchronized (this) {
+ if (closed)
+ return;
+ closed = true;
+ }
+ freePollArray(address);
+ close0(sp[0]);
+ close0(sp[1]);
+ close0(ctlSp[0]);
+ close0(ctlSp[1]);
+ pollsetDestroy(pollset);
+ }
+
+ private void wakeup() {
+ if (wakeupCount.incrementAndGet() == 1) {
+ // write byte to socketpair to force wakeup
+ try {
+ interrupt(sp[1]);
+ } catch (IOException x) {
+ throw new AssertionError(x);
+ }
+ }
+ }
+
+ @Override
+ void executeOnHandlerTask(Runnable task) {
+ synchronized (this) {
+ if (closed)
+ throw new RejectedExecutionException();
+ offerTask(task);
+ wakeup();
+ }
+ }
+
+ @Override
+ void shutdownHandlerTasks() {
+ /*
+ * If no tasks are running then just release resources; otherwise
+ * write to the one end of the socketpair to wakeup any polling threads.
+ */
+ int nThreads = threadCount();
+ if (nThreads == 0) {
+ implClose();
+ } else {
+ // send interrupt to each thread
+ while (nThreads-- > 0) {
+ wakeup();
+ }
+ }
+ }
+
+ // invoke by clients to register a file descriptor
+ @Override
+ void startPoll(int fd, int events) {
+ queueControlEvent(new ControlEvent(fd, events, false));
+ }
+
+ // Callback method for implementations that need special handling when fd is removed
+ @Override
+ protected void preUnregister(int fd) {
+ queueControlEvent(new ControlEvent(fd, 0, true));
+ }
+
+ // Add control event into queue and wait for completion.
+ // In case the control lock is free, this method also tries to apply the control change directly.
+ private void queueControlEvent(ControlEvent ev) {
+ // pollsetCtl blocks when a poll call is ongoing. This is very probable.
+ // Therefore we let the polling thread do the pollsetCtl call.
+ synchronized (controlQueue) {
+ controlQueue.add(ev);
+ // write byte to socketpair to force wakeup
+ try {
+ interrupt(ctlSp[1]);
+ } catch (IOException x) {
+ throw new AssertionError(x);
+ }
+ do {
+ // Directly empty queue if no poll call is ongoing.
+ if (controlLock.tryLock()) {
+ try {
+ processControlQueue();
+ } finally {
+ controlLock.unlock();
+ }
+ } else {
+ try {
+ // Do not starve in case the polling thread returned before
+ // we could write to ctlSp[1] but the polling thread did not
+ // release the control lock until we checked. Therefore, use
+ // a timed wait for the time being.
+ controlQueue.wait(100);
+ } catch (InterruptedException e) {
+ // ignore exception and try again
+ }
+ }
+ } while (controlQueue.contains(ev));
+ }
+ if (ev.error() != 0) {
+ throw new AssertionError();
+ }
+ }
+
+ // Process all events currently stored in the control queue.
+ private void processControlQueue() {
+ synchronized (controlQueue) {
+ // On Aix it is only possible to set the event
+ // bits on the first call of pollsetCtl. Later
+ // calls only add bits, but cannot remove them.
+ // Therefore, we always remove the file
+ // descriptor ignoring the error and then add it.
+ Iterator<ControlEvent> iter = controlQueue.iterator();
+ while (iter.hasNext()) {
+ ControlEvent ev = iter.next();
+ pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
+ if (!ev.removeOnly()) {
+ ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
+ }
+ iter.remove();
+ }
+ controlQueue.notifyAll();
+ }
+ }
+
+ /*
+ * Task to process events from pollset and dispatch to the channel's
+ * onEvent handler.
+ *
+ * Events are retreived from pollset in batch and offered to a BlockingQueue
+ * where they are consumed by handler threads. A special "NEED_TO_POLL"
+ * event is used to signal one consumer to re-poll when all events have
+ * been consumed.
+ */
+ private class EventHandlerTask implements Runnable {
+ private Event poll() throws IOException {
+ try {
+ for (;;) {
+ int n;
+ controlLock.lock();
+ try {
+ n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
+ } finally {
+ controlLock.unlock();
+ }
+ /*
+ * 'n' events have been read. Here we map them to their
+ * corresponding channel in batch and queue n-1 so that
+ * they can be handled by other handler threads. The last
+ * event is handled by this thread (and so is not queued).
+ */
+ fdToChannelLock.readLock().lock();
+ try {
+ while (n-- > 0) {
+ long eventAddress = getEvent(address, n);
+ int fd = getDescriptor(eventAddress);
+
+ // To emulate one shot semantic we need to remove
+ // the file descriptor here.
+ if (fd != sp[0] && fd != ctlSp[0]) {
+ synchronized (controlQueue) {
+ pollsetCtl(pollset, PS_DELETE, fd, 0);
+ }
+ }
+
+ // wakeup
+ if (fd == sp[0]) {
+ if (wakeupCount.decrementAndGet() == 0) {
+ // no more wakeups so drain pipe
+ drain1(sp[0]);
+ }
+
+ // queue special event if there are more events
+ // to handle.
+ if (n > 0) {
+ queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
+ continue;
+ }
+ return EXECUTE_TASK_OR_SHUTDOWN;
+ }
+
+ // wakeup to process control event
+ if (fd == ctlSp[0]) {
+ synchronized (controlQueue) {
+ drain1(ctlSp[0]);
+ processControlQueue();
+ }
+ if (n > 0) {
+ continue;
+ }
+ return CONTINUE_AFTER_CTL_EVENT;
+ }
+
+ PollableChannel channel = fdToChannel.get(fd);
+ if (channel != null) {
+ int events = getRevents(eventAddress);
+ Event ev = new Event(channel, events);
+
+ // n-1 events are queued; This thread handles
+ // the last one except for the wakeup
+ if (n > 0) {
+ queue.offer(ev);
+ } else {
+ return ev;
+ }
+ }
+ }
+ } finally {
+ fdToChannelLock.readLock().unlock();
+ }
+ }
+ } finally {
+ // to ensure that some thread will poll when all events have
+ // been consumed
+ queue.offer(NEED_TO_POLL);
+ }
+ }
+
+ public void run() {
+ Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
+ Invoker.getGroupAndInvokeCount();
+ final boolean isPooledThread = (myGroupAndInvokeCount != null);
+ boolean replaceMe = false;
+ Event ev;
+ try {
+ for (;;) {
+ // reset invoke count
+ if (isPooledThread)
+ myGroupAndInvokeCount.resetInvokeCount();
+
+ try {
+ replaceMe = false;
+ ev = queue.take();
+
+ // no events and this thread has been "selected" to
+ // poll for more.
+ if (ev == NEED_TO_POLL) {
+ try {
+ ev = poll();
+ } catch (IOException x) {
+ x.printStackTrace();
+ return;
+ }
+ }
+ } catch (InterruptedException x) {
+ continue;
+ }
+
+ // contine after we processed a control event
+ if (ev == CONTINUE_AFTER_CTL_EVENT) {
+ continue;
+ }
+
+ // handle wakeup to execute task or shutdown
+ if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
+ Runnable task = pollTask();
+ if (task == null) {
+ // shutdown request
+ return;
+ }
+ // run task (may throw error/exception)
+ replaceMe = true;
+ task.run();
+ continue;
+ }
+
+ // process event
+ try {
+ ev.channel().onEvent(ev.events(), isPooledThread);
+ } catch (Error x) {
+ replaceMe = true; throw x;
+ } catch (RuntimeException x) {
+ replaceMe = true; throw x;
+ }
+ }
+ } finally {
+ // last handler to exit when shutdown releases resources
+ int remaining = threadExit(this, replaceMe);
+ if (remaining == 0 && isShutdown()) {
+ implClose();
+ }
+ }
+ }
+ }
+
+ /**
+ * Allocates a poll array to handle up to {@code count} events.
+ */
+ private static long allocatePollArray(int count) {
+ return unsafe.allocateMemory(count * SIZEOF_POLLFD);
+ }
+
+ /**
+ * Free a poll array
+ */
+ private static void freePollArray(long address) {
+ unsafe.freeMemory(address);
+ }
+
+ /**
+ * Returns event[i];
+ */
+ private static long getEvent(long address, int i) {
+ return address + (SIZEOF_POLLFD*i);
+ }
+
+ /**
+ * Returns event->fd
+ */
+ private static int getDescriptor(long eventAddress) {
+ return unsafe.getInt(eventAddress + OFFSETOF_FD);
+ }
+
+ /**
+ * Returns event->events
+ */
+ private static int getEvents(long eventAddress) {
+ return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
+ }
+
+ /**
+ * Returns event->revents
+ */
+ private static int getRevents(long eventAddress) {
+ return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
+ }
+
+ // -- Native methods --
+
+ private static native void init();
+
+ private static native int eventSize();
+
+ private static native int eventsOffset();
+
+ private static native int reventsOffset();
+
+ private static native int fdOffset();
+
+ private static native int pollsetCreate() throws IOException;
+
+ private static native int pollsetCtl(int pollset, int opcode, int fd, int events);
+
+ private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
+ throws IOException;
+
+ private static native void pollsetDestroy(int pollset);
+
+ private static native void socketpair(int[] sv) throws IOException;
+
+ private static native void interrupt(int fd) throws IOException;
+
+ private static native void drain1(int fd) throws IOException;
+
+ private static native void close0(int fd);
+}