--- a/src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java Fri Mar 23 09:51:02 2018 +0100
+++ b/src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java Fri Mar 23 14:18:18 2018 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -30,7 +30,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
-import static sun.nio.ch.KQueue.*;
+
+import static sun.nio.ch.KQueue.EVFILT_READ;
+import static sun.nio.ch.KQueue.EVFILT_WRITE;
+import static sun.nio.ch.KQueue.EV_ADD;
+import static sun.nio.ch.KQueue.EV_ONESHOT;
/**
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
@@ -45,6 +49,9 @@
// kqueue file descriptor
private final int kqfd;
+ // address of the poll array passed to kqueue_wait
+ private final long address;
+
// true if kqueue closed
private boolean closed;
@@ -54,9 +61,6 @@
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
- // address of the poll array passed to kqueue_wait
- private final long address;
-
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
@@ -82,28 +86,25 @@
{
super(provider, pool);
- // open kqueue
- this.kqfd = kqueue();
+ this.kqfd = KQueue.create();
+ this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);
// create socket pair for wakeup mechanism
- int[] sv = new int[2];
try {
- socketpair(sv);
+ long fds = IOUtil.makePipe(true);
+ this.sp = new int[]{(int) (fds >>> 32), (int) fds};
+ } catch (IOException ioe) {
+ KQueue.freePollArray(address);
+ FileDispatcherImpl.closeIntFD(kqfd);
+ throw ioe;
+ }
- // register one end with kqueue
- keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
- } catch (IOException x) {
- close0(kqfd);
- throw x;
- }
- this.sp = sv;
-
- // allocate the poll array
- this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
+ // register one end with kqueue
+ KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);
// create the queue and offer the special event to ensure that the first
// threads polls
- this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
+ this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);
this.queue.offer(NEED_TO_POLL);
}
@@ -121,17 +122,18 @@
return;
closed = true;
}
- freePollArray(address);
- close0(sp[0]);
- close0(sp[1]);
- close0(kqfd);
+
+ try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }
+ try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
+ try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
+ KQueue.freePollArray(address);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to socketpair to force wakeup
try {
- interrupt(sp[1]);
+ IOUtil.write1(sp[1], (byte)0);
} catch (IOException x) {
throw new AssertionError(x);
}
@@ -173,9 +175,9 @@
int err = 0;
int flags = (EV_ADD|EV_ONESHOT);
if ((events & Net.POLLIN) > 0)
- err = keventRegister(kqfd, fd, EVFILT_READ, flags);
+ err = KQueue.register(kqfd, fd, EVFILT_READ, flags);
if (err == 0 && (events & Net.POLLOUT) > 0)
- err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
+ err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);
if (err != 0)
throw new InternalError("kevent failed: " + err); // should not happen
}
@@ -193,7 +195,11 @@
private Event poll() throws IOException {
try {
for (;;) {
- int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
+ int n;
+ do {
+ n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
+ } while (n == IOStatus.INTERRUPTED);
+
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
@@ -203,14 +209,14 @@
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
- long keventAddress = getEvent(address, n);
- int fd = getDescriptor(keventAddress);
+ long keventAddress = KQueue.getEvent(address, n);
+ int fd = KQueue.getDescriptor(keventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
- drain1(sp[0]);
+ IOUtil.drain(sp[0]);
}
// queue special event if there are more events
@@ -224,7 +230,7 @@
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
- int filter = getFilter(keventAddress);
+ int filter = KQueue.getFilter(keventAddress);
int events = 0;
if (filter == EVFILT_READ)
events = Net.POLLIN;
@@ -314,18 +320,4 @@
}
}
}
-
- // -- Native methods --
-
- private static native void socketpair(int[] sv) throws IOException;
-
- private static native void interrupt(int fd) throws IOException;
-
- private static native void drain1(int fd) throws IOException;
-
- private static native void close0(int fd);
-
- static {
- IOUtil.load();
- }
}