jdk/src/windows/classes/sun/nio/ch/Iocp.java
author alanb
Sun, 15 Feb 2009 12:25:54 +0000
changeset 2057 3acf8e5e2ca0
child 3632 399359a027de
permissions -rw-r--r--
6781363: New I/O: Update socket-channel API to jsr203/nio2-b99 4313887: New I/O: Improved filesystem interface 4607272: New I/O: Support asynchronous I/O Reviewed-by: sherman, chegar

/*
 * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Sun designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Sun in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.Closeable;
import java.io.IOException;
import java.io.FileDescriptor;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import sun.misc.Unsafe;

/**
 * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
 * completion port.
 */

class Iocp extends AsynchronousChannelGroupImpl {
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long INVALID_HANDLE_VALUE  = -1L;

    // maps completion key to channel
    private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
    private final Map<Integer,OverlappedChannel> keyToChannel =
        new HashMap<Integer,OverlappedChannel>();
    private int nextCompletionKey;

    // handle to completion port
    private final long port;

    // true if port has been closed
    private boolean closed;

    // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
    // relate to I/O operations where the completion notification was not
    // received in a timely manner after the channel is closed.
    private final Set<Long> staleIoSet = new HashSet<Long>();

    Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
        throws IOException
    {
        super(provider, pool);
        this.port =
          createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
        this.nextCompletionKey = 1;
    }

    Iocp start() {
        startThreads(new EventHandlerTask());
        return this;
    }

    /*
     * Channels implements this interface support overlapped I/O and can be
     * associated with a completion port.
     */
    static interface OverlappedChannel extends Closeable {
        /**
         * Returns a reference to the pending I/O result.
         */
        <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
    }

    // release all resources
    void implClose() {
        synchronized (this) {
            if (closed)
                return;
            closed = true;
        }
        close0(port);
        synchronized (staleIoSet) {
            for (Long ov: staleIoSet) {
                unsafe.freeMemory(ov);
            }
            staleIoSet.clear();
        }
    }

    @Override
    boolean isEmpty() {
        keyToChannelLock.writeLock().lock();
        try {
            return keyToChannel.isEmpty();
        } finally {
            keyToChannelLock.writeLock().unlock();
        }
    }

