--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/windows/classes/sun/nio/ch/Iocp.java Sun Feb 15 12:25:54 2009 +0000
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Sun designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Sun in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ */
+
+package sun.nio.ch;
+
+import java.nio.channels.*;
+import java.nio.channels.spi.AsynchronousChannelProvider;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.FileDescriptor;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import sun.misc.Unsafe;
+
+/**
+ * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
+ * completion port.
+ */
+
+class Iocp extends AsynchronousChannelGroupImpl {
+ private static final Unsafe unsafe = Unsafe.getUnsafe();
+ private static final long INVALID_HANDLE_VALUE = -1L;
+
+ // maps completion key to channel
+ private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
+ private final Map<Integer,OverlappedChannel> keyToChannel =
+ new HashMap<Integer,OverlappedChannel>();
+ private int nextCompletionKey;
+
+ // handle to completion port
+ private final long port;
+
+ // true if port has been closed
+ private boolean closed;
+
+ // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
+ // relate to I/O operations where the completion notification was not
+ // received in a timely manner after the channel is closed.
+ private final Set<Long> staleIoSet = new HashSet<Long>();
+
+ Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
+ throws IOException
+ {
+ super(provider, pool);
+ this.port =
+ createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
+ this.nextCompletionKey = 1;
+ }
+
+ Iocp start() {
+ startThreads(new EventHandlerTask());
+ return this;
+ }
+
+ /*
+ * Channels implements this interface support overlapped I/O and can be
+ * associated with a completion port.
+ */
+ static interface OverlappedChannel extends Closeable {
+ /**
+ * Returns a reference to the pending I/O result.
+ */
+ <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
+ }
+
+ // release all resources
+ void implClose() {
+ synchronized (this) {
+ if (closed)
+ return;
+ closed = true;
+ }
+ close0(port);
+ synchronized (staleIoSet) {
+ for (Long ov: staleIoSet) {
+ unsafe.freeMemory(ov);
+ }
+ staleIoSet.clear();
+ }
+ }
+
+ @Override
+ boolean isEmpty() {
+ keyToChannelLock.writeLock().lock();
+ try {
+ return keyToChannel.isEmpty();
+ } finally {
+ keyToChannelLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
+ throws IOException
+ {
+ int key = associate(new OverlappedChannel() {
+ public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
+ return null;
+ }
+ public void close() throws IOException {
+ channel.close();
+ }
+ }, 0L);
+ return Integer.valueOf(key);
+ }
+
+ @Override
+ final void detachForeignChannel(Object key) {
+ disassociate((Integer)key);
+ }
+
+ @Override
+ void closeAllChannels() {
+ /**
+ * On Windows the close operation will close the socket/file handle
+ * and then wait until all outstanding I/O operations have aborted.
+ * This is necessary as each channel's cache of OVERLAPPED structures
+ * can only be freed once all I/O operations have completed. As I/O
+ * completion requires a lookup of the keyToChannel then we must close
+ * the channels when not holding the write lock.
+ */
+ final int MAX_BATCH_SIZE = 32;
+ OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
+ int count;
+ do {
+ // grab a batch of up to 32 channels
+ keyToChannelLock.writeLock().lock();
+ count = 0;
+ try {
+ for (Integer key: keyToChannel.keySet()) {
+ channels[count++] = keyToChannel.get(key);
+ if (count >= MAX_BATCH_SIZE)
+ break;
+ }
+ } finally {
+ keyToChannelLock.writeLock().unlock();
+ }
+
+ // close them
+ for (int i=0; i<count; i++) {
+ try {
+ channels[i].close();
+ } catch (IOException ignore) { }
+ }
+ } while (count > 0);
+ }
+
+ private void wakeup() {
+ try {
+ postQueuedCompletionStatus(port, 0);
+ } catch (IOException e) {
+ // should not happen
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ void executeOnHandlerTask(Runnable task) {
+ synchronized (this) {
+ if (closed)
+ throw new RejectedExecutionException();
+ offerTask(task);
+ wakeup();
+ }
+
+ }
+
+ @Override
+ void shutdownHandlerTasks() {
+ // shutdown all handler threads
+ int nThreads = threadCount();
+ while (nThreads-- > 0) {
+ wakeup();
+ }
+ }
+
+ /**
+ * Associate the given handle with this group
+ */
+ int associate(OverlappedChannel ch, long handle) throws IOException {
+ keyToChannelLock.writeLock().lock();
+
+ // generate a completion key (if not shutdown)
+ int key;
+ try {
+ if (isShutdown())
+ throw new ShutdownChannelGroupException();
+
+ // generate unique key
+ do {
+ key = nextCompletionKey++;
+ } while ((key == 0) || keyToChannel.containsKey(key));
+
+ // associate with I/O completion port
+ if (handle != 0L)
+ createIoCompletionPort(handle, port, key, 0);
+
+ // setup mapping
+ keyToChannel.put(key, ch);
+ } finally {
+ keyToChannelLock.writeLock().unlock();
+ }
+ return key;
+ }
+
+ /**
+ * Disassociate channel from the group.
+ */
+ void disassociate(int key) {
+ boolean checkForShutdown = false;
+
+ keyToChannelLock.writeLock().lock();
+ try {
+ keyToChannel.remove(key);
+
+ // last key to be removed so check if group is shutdown
+ if (keyToChannel.isEmpty())
+ checkForShutdown = true;
+
+ } finally {
+ keyToChannelLock.writeLock().unlock();
+ }
+
+ // continue shutdown
+ if (checkForShutdown && isShutdown()) {
+ try {
+ shutdownNow();
+ } catch (IOException ignore) { }
+ }
+ }
+
+ /**
+ * Invoked when a channel associated with this port is closed before
+ * notifications for all outstanding I/O operations have been received.
+ */
+ void makeStale(Long overlapped) {
+ synchronized (staleIoSet) {
+ staleIoSet.add(overlapped);
+ }
+ }
+
+ /**
+ * Checks if the given OVERLAPPED is stale and if so, releases it.
+ */
+ private void checkIfStale(long ov) {
+ synchronized (staleIoSet) {
+ boolean removed = staleIoSet.remove(ov);
+ if (removed) {
+ unsafe.freeMemory(ov);
+ }
+ }
+ }
+
+ /**
+ * The handler for consuming the result of an asynchronous I/O operation.
+ */
+ static interface ResultHandler {
+ /**
+ * Invoked if the I/O operation completes successfully.
+ */
+ public void completed(int bytesTransferred);
+
+ /**
+ * Invoked if the I/O operation fails.
+ */
+ public void failed(int error, IOException ioe);
+ }
+
+ // Creates IOException for the given I/O error.
+ private static IOException translateErrorToIOException(int error) {
+ String msg = getErrorMessage(error);
+ if (msg == null)
+ msg = "Unknown error: 0x0" + Integer.toHexString(error);
+ return new IOException(msg);
+ }
+
+ /**
+ * Long-running task servicing system-wide or per-file completion port
+ */
+ private class EventHandlerTask implements Runnable {
+ public void run() {
+ Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
+ Invoker.getGroupAndInvokeCount();
+ CompletionStatus ioResult = new CompletionStatus();
+ boolean replaceMe = false;
+
+ try {
+ for (;;) {
+ // reset invoke count
+ if (myGroupAndInvokeCount != null)
+ myGroupAndInvokeCount.resetInvokeCount();
+
+ // wait for I/O completion event
+ // A error here is fatal (thread will not be replaced)
+ replaceMe = false;
+ try {
+ getQueuedCompletionStatus(port, ioResult);
+ } catch (IOException x) {
+ // should not happen
+ x.printStackTrace();
+ return;
+ }
+
+ // handle wakeup to execute task or shutdown
+ if (ioResult.completionKey() == 0 &&
+ ioResult.overlapped() == 0L)
+ {
+ Runnable task = pollTask();
+ if (task == null) {
+ // shutdown request
+ return;
+ }
+
+ // run task
+ // (if error/exception then replace thread)
+ replaceMe = true;
+ task.run();
+ continue;
+ }
+
+ // map key to channel
+ OverlappedChannel ch = null;
+ keyToChannelLock.readLock().lock();
+ try {
+ ch = keyToChannel.get(ioResult.completionKey());
+ if (ch == null) {
+ checkIfStale(ioResult.overlapped());
+ continue;
+ }
+ } finally {
+ keyToChannelLock.readLock().unlock();
+ }
+
+ // lookup I/O request
+ PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
+ if (result == null) {
+ // we get here if the OVERLAPPED structure is associated
+ // with an I/O operation on a channel that was closed
+ // but the I/O operation event wasn't read in a timely
+ // manner. Alternatively, it may be related to a
+ // tryLock operation as the OVERLAPPED structures for
+ // these operations are not in the I/O cache.
+ checkIfStale(ioResult.overlapped());
+ continue;
+ }
+
+ // synchronize on result in case I/O completed immediately
+ // and was handled by initiator
+ synchronized (result) {
+ if (result.isDone()) {
+ continue;
+ }
+ // not handled by initiator
+ }
+
+ // invoke I/O result handler
+ int error = ioResult.error();
+ ResultHandler rh = (ResultHandler)result.getContext();
+ replaceMe = true; // (if error/exception then replace thread)
+ if (error == 0) {
+ rh.completed(ioResult.bytesTransferred());
+ } else {
+ rh.failed(error, translateErrorToIOException(error));
+ }
+ }
+ } finally {
+ // last thread to exit when shutdown releases resources
+ int remaining = threadExit(this, replaceMe);
+ if (remaining == 0 && isShutdown()) {
+ implClose();
+ }
+ }
+ }
+ }
+
+ /**
+ * Container for data returned by GetQueuedCompletionStatus
+ */
+ private static class CompletionStatus {
+ private int error;
+ private int bytesTransferred;
+ private int completionKey;
+ private long overlapped;
+
+ private CompletionStatus() { }
+ int error() { return error; }
+ int bytesTransferred() { return bytesTransferred; }
+ int completionKey() { return completionKey; }
+ long overlapped() { return overlapped; }
+ }
+
+ // -- native methods --
+
+ private static native void initIDs();
+
+ private static native long createIoCompletionPort(long handle,
+ long existingPort, int completionKey, int concurrency) throws IOException;
+
+ private static native void close0(long handle);
+
+ private static native void getQueuedCompletionStatus(long completionPort,
+ CompletionStatus status) throws IOException;
+
+ private static native void postQueuedCompletionStatus(long completionPort,
+ int completionKey) throws IOException;
+
+ private static native String getErrorMessage(int error);
+
+ static {
+ Util.load();
+ initIDs();
+ }
+}