    @Override
    final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
        throws IOException
    {
        int key = associate(new OverlappedChannel() {
            public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
                return null;
            }
            public void close() throws IOException {
                channel.close();
            }
        }, 0L);
        return Integer.valueOf(key);
    }

    @Override
    final void detachForeignChannel(Object key) {
        disassociate((Integer)key);
    }

    @Override
    void closeAllChannels() {
        /**
         * On Windows the close operation will close the socket/file handle
         * and then wait until all outstanding I/O operations have aborted.
         * This is necessary as each channel's cache of OVERLAPPED structures
         * can only be freed once all I/O operations have completed. As I/O
         * completion requires a lookup of the keyToChannel then we must close
         * the channels when not holding the write lock.
         */
        final int MAX_BATCH_SIZE = 32;
        OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
        int count;
        do {
            // grab a batch of up to 32 channels
            keyToChannelLock.writeLock().lock();
            count = 0;
            try {
                for (Integer key: keyToChannel.keySet()) {
                    channels[count++] = keyToChannel.get(key);
                    if (count >= MAX_BATCH_SIZE)
                        break;
                }
            } finally {
                keyToChannelLock.writeLock().unlock();
            }

            // close them
            for (int i=0; i<count; i++) {
                try {
                    channels[i].close();
                } catch (IOException ignore) { }
            }
        } while (count > 0);
    }

    private void wakeup() {
        try {
            postQueuedCompletionStatus(port, 0);
        } catch (IOException e) {
            // should not happen
            throw new AssertionError(e);
        }
    }

    @Override
    void executeOnHandlerTask(Runnable task) {
        synchronized (this) {
            if (closed)
                throw new RejectedExecutionException();
            offerTask(task);
            wakeup();
        }

    }

    @Override
    void shutdownHandlerTasks() {
        // shutdown all handler threads
        int nThreads = threadCount();
        while (nThreads-- > 0) {
            wakeup();
        }
    }

    /**
     * Associate the given handle with this group
     */
    int associate(OverlappedChannel ch, long handle) throws IOException {
        keyToChannelLock.writeLock().lock();

        // generate a completion key (if not shutdown)
        int key;
        try {
            if (isShutdown())
                throw new ShutdownChannelGroupException();

            // generate unique key
            do {
                key = nextCompletionKey++;
            } while ((key == 0) || keyToChannel.containsKey(key));

            // associate with I/O completion port
            if (handle != 0L)
                createIoCompletionPort(handle, port, key, 0);

            // setup mapping
            keyToChannel.put(key, ch);
        } finally {
            keyToChannelLock.writeLock().unlock();
        }
        return key;
    }

    /**
     * Disassociate channel from the group.
     */
    void disassociate(int key) {
        boolean checkForShutdown = false;

        keyToChannelLock.writeLock().lock();
        try {
            keyToChannel.remove(key);

            // last key to be removed so check if group is shutdown
            if (keyToChannel.isEmpty())
                checkForShutdown = true;

        } finally {
            keyToChannelLock.writeLock().unlock();
        }

        // continue shutdown
        if (checkForShutdown && isShutdown()) {
            try {
                shutdownNow();
            } catch (IOException ignore) { }
        }
    }

    /**
     * Invoked when a channel associated with this port is closed before
     * notifications for all outstanding I/O operations have been received.
     */
    void makeStale(Long overlapped) {
        synchronized (staleIoSet) {
            staleIoSet.add(overlapped);
        }
    }

    /**
     * Checks if the given OVERLAPPED is stale and if so, releases it.
     */
    private void checkIfStale(long ov) {
        synchronized (staleIoSet) {
            boolean removed = staleIoSet.remove(ov);
            if (removed) {
                unsafe.freeMemory(ov);
            }
        }
    }

    /**
     * The handler for consuming the result of an asynchronous I/O operation.
     */
    static interface ResultHandler {
        /**
         * Invoked if the I/O operation completes successfully.
         */
        public void completed(int bytesTransferred);

        /**
         * Invoked if the I/O operation fails.
         */
        public void failed(int error, IOException ioe);
    }

    // Creates IOException for the given I/O error.
    private static IOException translateErrorToIOException(int error) {
        String msg = getErrorMessage(error);
        if (msg == null)
            msg = "Unknown error: 0x0" + Integer.toHexString(error);
        return new IOException(msg);
    }

    /**
     * Long-running task servicing system-wide or per-file completion port
     */
    private class EventHandlerTask implements Runnable {
        public void run() {
            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
                Invoker.getGroupAndInvokeCount();
            CompletionStatus ioResult = new CompletionStatus();
            boolean replaceMe = false;

            try {
                for (;;) {
                    // reset invoke count
                    if (myGroupAndInvokeCount != null)
                        myGroupAndInvokeCount.resetInvokeCount();

                    // wait for I/O completion event
                    // A error here is fatal (thread will not be replaced)
                    replaceMe = false;
                    try {
                        getQueuedCompletionStatus(port, ioResult);
                    } catch (IOException x) {
                        // should not happen
                        x.printStackTrace();
                        return;
                    }

                    // handle wakeup to execute task or shutdown
                    if (ioResult.completionKey() == 0 &&
                        ioResult.overlapped() == 0L)
                    {
                        Runnable task = pollTask();
                        if (task == null) {
                            // shutdown request
                            return;
                        }

                        // run task
                        // (if error/exception then replace thread)
                        replaceMe = true;
                        task.run();
                        continue;
                    }

                    // map key to channel
                    OverlappedChannel ch = null;
                    keyToChannelLock.readLock().lock();
                    try {
                        ch = keyToChannel.get(ioResult.completionKey());
                        if (ch == null) {
                            checkIfStale(ioResult.overlapped());
                            continue;
                        }
                    } finally {
                        keyToChannelLock.readLock().unlock();
                    }

                    // lookup I/O request
                    PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
                    if (result == null) {
                        // we get here if the OVERLAPPED structure is associated
                        // with an I/O operation on a channel that was closed
                        // but the I/O operation event wasn't read in a timely
                        // manner. Alternatively, it may be related to a
                        // tryLock operation as the OVERLAPPED structures for
                        // these operations are not in the I/O cache.
                        checkIfStale(ioResult.overlapped());
                        continue;
                    }

                    // synchronize on result in case I/O completed immediately
                    // and was handled by initiator
                    synchronized (result) {
                        if (result.isDone()) {
                            continue;
                        }
                        // not handled by initiator
                    }

                    // invoke I/O result handler
                    int error = ioResult.error();
                    ResultHandler rh = (ResultHandler)result.getContext();
                    replaceMe = true; // (if error/exception then replace thread)
                    if (error == 0) {
                        rh.completed(ioResult.bytesTransferred());
                    } else {
                        rh.failed(error, translateErrorToIOException(error));
                    }
                }
            } finally {
                // last thread to exit when shutdown releases resources
                int remaining = threadExit(this, replaceMe);
                if (remaining == 0 && isShutdown()) {
                    implClose();
                }
            }
        }
    }

    /**
     * Container for data returned by GetQueuedCompletionStatus
     */
    private static class CompletionStatus {
        private int error;
        private int bytesTransferred;
        private int completionKey;
        private long overlapped;

        private CompletionStatus() { }
        int error() { return error; }
        int bytesTransferred() { return bytesTransferred; }
        int completionKey() { return completionKey; }
        long overlapped() { return overlapped; }
    }

    // -- native methods --

    private static native void initIDs();

    private static native long createIoCompletionPort(long handle,
        long existingPort, int completionKey, int concurrency) throws IOException;

    private static native void close0(long handle);

    private static native void getQueuedCompletionStatus(long completionPort,
        CompletionStatus status) throws IOException;

    private static native void postQueuedCompletionStatus(long completionPort,
        int completionKey) throws IOException;

    private static native String getErrorMessage(int error);

    static {
        Util.load();
        initIDs();
    }
}